I'm attempting to load data into Cassandra using the Python driver. The fastest I've been able to get is around 6k writes/second. The CSV I'm reading from has around 1.15 million rows, leading to an overall insertion time of around 3 minutes and 10 seconds. I really need to get this time down to 2 minutes or less to keep up with the data as it comes in.
My data consists of 1.15 million rows with 52 columns.
Currently I'm using the session.execute_async function to insert the data. Changing how many async requests I allow in flight at one time does seem to speed it up; blocking after about 5-6k requests leads to the fastest insertion rate.
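For context, the "block after 5-6k requests" idea can also be expressed as a sliding window, where a semaphore caps the number of in-flight execute_async calls and each completion frees a slot. This is only a sketch of that pattern (the helper name and limit here are mine, not from my actual code):

import threading

# Sketch: cap the number of in-flight execute_async() requests with a semaphore,
# releasing a slot from the driver's completion callbacks instead of draining
# the whole list of futures at once.
MAX_IN_FLIGHT = 5000
in_flight = threading.Semaphore(MAX_IN_FLIGHT)

def _release(_result_or_exc):
    in_flight.release()

def throttled_insert(session, statement, params=None):
    in_flight.acquire()  # blocks once MAX_IN_FLIGHT requests are pending
    future = session.execute_async(statement, params)
    future.add_callbacks(callback=_release, errback=_release)
    return future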
I did attempt batch inserts but they were abysmally slow.
Here is my current method for inserting the data into Cassandra.
# insert data into cassandra table
from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile
from cassandra.query import SimpleStatement

execution_profile = ExecutionProfile(request_timeout=10)
profiles = {'node1': execution_profile}
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['11.111.11.11'], 9042, auth_provider=auth_provider, execution_profiles=profiles)
session = cluster.connect()  # queries below use the fully qualified table name

# Read csv rows into cassandra
count = 0
futures = []
with open('/massaged.csv') as f:
    next(f)  # skip the header row
    for line in f:
        # the raw CSV line is spliced directly into the VALUES clause
        query = SimpleStatement("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,sfc_vis,sfc_gust,sfc_pres,sfc_hgt,sfc_tmp,sfc_snow_0Xacc,sfc_cnwat,sfc_weasd,sfc_snowc,sfc_snod,two_m_tmp,two_m_pot,two_m_spfh,two_m_dpt,two_m_rh,ten_m_ugrd,ten_m_vgrd,ten_m_wind_1hr_max,ten_m_maxuw_1hr_max,ten_m_maxvw_1hr_max,sfc_cpofp,sfc_prate,sfc_apcp_0Xacc,sfc_weasd_0Xacc,sfc_frozr_0Xacc,sfc_frzr_0Xacc,sfc_ssrun_1hr_acc,sfc_bgrun_1hr_acc,sfc_apcp_1hr_acc,sfc_weasd_1hr_acc,sfc_frozr_1hr_acc,sfc_csnow,sfc_cicep,sfc_cfrzr,sfc_crain,sfc_sfcr,sfc_fricv,sfc_shtfl,sfc_lhtfl,sfc_gflux,sfc_vgtyp,sfc_cape,sfc_cin,sfc_dswrf,sfc_dlwrf,sfc_uswrf,sfc_ulwrf,sfc_vbdsf,sfc_vddsf,sfc_hpbl) VALUES (%s)" % line, consistency_level=ConsistencyLevel.ONE)
        futures.append(session.execute_async(query, execution_profile='node1'))
        count += 1
        if count % 5000 == 0:
            for future in futures:
                future.result()  # blocks until the pending inserts are completed
            futures = []
            print("rows processed: " + str(count))

# Catch any remaining async requests that haven't finished
for future in futures:
    future.result()  # blocks until the remaining inserts are completed
print("rows processed: " + str(count))
I need to get my insertion time down to around 2 minutes or less (roughly 10K insertions per second). Should I be using multiprocessing to achieve this, or am I using the execute_async function incorrectly?
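For reference, I'm also aware the driver ships a concurrency helper, execute_concurrent_with_args, which I understand wraps this same throttled execute_async pattern. A rough sketch of how I'd call it (untested; insert_stmt and params are assumed to already hold a prepared INSERT and one value tuple per CSV row):

from cassandra.concurrent import execute_concurrent_with_args

# Sketch: let the driver manage the in-flight request window itself.
# `insert_stmt` is a prepared INSERT and `params` is a list of value tuples,
# one per CSV row (both assumed to exist already).
results = execute_concurrent_with_args(
    session, insert_stmt, params, concurrency=100, raise_on_first_error=False)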
UPDATE
As per Alex's suggestion, I attempted to implement a prepared statement. This is what I came up with, but it seems to be significantly slower. Any thoughts on what I've done wrong?
hrrr_prepared = session.prepare("INSERT INTO hrrr.hrrr_18hr( loc_id,utc,...,sfc_hpbl) VALUES (?, ..., ?)")

count = 0
futures = []
for row in range(0, len(data)):  # data holds the CSV rows (pandas DataFrame)
    futures.append(session.execute_async(hrrr_prepared, tuple(data.iloc[row])))
    count += 1
    if count % 5000 == 0:
        for future in futures:
            future.result()  # blocks until the pending inserts are completed
        futures = []
        print("rows processed: " + str(count))
NOTE: I put the "..." in the prepared statement for readability; the actual code does not have that.
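To show what I mean by binding, here is the shape of a variant I'm considering that binds the prepared statement straight from the CSV reader instead of going through data.iloc on every row. It's a sketch only: it assumes the CSV column order matches the INSERT, and since csv.reader yields strings, the non-text columns would still need converting to the right Python types before binding.

import csv

futures = []
with open('/massaged.csv', newline='') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for i, row in enumerate(reader, start=1):
        # NOTE: non-text columns would need converting from str before binding
        futures.append(session.execute_async(hrrr_prepared, tuple(row)))
        if i % 5000 == 0:
            for future in futures:
                future.result()  # blocks until the pending inserts are completed
            futures = []
for future in futures:
    future.result()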