
I have a large (10s of GB) CSV file that I want to load with Dask and, for each row, perform some computation. I also want to write the results of the manipulated CSV to BigQuery, but it'd be better to batch the network requests to BigQuery in groups of, say, 10,000 rows each, so I don't incur network overhead per row.

I've been looking at dask.delayed and see that you can create an arbitrary computation graph, but I'm not sure if this is the right approach: how do I collect and fire off intermediate computations based on some group size (or perhaps time elapsed)? Can someone provide a simple example of that? Say for simplicity we have these functions:

def change_row(r):
    # Takes 10ms
    r = some_computation(r)
    return r

def send_to_bigquery(rows): 
    # Ideally, in large-ish groups, say 10,000 rows at a time
    make_network_request(rows)

# And here's how I'd use it
import dask.dataframe as dd
df = dd.read_csv('my_large_dataset.csv') # 20 GB
# run change_row(r) for each r in df
# run send_to_bigquery(rows) for each appropriately sized group of results from change_row(r)

Thanks!

1 Answer


The easiest thing you can do is provide a `blocksize` parameter to `read_csv`, which will get you approximately the right number of rows per block. Note that `blocksize` is specified in bytes, not rows, so you may need to measure some of your data or experiment to get this right.

The rest of your task will work the same way as any other "do this generic thing to blocks of a dataframe" problem: the `map_partitions` method (docs).

def alter_and_send(df):
    # iterrows yields (index, row) pairs, so unpack the row
    rows = [change_row(r) for _, r in df.iterrows()]
    send_to_bigquery(rows)
    return df

# map_partitions is lazy; call compute() to actually run it
df.map_partitions(alter_and_send).compute()

Basically, you are running the function on each piece of the logical Dask dataframe, each of which is a real pandas DataFrame. You may actually want `map`, `apply`, or other DataFrame methods within the function.

This is one way to do it; since you don't really need the "output" of the map, you could have used `to_delayed()` instead.


1 Comment

Follow-up: is there a way to get the partition number/index in the alter_and_send function?
