I am trying to query data from Cassandra using the DataStax Python driver, and I would like the query to be as fast as possible. I was using the synchronous session.execute method but would like to switch to the asynchronous session.execute_async, which should presumably be more efficient because the caller does not have to wait for the row-processing code to complete.
When I try this approach as described in the documentation, it does not work. I am on Ubuntu 14.04 using Cassandra 2.1.3 and v2.5 of the Cassandra Python driver.
The following code is enough to reproduce the issue on my machine:
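from cassandra.cluster import Cluster

def print_row_count(rows, label):
    # Count rows explicitly so an empty result does not leave the counter undefined
    count = 0
    for row in rows:
        do_something = row  # placeholder for real per-row work
        count += 1
    print("{}: processed {} rows".format(label, count))

def print_err(reason):
    print("Error: {}".format(reason))

cluster = Cluster(['192.168.1.100'])
session = cluster.connect()
session.set_keyspace("foo")
session.default_fetch_size = 200

# Send the query without blocking, then register the success and error handlers
future_res = session.execute_async("SELECT * FROM bar LIMIT 1000;")
future_res.add_callback(print_row_count, 'Async')
future_res.add_errback(print_err)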
When I execute this code nothing gets printed.
However, when I add a blocking call to ResponseFuture.result(), both that synchronous call and the original async callbacks are executed:
block_future_res = future_res.result()
print_row_count(block_future_res, 'BlockFuture')
Output after running with these two lines added:
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
BlockFuture: processed 1000 rows
Perhaps I misunderstand how the callback system works, but I can't find anywhere in the docs that says you need to call result() first for the callbacks to execute.
Any help appreciated.
Update:
The initial question about callback behaviour was answered by @WangST. However, I also wanted to be able to page through asynchronous executions, and the following code allows this:
future = session.execute_async("SELECT * FROM metadata LIMIT 1000;")
future.add_callback(print_row_count, 'Async')
future.add_errback(print_err)
# Call this once so that the future's has_more_pages value is set
future_res = future.result()
while future.has_more_pages:
    future.start_fetching_next_page()
    future_res = future.result()
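For comparison, here is a rough sketch of the same paging done entirely from the callbacks, with the main thread waiting on a threading.Event instead of calling result() for every page. It follows the handler pattern I believe the driver's paging documentation describes, so treat it as an illustration rather than the recommended approach. The names run_paged_query and process_row are my own hypothetical helpers, not part of the driver; the only driver calls assumed are execute_async, add_callbacks, has_more_pages and start_fetching_next_page.

```python
from threading import Event


class PagedResultHandler(object):
    """Consume every page of an async query from the callbacks alone."""

    def __init__(self, future, process_row):
        self.error = None
        self.row_count = 0
        self.finished_event = Event()
        self.future = future
        self.process_row = process_row  # hypothetical per-row function
        # Register both handlers last, once all state above is in place,
        # because the callback may fire as soon as it is registered.
        self.future.add_callbacks(callback=self.handle_page,
                                  errback=self.handle_error)

    def handle_page(self, rows):
        # Invoked once per page with that page's rows.
        for row in rows:
            self.process_row(row)
            self.row_count += 1
        if self.future.has_more_pages:
            # Request the next page; this callback fires again when it arrives.
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()


def run_paged_query(session, query, process_row, timeout=None):
    """Hypothetical helper: run query, block until all pages are handled.

    Returns the number of rows processed. If the timeout elapses first,
    the count reached so far is returned.
    """
    future = session.execute_async(query)
    handler = PagedResultHandler(future, process_row)
    # Keep the calling thread alive until the last page or an error.
    handler.finished_event.wait(timeout)
    if handler.error:
        raise handler.error
    return handler.row_count
```

With a connected session this would be called as run_paged_query(session, "SELECT * FROM metadata LIMIT 1000;", some_row_function). The Event is what keeps the script from exiting before the callbacks have run, and the per-row work stays in the driver's callback thread rather than the main thread.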