First, I'm writing a Spark DataFrame named calendar to a Parquet dataset named cal:
calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")
Then, I'm copying it from HDFS to my personal folder:
!hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset
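Note that Spark writes cal as a directory of part files plus a _SUCCESS marker rather than a single file, so the copy brings over the whole directory. Listing it confirms the layout (the part-file name below is illustrative; Spark generates the actual names):

!ls /home/vusal.babashov/dataset/cal
# typical contents (names vary): _SUCCESS  part-00000-<uuid>.snappy.parquet  ...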
Finally, I'm trying to read the Parquet data into Dask:
import dask.dataframe as dd
df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
This last step is where I keep getting an OSError: [Errno 22] Invalid argument. The path is correct, and the Parquet dataset cal is there (I can confirm it).
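As a sanity check (a sketch, assuming Spark's usual part-file naming), opening a single part file with fastparquet directly can narrow the failure down to the directory scan rather than the data itself:

import glob
from fastparquet import ParquetFile

# pick one Spark-generated part file from the copied directory
part = glob.glob('/home/vusal.babashov/dataset/cal/part-*.parquet')[0]
ParquetFile(part).to_pandas().head()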
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325 chunksize=chunksize,
326 aggregate_files=aggregate_files,
--> 327 **kwargs,
328 )
329
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732 # correspond to a row group (populated below).
733 parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734 fs, paths, gather_statistics, **kwargs
735 )
736
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163 elif gather_statistics is not False:
164 # Scan every file
--> 165 pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166 else:
167 # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91 if isinstance(fn, (tuple, list)):
92 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93 open_with=open_with, root=root)
94 if basepath:
95 self.fn = join_path(basepath, '_metadata') # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119 self.fn = join_path(fn)
120 with open_with(fn, 'rb') as f:
--> 121 self._parse_header(f, verify)
122 elif "*" in fn or fs.isdir(fn):
123 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161 if verify:
162 assert f.read(4) == b'PAR1'
--> 163 f.seek(-8, 2)
164 head_size = struct.unpack('<i', f.read(4))[0]
165 if verify:
OSError: [Errno 22] Invalid argument
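The last frame is suggestive: f.seek(-8, 2) seeks 8 bytes back from the end of the file, which fails with exactly this error on a zero-byte file, for example the _SUCCESS marker that Spark leaves in the output directory. A minimal reproduction:

# create a zero-byte file, like Spark's _SUCCESS marker
with open('/tmp/empty', 'wb'):
    pass
with open('/tmp/empty', 'rb') as f:
    f.seek(-8, 2)  # OSError: [Errno 22] Invalid argument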
When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow'), I get the following error:
RuntimeError Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
290
291 if isinstance(engine, str):
--> 292 engine = get_engine(engine)
293
294 if hasattr(path, "name"):
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
917
918 if pa_version < parse_version("0.13.1"):
--> 919 raise RuntimeError("PyArrow version >= 0.13.1 required")
920
921 if engine == "pyarrow-dataset" and pa_version.major >= 1:
RuntimeError: PyArrow version >= 0.13.1 required
I'm using Spark 2.4 and Python 3.7 on the server. The PyArrow version I have is 0.11.1, and upgrading it to other versions causes an inconsistent environment. According to the Dask documentation, the default engine is 'auto', which selects fastparquet (which I have installed). When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto'), I get the same OSError: [Errno 22] Invalid argument, with a traceback identical to the one shown above.
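To double-check which versions the notebook environment actually imports (a quick sketch; both packages are assumed importable):

import pyarrow, fastparquet
print(pyarrow.__version__)      # 0.11.1 here
print(fastparquet.__version__)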
Use the file:// prefix. Also, Linux has no /user directory, only HDFS does. Did you mean to use /home? calendar.write.parquet("file:///home/vusal.babashov/dataset/cal", mode="overwrite") should work, too, and then you don't need the hdfs commands at all.
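Putting that together (a sketch; calendar and the paths come from the question):

# write straight from Spark to the local filesystem via the file:// scheme,
# skipping the HDFS round trip entirely
calendar.write.parquet("file:///home/vusal.babashov/dataset/cal", mode="overwrite")

import dask.dataframe as dd
df = dd.read_parquet('/home/vusal.babashov/dataset/cal')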