0

First, I'm writing Spark df named calendar as a parquet file named cal.

calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")

Then, I'm copying it from Hadoop to my personal folder.

!hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset

Finally, I'm trying to read the parquet into Dask.

import dask.dataframe as dd

df = dd.read_parquet('/home/vusal.babashov/dataset/cal')

This last step is where I keep getting OSError: [Errno 22] Invalid argument error. The path is correct and the parquet file named cal is there (I can confirm).

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    325         chunksize=chunksize,
    326         aggregate_files=aggregate_files,
--> 327         **kwargs,
    328     )
    329 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
    732         # correspond to a row group (populated below).
    733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
    735         )
    736 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    163         elif gather_statistics is not False:
    164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
    166         else:
    167             # Use _common_metadata file if it is available.

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
     91         if isinstance(fn, (tuple, list)):
     92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
     94             if basepath:
     95                 self.fn = join_path(basepath, '_metadata')  # effective file

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
    119                 self.fn = join_path(fn)
    120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
    122             elif "*" in fn or fs.isdir(fn):
    123                 fn2 = join_path(fn, '_metadata')

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    161                 if verify:
    162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
    164                 head_size = struct.unpack('<i', f.read(4))[0]
    165                 if verify:

OSError: [Errno 22] Invalid argument

When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow'), I get the following error


RuntimeError                              Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    290 
    291     if isinstance(engine, str):
--> 292         engine = get_engine(engine)
    293 
    294     if hasattr(path, "name"):

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
    917 
    918         if pa_version < parse_version("0.13.1"):
--> 919             raise RuntimeError("PyArrow version >= 0.13.1 required")
    920 
    921         if engine == "pyarrow-dataset" and pa_version.major >= 1:

RuntimeError: PyArrow version >= 0.13.1 required

I'm using Spark 2.4 and Python 3.7 on the server. The PyArrow version I have is 0.11.1. Upgrading to other versions cause inconsistent environment. According to Dask documentation, default engine is auto which selects fastparquet (I have it installed). When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto'), I get the same OSError: [Errno 22] Invalid argument


--
OSError                                   Traceback (most recent call last)
<ipython-input-8-361b3123f3d5> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    325         chunksize=chunksize,
    326         aggregate_files=aggregate_files,
--> 327         **kwargs,
    328     )
    329 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
    732         # correspond to a row group (populated below).
    733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
    735         )
    736 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    163         elif gather_statistics is not False:
    164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
    166         else:
    167             # Use _common_metadata file if it is available.

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
     91         if isinstance(fn, (tuple, list)):
     92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
     94             if basepath:
     95                 self.fn = join_path(basepath, '_metadata')  # effective file

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
    119                 self.fn = join_path(fn)
    120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
    122             elif "*" in fn or fs.isdir(fn):
    123                 fn2 = join_path(fn, '_metadata')

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    161                 if verify:
    162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
    164                 head_size = struct.unpack('<i', f.read(4))[0]
    165                 if verify:

OSError: [Errno 22] Invalid argument

Parquet Files Snapshot

5
  • Why are you using hdfs commands? If you're running a Spark driver in a Jupyter notebook, you can use Spark to write to local disk using file:// prefix. Also, Linux has no /user directory, only HDFS does. Did you mean to use /home? Commented Apr 3, 2022 at 14:49
  • Yes, that should be /home. I tried file:// but I get the same error. I'm running this on a remote server. I'm just copying the file from HDFS to EdgeNode. Commented Apr 3, 2022 at 14:51
  • Like I said, calendar.write.parquet("file:///home/vusal.babashov/dataset/cal", mode="overwrite") should work, too. Don't need hdfs commands Commented Apr 3, 2022 at 14:54
  • Sorry, I had to make a few changes on my end. The error is actually OSError: [Errno 22] Invalid argument. Commented Apr 3, 2022 at 15:11
  • Well, error is self descriptive. I'm not sure what inconsistent environment issues you're referring to Commented Apr 3, 2022 at 20:05

1 Answer 1

0

The invalid argument means that the code is trying to seek() to a location in a file that doesn't exist. When reading the metadata footer of a parquet file, the last 8 bytes contain the size of the footer, which is why you seek the (-8, 2) (8 bytes before the end of a file).

Very likely the situation is this: spark has written some extra files into the directory that are not data files, such as the weird zero-length "_SUCCESS" file and any checksums; but has not written a _metadata file to tell dask which files are to be regarded as data.

This issue of filtering for real files I very probably fixed in latest dask and fastparquet (and probably arrow). If you cannot update, then you will need to remove the files that are causing the confusion.

Sign up to request clarification or add additional context in comments.

7 Comments

Thank for the reply. I've attached the snapshot of the parquet files here: i.sstatic.net/2M35t.jpg. Does it seem okay to you? Also, I'm using Spark 2.4.0 and Python 3.7 on the server. Are Dask engines ,i.e., fastparquet or pyarrow version compatible?
Package versions are as follows: dask : 2021.7.2 , fastparquet: 0.7.0, pyarrow : 0.11.1
dask dropped 3.7 sometime recently, but fastparquet still supports it.
Just FYI, I copied the parquet folder to my local PC, I was able to read it into dask using pyarrow but not fastparquet in Python 3.9.7 (conda base).
do you know if this is still the case for fastparquet 0.8.1?
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.