
I have a parquet folder created with dask containing multiple files of about 100MB each. When I load the dataframe with df = dask.dataframe.read_parquet(path_to_parquet_folder) and run any computation on it (such as df.describe().compute()), my kernel crashes.
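
For reference, the workflow is essentially the following (the path is a placeholder, since I can't share the real folder):

import dask.dataframe as dd

# 'path_to_parquet_folder' stands in for the real directory of ~100MB parquet files
df = dd.read_parquet('path_to_parquet_folder')
df.describe().compute()  # this is where the kernel dies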

Things I have noticed:

  • CPU usage sits at about 100% (a single core), which suggests that multithreading is not being used
  • memory usage shoots well past the size of a single file
  • the kernel crashes once system memory usage approaches 100% (a profiling sketch follows this list)
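
To quantify this, dask's ResourceProfiler can be wrapped around the computation. This is a minimal sketch (the path is again a placeholder) rather than my exact setup:

from dask.diagnostics import ResourceProfiler
import dask.dataframe as dd

# sample CPU and memory usage every 0.5 seconds while the graph executes
with ResourceProfiler(dt=0.5) as rprof:
    dd.read_parquet('path_to_parquet_folder').describe().compute()

# rprof.results holds (time, memory in MB, CPU %) samples for inspection
rprof.visualize()  # renders the recorded usage (requires bokeh)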

EDIT:

I tried to create a reproducible example of the crash, without success, but in the process I discovered some other oddities, all seemingly related to the newer nullable pandas dtypes that I'm using:

import pandas as pd
from dask.diagnostics import ProgressBar
ProgressBar().register()
from dask.diagnostics import ResourceProfiler
rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 10000000
test = pd.DataFrame({
    1:pd.Series(['a', pd.NA]*n, dtype = pd.StringDtype()), 
    2:pd.Series([1, pd.NA]*n, dtype = pd.Int64Dtype()),
    3:pd.Series([0.56, pd.NA]*n, dtype = pd.Float64Dtype())
})

dd_df = dd.from_pandas(test, npartitions = 2) # convert to dask df

dd_df.to_parquet('test.parquet') # save as parquet directory

dd_df = dd.read_parquet('test.parquet') # load files back

dd_df.mean().compute() # compute something
dd_df.describe().compute() # compute something
dd_df.count().compute() # compute something
dd_df.max().compute() # compute something

Output, respectively:

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

Kernel appears to have died.

KeyError: "None of [Index(['2', '1', '3'], dtype='object')] are in the [columns]"

It seems that the dtypes are preserved through the Parquet round trip, but dask has trouble actually computing anything with these columns.
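
For what it's worth, the preserved dtypes can be checked directly after the round trip; a small sketch against the same test.parquet directory as above:

# inspect dtypes after the parquet round trip
dd_back = dd.read_parquet('test.parquet')
print(dd_back.dtypes)  # the nullable string/Int64/Float64 dtypes should still show up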

Python version: 3.9.7, dask version: 2021.11.2

  • Do you instantiate the cluster manually? Also, Parquet file size can be misleading due to compression. Commented Dec 12, 2021 at 11:08
  • What do you mean by "instantiate the cluster"? I am running dask locally. The Parquet folder contains a metadata file. I can use df.get_partition(0).compute() without problems, the memory usage of a single partition is about 500MB. Commented Dec 12, 2021 at 11:12
  • I totally sympathize! But that's not typical dask.dataframe behavior - I use it all the time without this issue. So unless we see your code and setup it's hard for us to know what's going on, especially given the complexity of the issue. Thanks :) Commented Dec 13, 2021 at 17:34
  • Does this happen for all methods, or just describe? I ask, because that includes std.dev and percentiles, which are not trivial to calculate. What is the total size of the dataset, how many files? Commented Dec 13, 2021 at 17:49
  • It's a minor thing, but naming the columns using strings rather than ints, will remove the KeyError. Commented Dec 17, 2021 at 5:20

1 Answer


It seems the main error is due to NAType, which is not yet fully supported by numpy (version 1.21.4); a minimal reproduction follows the traceback below:

~/some_env/python3.8/site-packages/numpy/core/_methods.py in _var(a, axis, dtype, out, ddof, keepdims, where)
    240     # numbers and complex types with non-native byteorder
    241     else:
--> 242         x = um.multiply(x, um.conjugate(x), out=x).real
    243 
    244     ret = umr_sum(x, axis, dtype, out, keepdims=keepdims, where=where)

TypeError: loop of ufunc does not support argument 0 of type NAType which has no callable conjugate method
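
The failure can be reproduced outside dask: numpy's variance routine calls conjugate() element-wise on the object array, and NAType does not implement it. A minimal sketch (not from the original post, numpy ~1.21):

import numpy as np
import pandas as pd

# an object array mixing a float and pd.NA, similar to what a nullable column becomes
arr = np.array([0.56, pd.NA], dtype=object)

np.var(arr)  # raises TypeError: the ufunc loop does not support NAType (no conjugate method)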

As a workaround, casting the nullable integer column to plain float lets the descriptives compute (the cast turns pd.NA into np.nan, which numpy handles). Note that, to avoid the KeyError, the column names are given as strings rather than ints.

import pandas as pd
from dask.diagnostics import ProgressBar

ProgressBar().register()
from dask.diagnostics import ResourceProfiler

rprof = ResourceProfiler(dt=0.5)
import dask.dataframe as dd

# generate dataframe with 3 different nullable dtypes and n rows
n = 1000

# note that column names are changed to strings rather than ints
test = pd.DataFrame(
    {
        "1": pd.Series(["a", pd.NA] * n, dtype=pd.StringDtype()),
        "2": pd.Series([1, pd.NA] * n, dtype=pd.Int64Dtype()),
        "3": pd.Series([0.56, pd.NA] * n, dtype=pd.Float64Dtype()),
    }
)

dd_df = dd.from_pandas(test, npartitions=2)  # convert to dask df

dd_df.to_parquet("test.parquet", engine="fastparquet")  # save as parquet directory

dd_df = dd.read_parquet("test.parquet", engine="fastparquet")  # load files back

dd_df.mean().compute()  # compute something
dd_df.astype({"2": "float"}).describe().compute()  # compute something
dd_df.count().compute()  # compute something
dd_df.max().compute()  # compute something
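
If more than one column uses a nullable numeric dtype, the same cast can be broadened before describing; a hedged variant of the line above:

# cast both nullable numeric columns to plain numpy floats (pd.NA becomes np.nan)
dd_df.astype({"2": "float64", "3": "float64"}).describe().compute()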

1 Comment

Thanks @Sultan for providing the workaround! I've also opened up an issue for this.
