I am calling dask.dataframe.read_sql_table() on the table includes_retweets, which has the following column types:
Table "public.includes_retweets"
 Column                  | Type    |
-------------------------+---------+
 level_0                 | integer |
 reshared_message_id     | integer |
 resharing_message_id    | integer |
 resharing_user_id       | integer |
 index                   | integer |
 original_message_id     | integer |
 political_link          | text    |
 link_slant              | text    |
 political_phrases       | text[]  |
 phrase_slant            | text    |
 slant_overall           | text    |
 original_user_id        | integer |
 intermediate_user_id    | integer |
 intermediate_message_id | integer |
When I don't specify the meta parameter, Dask reads in the data as
level_0 reshared_message_id resharing_message_id resharing_user_id index original_message_id political_link link_slant political_phrases phrase_slant slant_overall original_user_id intermediate_user_id
intermediate_message_id
8135 0 209789789 8135 209780224 26 209780224 news.yahoo.com democrat None None democrat 3178208 209780224
8135 1 209789789 8135 209780224 27 209780224 news.yahoo.com democrat None None democrat 3178208 209780224
1785557 0 209829307 1785557 209828919 94 209828919 None None [{, ", T, R, U, M, P, , 2, 0, 2, 0, ", }] republican republican 237437 209828919
In the result returned by Dask, I want political_phrases in the third row to be [{TRUMP2020}] instead of a list of single characters.
How do I specify meta correctly? Following https://github.com/dask/dask/issues/4723, I have tried:
import pandas as pd
meta3 = {'level_0': [1], 'reshared_message_id': [1], 'resharing_message_id': [1], 'resharing_user_id': [1], 'index': [1], 'original_message_id': [1], 'political_link': ["Text"], 'link_slant': ["Text"], 'political_phrases': [["Text", "secondText"]], 'phrase_slant': ["Text"], 'slant_overall': ["Text"], 'original_user_id': [1], 'intermediate_user_id': [1], 'intermediate_message_id': [1]}
meta4 = pd.DataFrame(meta3)[0:0]
and, following "Create Empty Dataframe in Pandas specifying column types":
import numpy as np
dtypes = np.dtype([('level_0',int), ('reshared_message_id',int), ('resharing_message_id',int) , ('resharing_user_id',int) , ('index',int) , ('original_message_id',int) , ('political_link',str), ('link_slant',str) ,('political_phrases', np.str_), ('phrase_slant', str), ('slant_overall', str), ('original_user_id',int), ('intermediate_user_id',int) , ('intermediate_message_id',int),])
meta4 = np.empty(0, dtype=dtypes)
I then pass these values (or variations of them) as meta:
dask.dataframe.read_sql_table(includes_retweets_table, db_uri, meta=meta4, index_col='intermediate_message_id', npartitions=12)
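A minimal sketch of one way meta could be built, assuming the problem is that intermediate_message_id appears as a regular column in meta even though index_col='intermediate_message_id' puts that column into the index of every chunk Dask reads. It reuses meta3 from above unchanged and only moves that one column into the index; it has not been tested against the actual database.

```python
import pandas as pd

# meta3 as defined in the question: one sample value per column.
meta3 = {
    'level_0': [1], 'reshared_message_id': [1], 'resharing_message_id': [1],
    'resharing_user_id': [1], 'index': [1], 'original_message_id': [1],
    'political_link': ["Text"], 'link_slant': ["Text"],
    'political_phrases': [["Text", "secondText"]], 'phrase_slant': ["Text"],
    'slant_overall': ["Text"], 'original_user_id': [1],
    'intermediate_user_id': [1], 'intermediate_message_id': [1],
}

# Assumption: with index_col='intermediate_message_id', each chunk has that
# column as its index, so the empty meta frame should carry it as the index
# rather than as one of the ordinary columns.
meta4 = pd.DataFrame(meta3)[0:0].set_index('intermediate_message_id')

print(meta4.index.name)    # intermediate_message_id
print(len(meta4.columns))  # 13
```

This meta4 would then go into the same call as before, dask.dataframe.read_sql_table(includes_retweets_table, db_uri, meta=meta4, index_col='intermediate_message_id', npartitions=12). It only addresses the column/index mismatch; it does not by itself change how the text[] values in political_phrases are parsed.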
When I print the result of dask.dataframe.read_sql_table(), I get:
Dask DataFrame Structure:
level_0 reshared_message_id resharing_message_id resharing_user_id index original_message_id political_link link_slant political_phrases phrase_slant slant_overall original_user_id intermediate_user_id intermediate_message_id
npartitions=12
8135.0 int64 int64 int64 int64 int64 int64 object object object object object int64 int64 int64
17494966.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
192363276.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ...
209850107.0 ... ... ... ... ... ... ... ... ... ... ... ... ... ...
When I call .compute(), I get the following error:
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function check_meta at 0x7f8b040ca290>, (<function apply at 0x7f8b02de3830>, <function _read_sql_chunk at 0x7f8b041dc0e0>, [<sqlalchemy.sql.selectable.Select at 0x7f8b04353710; Select object>, 'postgresql+psycopg2://postgres:password@localhost:5432/stocktwits', Empty DataFrame
Columns: [level_0, reshared_message_id, resharing_message_id, resharing_user_id, index, original_message_id, political_link, link_slant, political_phrases, phrase_slant, slant_overall, original_user_id, intermediate_user_id, intermediate_message_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'intermediate_message_id']])), Empty DataFrame
Columns: [level_0, reshared_message_id, resharing_message_id, resharing_user_id, index, original_message_id, political_link, link_slant, political_phrases, phrase_slant, slant_overall, original_user_id, intermediate_user_id, intermediate_message_id]
Index: [], 'from_delayed'))
kwargs: {}
Exception: KeyError('Only a column name can be used for the key in a dtype mappings argument.')
Traceback (most recent call last):
  File "interpreterScript.py", line 359, in <module>
    main()
  File "interpreterScript.py", line 162, in main
    print(includes_retweets.compute())
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 2682, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1982, in gather
    asynchronous=asynchronous,
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 832, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1841, in _gather
    raise exception.with_traceback(traceback)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/utils.py", line 31, in apply
    return func(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 216, in _read_sql_chunk
    return df.astype(meta.dtypes.to_dict(), copy=False)
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/core/generic.py", line 5857, in astype
    "Only a column name can be used for the "
KeyError: 'Only a column name can be used for the key in a dtype mappings argument.'
This error doesn't make sense to me, because meta includes every column name from the includes_retweets table with the correct data types.
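The line that fails in the traceback is df.astype(meta.dtypes.to_dict(), copy=False) inside _read_sql_chunk, and the worker log shows the chunk being read with index_col='intermediate_message_id'. A pandas-only sketch of that step suggests a likely explanation: meta.dtypes has a key for intermediate_message_id, but in the chunk that name is the index, not a column, so astype rejects it. The two tiny frames below are hypothetical stand-ins for a real chunk and for meta, not data from the table, and copy=False is left out.

```python
import pandas as pd

# Hypothetical stand-in for one chunk read with
# index_col='intermediate_message_id': that name is the index, not a column.
chunk = pd.DataFrame(
    {"level_0": [0], "political_link": ["news.yahoo.com"]},
    index=pd.Index([8135], name="intermediate_message_id"),
)

# meta that still lists intermediate_message_id as an ordinary column.
meta = pd.DataFrame(
    {"level_0": [1], "political_link": ["Text"], "intermediate_message_id": [1]}
)[0:0]


def cast_like_meta(df, meta):
    """Mirror the failing astype call from _read_sql_chunk (without copy=False)."""
    return df.astype(meta.dtypes.to_dict())


try:
    cast_like_meta(chunk, meta)
    error = None
except KeyError as exc:
    # 'intermediate_message_id' is a key in the dtype mapping but not a column.
    error = exc

print(error is not None)  # True

# Once meta holds that column as its index, every dtype key is a real column.
fixed = cast_like_meta(chunk, meta.set_index("intermediate_message_id"))
print(list(fixed.columns))  # ['level_0', 'political_link']
```

In other words, listing every table column is not enough when one of them is also passed as index_col; under this assumption, meta has to match the shape of a chunk after read_sql has moved that column into the index.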