30

What is the proper and fastest way to read Cassandra data into pandas? I currently use the following code, but it is very slow:

import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(pd.np.nan)

Reading 1000 rows takes 1 minute, and I have a "bit more" than that. If I run the same query in DBeaver, for example, I get the whole result set (~40k rows) within a minute.

Thank you!!!

3
  • If the output of session.execute(sql_query) is a list of dicts, I'd try just df = pd.DataFrame(session.execute(sql_query)) or run pd.DataFrame on some portion of this list. Appending rows to a data frame one by one is inefficient. Commented Dec 20, 2016 at 23:43
  • The result of session.execute(sql_query) is a special <cassandra.cluster.ResultSet at 0x1b4b61d0> iterable object. Its rows can be tuples, named_tuples or dictionaries. Commented Dec 21, 2016 at 10:45
  • I see. Still, it's better to collect the rows into a list first, for example lst = []; for row in session...: lst.append(row), if nothing else works, and then build the frame in a single call: df = pd.DataFrame(lst). This way you avoid 40k costly calls to pd.DataFrame.append. Commented Dec 21, 2016 at 12:55

6 Answers 6

58

I got the answer at the official mailing list (it works perfectly):

Hi,

try to define your own pandas row factory:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

That's the way I do it, and it should be faster...

If you find a faster method, I'm interested :)

Michael
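
The factory itself can be tried without a cluster. The sketch below assumes, as the signature above suggests, that the driver hands the factory a list of column names and a list of row tuples; the column names and values here are made up for illustration. Note also that _current_rows is a private attribute, so it may change between driver versions.

```python
import pandas as pd


def pandas_factory(colnames, rows):
    """Build one DataFrame from the column names and rows of a result page."""
    return pd.DataFrame(rows, columns=colnames)


# Made-up stand-ins for what the driver would pass to the factory (assumption).
colnames = ["id", "name", "frequency"]
rows = [(1, "a", 10), (2, "b", 20), (3, "c", 30)]

df = pandas_factory(colnames, rows)
print(df.shape)  # (3, 3)
```

Setting session.default_fetch_size = None, as in the message above, turns paging off, so the single DataFrame holds the whole result. With paging left on, it would hold only the current page.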


3 Comments

This should be marked as the answer. It is neat, concise, and versatile.
Works like a charm even for the stranger Cassandra types.
Works well for reads, but can have problems when writing data back because pandas has to guess dtypes. Ex: I read an int column with many empty rows, pandas guessed float, then CQL gave an error on insert into similar table because the column was the wrong type.
17

What I do (in Python 3) is:

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
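
This presumably works because the driver's default row factory returns named tuples (an assumption about the default configuration, not something stated above), and pandas takes its column names from the fields of a named tuple. A small sketch, with a hypothetical Row type standing in for the driver's rows:

```python
from collections import namedtuple

import pandas as pd

# Hypothetical stand-in for the named tuples the driver would yield.
Row = namedtuple("Row", ["id", "name"])
fake_result = iter([Row(1, "a"), Row(2, "b")])

# list() drains the iterable once; the DataFrame is then built in one call
# instead of being grown row by row.
df = pd.DataFrame(list(fake_result))
print(list(df.columns))  # ['id', 'name']
```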


1

I used the row_factory solution for a few weeks, then hit datatype problems when trying to write the dataframe into another table with an identical structure. Pandas guessed a float dtype for an int column with many empty fields. During the write, the Cassandra driver complained about a type mismatch:

TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)

Pandas int columns can't hold NaN or None, so the best option is probably to make that column hold Python objects.

A quick hack was to tweak pandas_factory so that pandas skips dtype inference. It is not an ideal blanket policy:

def pandas_factory(colnames, rows):
    df = pd.DataFrame(rows, columns=colnames, dtype=object)
    return df
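
The inference problem and the effect of dtype=object can be reproduced without Cassandra. A small sketch, using made-up rows in which the frequency column has one missing value:

```python
import pandas as pd

# Made-up data: an integer column with a missing value.
colnames = ["id", "frequency"]
rows = [(1, 5), (2, None), (3, 7)]

# Default behaviour: pandas infers a dtype, and the missing value forces float.
inferred = pd.DataFrame(rows, columns=colnames)

# With dtype=object the original Python values are kept as they are.
as_object = pd.DataFrame(rows, columns=colnames, dtype=object)

print(inferred["frequency"].dtype)   # float64
print(as_object["frequency"].dtype)  # object
```

The object version avoids the float conversion, at the cost of losing vectorised numeric operations on every column.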

I also found that I can do: df = pandas.DataFrame(result.all()) if I don't want the row factory.

As an interim solution, I would like a robust result_to_df() function that uses result.column_types (e.g. cassandra.cqltypes.Int32Type) and makes good guesses about translating those to Python objects or NumPy types. I will edit this answer if and when I get time to write that. A pandas read_cql and to_cql pair would be ideal, but that is probably beyond my bandwidth.
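
One possible shape for such a function, as a rough sketch rather than something tested against the driver: it takes the CQL type name of each column as a plain string and maps integer types to pandas' nullable Int64 dtype, which can hold missing values. The function name comes from the paragraph above; the cql_types argument, the CQL_TO_PANDAS table, and the step of deriving type names from result.column_types are all assumptions.

```python
import pandas as pd

# Hypothetical and deliberately incomplete mapping from CQL type names
# to pandas dtypes; unknown types stay as Python objects.
CQL_TO_PANDAS = {
    "int": "Int64",
    "bigint": "Int64",
    "smallint": "Int64",
    "boolean": "boolean",
}


def result_to_df(colnames, cql_types, rows):
    """Build a DataFrame using declared column types instead of inference."""
    # Start from object columns so pandas does not guess any dtype.
    df = pd.DataFrame(rows, columns=colnames, dtype=object)
    for name, cql_type in zip(colnames, cql_types):
        target = CQL_TO_PANDAS.get(cql_type, "object")
        if target != "object":
            df[name] = df[name].astype(target)
    return df


df = result_to_df(["id", "frequency"], ["int", "int"], [(1, 5), (2, None)])
print(df["frequency"].dtype)  # Int64

# Before writing back, turn missing values into None and the rest into int.
back = [None if pd.isna(v) else int(v) for v in df["frequency"]]
```

The last line shows one way to get plain Python ints and None again, which is what the driver error above asks for on an int column.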


0

I have been working on moving data from Cassandra to MS SQL Server and used the answers given here for reference. I am able to move data, but my source table in Cassandra is huge and my query gets a timeout error from Cassandra. We cannot increase the timeout, so my only option is to select rows in batches. My code also converts the Cassandra collection data types to str, because I want to insert them into MS SQL Server and parse them afterwards. Please let me know if anyone has faced a similar issue. The code I built is given below:

import pandas as pd
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)


# Engine for the target SQL Server database
engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'],
    auth_provider=PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace', wait_for_all_pools=True)
session.row_factory = pandas_factory

query = "SELECT * FROM cassandratable"
# fetch_size makes the driver return the table in pages of 5000 rows
statement = SimpleStatement(query, fetch_size=5000)
rows = session.execute(statement)

# Collection columns are stored as strings so they can be parsed later
collection_cols = [
    'attributes', 'attributesgenerated', 'components',
    'distributioncenterinfo', 'images', 'itemcustomerzonezoneproductids',
    'itempodconfigids', 'keywords', 'validationmessages', 'zones',
]

while True:
    df = rows._current_rows  # DataFrame for the current page only
    df[collection_cols] = df[collection_cols].astype(str)
    df.to_sql(
        name='mssql_table_name',
        con=engine,
        index=False,
        if_exists='append',
        chunksize=1
    )
    if not rows.has_more_pages:
        break
    rows.fetch_next_page()  # loads the next page into _current_rows


0

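A fast way to read Cassandra data into pandas is to let the driver iterate over the pages automatically. Create a dictionary of column lists, append each row's values to it while iterating over the result (the driver fetches further pages as needed), and then create the DataFrame from this dictionary.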

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary = {"column1": [], "column2": []}

for row in session.execute(sql_query):
    # dict_factory yields each row as a dict keyed by column name
    dictionary["column1"].append(row["column1"])
    dictionary["column2"].append(row["column2"])

df = pd.DataFrame(dictionary)


0

You can simply run a loop inside the pandas DataFrame constructor to get the job done:

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
        auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
data = session.execute("SELECT * FROM <table_name>;")

df = pd.DataFrame([d for d in data])
