
I'm attempting to load data into Cassandra using the Python driver. The fastest I've been able to get is around 6k writes/second. The CSV I'm reading from has around 1.15 million rows, which leads to an overall insertion time of around 3 minutes and 10 seconds. I really need to get this time down to 2 minutes or less to keep up with the data as it comes in.

My data consists of 1.15 million rows with 52 columns.

Currently I'm using the session.execute_async function to insert the data. Changing how many async requests I allow in flight at one time does seem to speed it up; blocking after about 5-6k requests leads to the fastest insertion rate.

I did attempt batch inserts but they were abysmally slow.

Here is my current method for inserting the data into Cassandra.

# insert data into the Cassandra table
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile
from cassandra.query import SimpleStatement

execution_profile = ExecutionProfile(request_timeout=10)
profiles = {'node1': execution_profile}
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['11.111.11.11'], 9042, auth_provider=auth_provider, execution_profiles=profiles)
session = cluster.connect()  # connect to your keyspace

# Read csv rows into Cassandra
count = 0
futures = []
with open('/massaged.csv') as f:
    next(f)  # skip the header row
    for line in f:
        # the raw CSV row is interpolated directly into the CQL text
        query = SimpleStatement("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,sfc_vis,sfc_gust,sfc_pres,sfc_hgt,sfc_tmp,sfc_snow_0Xacc,sfc_cnwat,sfc_weasd,sfc_snowc,sfc_snod,two_m_tmp,two_m_pot,two_m_spfh,two_m_dpt,two_m_rh,ten_m_ugrd,ten_m_vgrd,ten_m_wind_1hr_max,ten_m_maxuw_1hr_max,ten_m_maxvw_1hr_max,sfc_cpofp,sfc_prate,sfc_apcp_0Xacc,sfc_weasd_0Xacc,sfc_frozr_0Xacc,sfc_frzr_0Xacc,sfc_ssrun_1hr_acc,sfc_bgrun_1hr_acc,sfc_apcp_1hr_acc,sfc_weasd_1hr_acc,sfc_frozr_1hr_acc,sfc_csnow,sfc_cicep,sfc_cfrzr,sfc_crain,sfc_sfcr,sfc_fricv,sfc_shtfl,sfc_lhtfl,sfc_gflux,sfc_vgtyp,sfc_cape,sfc_cin,sfc_dswrf,sfc_dlwrf,sfc_uswrf,sfc_ulwrf,sfc_vbdsf,sfc_vddsf,sfc_hpbl) VALUES (%s)" % (line), consistency_level=ConsistencyLevel.ONE)
        futures.append(session.execute_async(query, execution_profile='node1'))
        count += 1
        if count % 5000 == 0:
            for fut in futures:
                fut.result()  # block until the pending inserts have completed
            futures = []  # reset only after every future has been waited on
            print("rows processed: " + str(count))

# Catch any remaining async requests that haven't finished
for fut in futures:
    fut.result()  # block until the remaining inserts are completed
print("rows processed: " + str(count))

I need to get my insertion time down to around 2 minutes or less (roughly 10k insertions per second). Should I be using multiprocessing to achieve this, or am I using the execute_async function incorrectly?

UPDATE

As per Alex's suggestion, I attempted to implement a prepared statement. This is what I came up with, but it seems to be significantly slower. Any thoughts on what I've done wrong?

hrrr_prepared = session.prepare("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,...,sfc_hpbl) VALUES (?, ..., ?)")

count = 0
futures = []
for row in range(0, len(data)):
    futures.append(session.execute_async(hrrr_prepared, tuple(data.iloc[row])))
    count += 1
    if count % 5000 == 0:
        for fut in futures:
            fut.result()  # block until the pending inserts have completed
        futures = []  # reset only after every future has been waited on
        print("rows processed: " + str(count))

NOTE: I put the "..." in the prepared statement for readability; the actual code does not have that.
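
For what it's worth, the driver also ships a concurrency helper, cassandra.concurrent.execute_concurrent_with_args, that throttles the number of in-flight requests for you. A minimal sketch, assuming the hrrr_prepared statement and pandas DataFrame data from above; the concurrency value is an assumption to tune, not a measured optimum:

from cassandra.concurrent import execute_concurrent_with_args

# one tuple of 52 values per row, in the same order as the column list
params = [tuple(row) for row in data.itertuples(index=False)]

# the driver keeps up to `concurrency` requests in flight at once
results = execute_concurrent_with_args(
    session, hrrr_prepared, params,
    concurrency=100, raise_on_first_error=False)

for success, result_or_exc in results:
    if not success:
        print("insert failed: " + str(result_or_exc))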

1 Answer

The big speedup should come from using prepared statements instead of SimpleStatement: a prepared statement is parsed only once (outside of the loop), and after that only the data is sent to the server, together with the query ID. With SimpleStatement, the query is parsed on every request.
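
For illustration only, a minimal sketch of the difference, using a hypothetical two-column table ks.tbl:

from cassandra.query import SimpleStatement

# SimpleStatement: the full CQL string travels and is parsed on every request
session.execute(SimpleStatement("INSERT INTO ks.tbl (id, val) VALUES (1, 'a')"))

# Prepared statement: parsed once server-side; later executions send only
# the statement ID plus the bound values
prepared = session.prepare("INSERT INTO ks.tbl (id, val) VALUES (?, ?)")
session.execute(prepared, (2, 'b'))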

Also, you can potentially improve throughput if you don't wait for all futures to complete, but instead keep some kind of "counting semaphore" that prevents you from exceeding a maximum number of in-flight requests, while letting you send a new request as soon as one of the outstanding ones finishes. I'm not a Python expert, so I can't say exactly how to do this, but you can look at the Java implementation to understand the idea.
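
In Python, one way to approximate that counting semaphore is threading.BoundedSemaphore combined with the driver's future callbacks. A sketch, assuming hrrr_prepared from the question, an iterable rows of bound-parameter tuples, and a max_in_flight value that would need tuning:

import threading

max_in_flight = 120  # assumed starting point; tune against your cluster
semaphore = threading.BoundedSemaphore(max_in_flight)

def on_success(_rows):
    semaphore.release()

def on_error(exc):
    semaphore.release()
    print("insert failed: " + str(exc))

for params in rows:
    semaphore.acquire()  # blocks only while max_in_flight requests are pending
    future = session.execute_async(hrrr_prepared, params)
    future.add_callbacks(on_success, on_error)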


3 Comments

Perhaps I'm doing it wrong, but it seems like prepared statements slow it down quite a bit. I haven't attempted the counting semaphore idea yet because I'm not quite sure how; I can try to look into that, though. I've updated my question with the prepared statement I put together.
That's really strange - usually prepared statements give a big jump in performance.
I'm sure you're correct; I think there may be something wrong with the way I'm implementing it. I'll post back here if I come up with anything.
