
Could someone point out what I did wrong with the following dask implementation? It doesn't seem to use multiple cores.

[Updated with reproducible code]

The code that uses dask:

import time
import dask
import numpy as np
import pandas as pd

bookingID = np.arange(1, 10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats(bookingID):
    # dummy data: the same frame is used for every booking
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
    return row


calculate_feature_stats = dask.delayed(calculate_feature_stats)


rows = []
for bookid in bookingID.tolist():
    row = calculate_feature_stats(bookid)
    rows.append(row)

start = time.time()
rows = dask.persist(*rows)
end = time.time()
print(end - start)  # Execution time = 16s on my machine

The plain implementation without dask:

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats_normal(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
    return row


rows = []
start = time.time()
for bookid in bookingID.tolist():
    row = calculate_feature_stats_normal(bookid)
    rows.append(row)
end = time.time()
print(end - start)  # Execution time = 4s on my machine

So the version without dask is actually faster. How is that possible?

  • Do you mind producing an MCVE? Commented Jun 10, 2019 at 14:30
  • @rpanai added reproducible code above. Commented Jun 12, 2019 at 9:17
  • In general there is about 1 ms of overhead per task when using dask (see the doc). So if you do the math with your 1e5 tasks, the 1e-3 second overhead, and 8 cores, you should have ~12.5 seconds of just overhead. Commented Jun 13, 2019 at 13:14
  • Then I can't understand what your actual problem is. Do you need to calculate stats for different files, or do you have a big df with a bookingID column? Commented Jun 13, 2019 at 13:16

1 Answer


Answer

Extended comment. You should consider that dask adds roughly 1 ms of overhead per task (see the doc), so if each computation is shorter than that, dask isn't worth the trouble.
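To see the overhead point in isolation, here is a minimal sketch of mine (not from the question) where each task takes roughly 60 ms; the time.sleep is just a stand-in for real work. With tasks of that size the parallel speed-up outweighs the per-task scheduling cost:

import time
import dask

@dask.delayed
def slow_task(i):
    time.sleep(0.06)  # stand-in for ~60 ms of real work per task
    return i

tasks = [slow_task(i) for i in range(200)]

start = time.time()
dask.compute(*tasks)  # the default scheduler runs the sleeps concurrently
print("dask:", time.time() - start)

start = time.time()
for i in range(200):
    time.sleep(0.06)  # the same work done serially
print("serial:", time.time() - start)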

Coming to your specific question, I can think of two possible real-world scenarios:

1. A big dataframe with a bookingID column and a value column
2. A separate file for every bookingID

In the second case you can start from this answer (a minimal sketch of that approach is included after the dask example below), while for the first case you can proceed as follows:

import dask.dataframe as dd
import numpy as np
import pandas as pd



# create dummy df
df = []
for i in range(10_000):
    df.append(pd.DataFrame({"id":i,
                            "value":np.random.rand(1000)}))
df = pd.concat(df, ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)
df.to_parquet("df.parq")

Pandas

%%time
df = pd.read_parquet("df.parq")
out = df.groupby("id").agg({"value": ["min", "max", "std", "mean"]})
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)

CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
Wall time: 1.08 s

Dask

%%time
df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]}).compute()
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)

CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
Wall time: 3.94 s
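For the second scenario (a separate file per bookingID), here is a minimal sketch of a delayed-per-file approach; the bookings/{i}.parq paths and the value column are hypothetical names used only for illustration:

import dask
import pandas as pd

@dask.delayed
def stats_for_booking(path):
    # read one booking's file and summarise it
    curr = pd.read_parquet(path)
    return [path, curr["value"].min(), curr["value"].max(),
            curr["value"].std(), curr["value"].mean()]

# hypothetical layout: one parquet file per bookingID
paths = [f"bookings/{i}.parq" for i in range(10_000)]
rows = dask.compute(*[stats_for_booking(p) for p in paths])

Here every task includes file I/O plus the aggregation, so it is typically far above the ~1 ms overhead threshold and parallelises well.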

Final thoughts

In this situation dask starts to make sense if the df doesn't fit in memory.
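If you do get to that point, one option (an assumption on my side, not something the example above requires) is to run the same aggregation under the distributed scheduler, which gives you several worker processes and an explicit memory budget:

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

# hypothetical sizing: 4 worker processes with 2 GB of memory each
cluster = LocalCluster(n_workers=4, memory_limit="2GB")
client = Client(cluster)

df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value": ["min", "max", "std", "mean"]}).compute()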


1 Comment

I think you are right: when I added time.sleep(0.06) to the function (my real function takes around that long), the dask version was almost 8 times faster.
