I am calling dask.dataframe.read_sql_table() on a table, includes_retweets, with the following column types:

Table "public.includes_retweets"
         Column          |  Type   | 
-------------------------+---------+
 level_0                 | integer |
 reshared_message_id     | integer |
 resharing_message_id    | integer |
 resharing_user_id       | integer |
 index                   | integer |
 original_message_id     | integer |
 political_link          | text    |
 link_slant              | text    |
 political_phrases       | text[]  |
 phrase_slant            | text    |
 slant_overall           | text    |
 original_user_id        | integer |
 intermediate_user_id    | integer |
 intermediate_message_id | integer |

When I don't specify the meta parameter, Dask reads the data in as:

                         level_0 reshared_message_id resharing_message_id resharing_user_id  index  original_message_id  political_link  link_slant                                  political_phrases phrase_slant slant_overall  original_user_id  intermediate_user_id
intermediate_message_id                                                                                                                                                                                                                                                  
8135                           0           209789789                 8135         209780224     26            209780224  news.yahoo.com    democrat                                               None         None      democrat           3178208             209780224
8135                           1           209789789                 8135         209780224     27            209780224  news.yahoo.com    democrat                                               None         None      democrat           3178208             209780224
1785557                        0           209829307              1785557         209828919     94            209828919            None        None         [{, ", T, R, U, M, P,  , 2, 0, 2, 0, ", }]   republican    republican            237437             209828919

In the result returned by Dask, I want political_phrases in the third row to hold the array value [{TRUMP2020}] rather than a list of single characters.
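The third row's political_phrases value looks exactly like what Python produces when list() is applied to a string. A quick check, assuming (from the characters shown) that the underlying value is the Postgres array literal '{"TRUMP 2020"}' arriving as plain text:

```python
# Assumption: the stored value reaches Python as this plain str,
# i.e. the text form of a Postgres text[] literal.
raw = '{"TRUMP 2020"}'

# Iterating over a str yields its characters one by one, which matches the
# [{, ", T, R, U, M, P,  , 2, 0, 2, 0, ", }] shown in the third row.
chars = list(raw)
print(chars)
```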

How do I specify meta correctly? Following https://github.com/dask/dask/issues/4723, I tried:

meta3={'level_0': [1], 'reshared_message_id': [1], 'resharing_message_id': [1], 'resharing_user_id': [1], 'index': [1], 'original_message_id': [1], 'political_link': ["Text"], 'link_slant': ["Text"], 'political_phrases': [["Text", "secondText"]], 'phrase_slant': ["Text"], 'slant_overall': ["Text"], 'original_user_id': [1], 'intermediate_user_id': [1], 'intermediate_message_id': [1]}
meta4 = pd.DataFrame(meta3)[0:0]

and, following "Create Empty Dataframe in Pandas specifying column types":

dtypes=np.dtype([('level_0',int), ('reshared_message_id',int), ('resharing_message_id',int) , ('resharing_user_id',int) , ('index',int) , ('original_message_id',int) , ('political_link',str), ('link_slant',str) ,('political_phrases', np.str_), ('phrase_slant', str), ('slant_overall', str), ('original_user_id',int), ('intermediate_user_id',int) , ('intermediate_message_id',int),])
meta4 = np.empty(0, dtype=dtypes)

I then passed these values (or variations of them) as meta:

dask.dataframe.read_sql_table(includes_retweets_table, db_uri, meta=meta4, index_col='intermediate_message_id', npartitions=12)
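For comparison, here is a sketch of a meta frame built on the assumption that read_sql_table applies index_col before checking meta, so that intermediate_message_id is the named index of each chunk rather than one of its columns. The dtypes mirror the structure Dask printed without meta; the variable names are illustrative, and the frame has not been tested against the actual table.

```python
import pandas as pd

# Hypothetical meta for
#   dd.read_sql_table(..., index_col="intermediate_message_id", meta=meta)
# intermediate_message_id is deliberately NOT listed here: it becomes the index.
column_dtypes = {
    "level_0": "int64",
    "reshared_message_id": "int64",
    "resharing_message_id": "int64",
    "resharing_user_id": "int64",
    "index": "int64",
    "original_message_id": "int64",
    "political_link": "object",
    "link_slant": "object",
    "political_phrases": "object",  # pandas stores arrays/lists as object
    "phrase_slant": "object",
    "slant_overall": "object",
    "original_user_id": "int64",
    "intermediate_user_id": "int64",
}

# Start from an empty frame with the right columns, then set each dtype.
meta = pd.DataFrame(columns=list(column_dtypes)).astype(column_dtypes)

# Give the empty index the same name (and integer dtype) as index_col.
meta.index = pd.Index([], dtype="int64", name="intermediate_message_id")
```

The resulting frame would then be passed as meta=meta together with index_col='intermediate_message_id' in the same read_sql_table call.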

When I print the result of dask.dataframe.read_sql_table(), I get:

Dask DataFrame Structure:
               level_0 reshared_message_id resharing_message_id resharing_user_id  index original_message_id political_link link_slant political_phrases phrase_slant slant_overall original_user_id intermediate_user_id intermediate_message_id
npartitions=12                                                                                                                                                                                                                                   
8135.0           int64               int64                int64             int64  int64               int64         object     object            object       object        object            int64                int64                   int64
17494966.0         ...                 ...                  ...               ...    ...                 ...            ...        ...               ...          ...           ...              ...                  ...                     ...
...                ...                 ...                  ...               ...    ...                 ...            ...        ...               ...          ...           ...              ...                  ...                     ...
192363276.0        ...                 ...                  ...               ...    ...                 ...            ...        ...               ...          ...           ...              ...                  ...                     ...
209850107.0        ...                 ...                  ...               ...    ...                 ...            ...        ...               ...          ...           ...              ...                  ...                     ...

When I call .compute(), I get the following error:

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function check_meta at 0x7f8b040ca290>, (<function apply at 0x7f8b02de3830>, <function _read_sql_chunk at 0x7f8b041dc0e0>, [<sqlalchemy.sql.selectable.Select at 0x7f8b04353710; Select object>, 'postgresql+psycopg2://postgres:password@localhost:5432/stocktwits', Empty DataFrame
Columns: [level_0, reshared_message_id, resharing_message_id, resharing_user_id, index, original_message_id, political_link, link_slant, political_phrases, phrase_slant, slant_overall, original_user_id, intermediate_user_id, intermediate_message_id]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'intermediate_message_id']])), Empty DataFrame
Columns: [level_0, reshared_message_id, resharing_message_id, resharing_user_id, index, original_message_id, political_link, link_slant, political_phrases, phrase_slant, slant_overall, original_user_id, intermediate_user_id, intermediate_message_id]
Index: [], 'from_delayed'))
kwargs:    {}
Exception: KeyError('Only a column name can be used for the key in a dtype mappings argument.')

Traceback (most recent call last):
  File "interpreterScript.py", line 359, in <module>
    main()
  File "interpreterScript.py", line 162, in main
    print(includes_retweets.compute())
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 2682, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1982, in gather
    asynchronous=asynchronous,
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 832, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1841, in _gather
    raise exception.with_traceback(traceback)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/utils.py", line 31, in apply
    return func(*args, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/dask/dataframe/io/sql.py", line 216, in _read_sql_chunk
    return df.astype(meta.dtypes.to_dict(), copy=False)
  File "/opt/anaconda3/lib/python3.7/site-packages/pandas/core/generic.py", line 5857, in astype
    "Only a column name can be used for the "
KeyError: 'Only a column name can be used for the key in a dtype mappings argument.'

This error doesn't make sense to me, because I include every column name from the includes_retweets table with the correct data type.
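One explanation consistent with the traceback: the failing line is df.astype(meta.dtypes.to_dict(), copy=False) in _read_sql_chunk, and each chunk has already been read with index_col='intermediate_message_id', so that name refers to the index, not a column. If meta also lists intermediate_message_id as a column, pandas rejects the mapping. The toy frame below (hypothetical data modeled on the printed rows) reproduces the same KeyError with pandas alone:

```python
import pandas as pd

# A chunk shaped like one read with index_col="intermediate_message_id":
# that column has become the index and is no longer an ordinary column.
chunk = pd.DataFrame(
    {"level_0": [0, 1], "political_link": ["news.yahoo.com", None]},
    index=pd.Index([8135, 8135], name="intermediate_message_id"),
)

# A dtype mapping that (incorrectly) still includes the index column.
bad_dtypes = {
    "level_0": "int64",
    "political_link": "object",
    "intermediate_message_id": "int64",
}

error = None
try:
    chunk.astype(bad_dtypes)
except KeyError as exc:
    error = exc  # "Only a column name can be used for the key ..."

# Dropping the index column from the mapping lets astype succeed.
good_dtypes = {k: v for k, v in bad_dtypes.items() if k != "intermediate_message_id"}
fixed = chunk.astype(good_dtypes)
```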

1 Answer

Dask DataFrame uses pandas for data types. pandas uses the Python object type for anything complex, so you want an entry like {"political_phrases": object}.
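To illustrate the point about object: in pandas, a cell holding a Python list and a cell holding a string both live in an object column. The dtype in meta therefore describes storage only; it cannot by itself turn a string returned by the database into a list. A minimal check:

```python
import pandas as pd

# The same array value, once as a Python list and once as its text form.
as_list = pd.Series([["TRUMP 2020"]], dtype=object)
as_text = pd.Series(['{"TRUMP 2020"}'], dtype=object)

# Both report the same dtype, so {"political_phrases": object} fits either one.
print(as_list.dtype, as_text.dtype)

# The cell contents differ: one is a list, the other a plain str.
print(type(as_list.iloc[0]).__name__, type(as_text.iloc[0]).__name__)
```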

1 Comment

When I use {"political_phrases": object}, Dask reads political_phrases in as a string. When I then try to cast it to ARRAY(Text) in dd.to_sql(), Postgres ends up with a value like {"\"",$,S,S,C," ",s,o,m,e," ",2,0,1,7," ",1,x,.,"\""}. As a workaround, I called dd.to_sql() without explicit data types (so the values are written as strings) and then recast the column to text[] in Postgres. Is there anything more explicit than {"political_phrases": object} to tell Dask that the column is an array?
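One possible workaround, sketched here as an assumption rather than a Dask feature: if the values arrive as Postgres array literals such as '{"TRUMP 2020"}', parse them into Python lists before writing them back. parse_pg_text_array is a hypothetical helper that handles one-dimensional text[] literals only; it does not support nested arrays and does not distinguish a NULL element from the string "NULL".

```python
import csv


def parse_pg_text_array(value):
    """Hypothetical helper: parse a 1-D Postgres text[] literal into a list.

    '{MAGA,"TRUMP 2020"}' -> ['MAGA', 'TRUMP 2020']. Nested arrays and
    NULL elements are not handled.
    """
    if value is None or isinstance(value, list):
        return value  # already missing or already a list: leave it alone
    if not (value.startswith("{") and value.endswith("}")):
        raise ValueError(f"not a Postgres array literal: {value!r}")
    inner = value[1:-1]
    if inner == "":
        return []  # '{}' is an empty array
    # Postgres separates elements with commas, quotes elements containing
    # spaces or commas with double quotes, and escapes with a backslash.
    reader = csv.reader(
        [inner], delimiter=",", quotechar='"', escapechar="\\", doublequote=False
    )
    return next(reader)


print(parse_pg_text_array('{"TRUMP 2020"}'))
```

With a Dask frame it could be applied per element, e.g. ddf["political_phrases"].map(parse_pg_text_array, meta=("political_phrases", "object")), and the resulting lists could then be written with to_sql(..., dtype={"political_phrases": ARRAY(Text)}) using sqlalchemy.dialects.postgresql.ARRAY and sqlalchemy.Text. That combination is untested here.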
