
I am trying to query data from Cassandra using the DataStax Python driver, and I would like the query to be as fast as possible. I was using the synchronous session.execute method but would like to switch to the asynchronous session.execute_async, as it will presumably be more efficient by not having to wait for the row-processing code to complete.

When I try this approach as described in the documentation, it does not work. I am on Ubuntu 14.04 using Cassandra 2.1.3 and v2.5 of the Cassandra Python driver.

The following code is enough to reproduce the issue on my machine:

from cassandra.cluster import Cluster

def print_row_count(rows, label):
    count = 0
    for row in rows:
        count += 1  # real per-row processing would go here
    print("{}: processed {} rows".format(label, count))

def print_err(reason):
    print("Error: {}".format(reason))

cluster = Cluster(['192.168.1.100'])
session = cluster.connect()
session.set_keyspace("foo")
session.default_fetch_size = 200
future_res = session.execute_async("SELECT * FROM bar LIMIT 1000;")
future_res.add_callback(print_row_count, 'Async')
future_res.add_errback(print_err)

When I execute this code nothing gets printed.

However, when I add a blocking call to ResponseFuture.result(), both that synchronous call and the original async callbacks are executed:

block_future_res = future_res.result()
print_row_count(block_future_res, 'BlockFuture') 

Output after running with these two lines added:

Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
Async: processed 200 rows
BlockFuture: processed 1000 rows

Perhaps I misunderstand how the callback system works, but I can't find anywhere in the docs that says you need to call result() first for the callbacks to execute.

Any help appreciated.

Update:

The initial question about callback behaviour is answered by @WangST. However, I also wanted to be able to page through the results of execute_async, and the following code allows this:

future = session.execute_async("SELECT * FROM metadata LIMIT 1000;")
future.add_callback(print_row_count, 'Async')
future.add_errback(print_err)

# Block on the first page once so that future.has_more_pages is set;
# the callback registered above fires for this page
future.result()
while future.has_more_pages:
    # Request the next page; the same callback fires again when it arrives
    future.start_fetching_next_page()
    future.result()
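The loop above still blocks on result() for every page. A non-blocking alternative, sketched here under assumptions, is to request the next page from inside the callback and let the main thread wait on a threading.Event until the last page has been handled; the driver's paging documentation describes a similar callback-driven pattern, so check it against your driver version. To keep the sketch runnable without a cluster, FakePagedFuture is a hypothetical stand-in that mimics only the parts of ResponseFuture used here (add_callbacks, has_more_pages, start_fetching_next_page). Like the driver, it keeps registered callbacks attached across pages so they fire once per page, which is consistent with the five "Async" lines in the output above.

```python
import threading


class FakePagedFuture(object):
    """Hypothetical stand-in for the driver's ResponseFuture.

    Serves `rows` in pages of `fetch_size` and calls every registered
    callback once per page on a background daemon thread. It never fails,
    so registered errbacks are stored but never called.
    """

    def __init__(self, rows, fetch_size):
        self._rows = rows
        self._fetch_size = fetch_size
        self._offset = 0
        self._callbacks = []
        self._errbacks = []
        self.has_more_pages = False

    def add_callbacks(self, callback, errback):
        self._callbacks.append(callback)
        self._errbacks.append(errback)
        self._deliver_page()  # simplification: first page arrives once callbacks exist

    def start_fetching_next_page(self):
        if not self.has_more_pages:
            raise RuntimeError("no more pages")
        self._deliver_page()

    def _deliver_page(self):
        page = self._rows[self._offset:self._offset + self._fetch_size]
        self._offset += len(page)
        self.has_more_pages = self._offset < len(self._rows)

        def run():
            for callback in self._callbacks:
                callback(page)

        t = threading.Thread(target=run)
        t.daemon = True
        t.start()


class PagedResultHandler(object):
    """Processes every page of `future`, then sets `finished_event`."""

    def __init__(self, future):
        self.error = None
        self.row_count = 0
        self.page_count = 0
        self.finished_event = threading.Event()
        self.future = future
        self.future.add_callbacks(callback=self.handle_page,
                                  errback=self.handle_error)

    def handle_page(self, rows):
        self.page_count += 1
        for row in rows:
            self.row_count += 1  # real per-row processing would go here
        if self.future.has_more_pages:
            # Request the next page; handle_page runs again when it arrives.
            self.future.start_fetching_next_page()
        else:
            self.finished_event.set()

    def handle_error(self, exc):
        self.error = exc
        self.finished_event.set()


# With a real session this would be session.execute_async("SELECT ...").
future = FakePagedFuture(list(range(1000)), fetch_size=200)
handler = PagedResultHandler(future)
handler.finished_event.wait(timeout=5)
if handler.error:
    raise handler.error
print("processed {} rows in {} pages".format(handler.row_count, handler.page_count))
# → processed 1000 rows in 5 pages
```

Because each page is requested only after the previous one has been processed, pages are handled strictly in order, and the main thread does nothing but wait for the final signal.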

Answer:


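Without:

block_future_res = future_res.result()

the main thread has nothing left to do and exits before the execute_async() query finishes, so nothing is printed. The callbacks are run by the driver's own I/O thread when the response arrives, not by the main thread.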
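A minimal sketch of the usual fix when you don't want to block on result(): let the callback signal a threading.Event and make the main thread wait on it. So that it runs without a cluster, fake_execute_async is a hypothetical stand-in for session.execute_async(...) plus add_callback(...); it runs the callback on a daemon thread, which (as an assumption about the driver's background thread) does not keep the process alive on its own. With a real session you would keep the same Event logic around the real future.

```python
import threading
import time

def fake_execute_async(rows, callback):
    """Hypothetical stand-in for execute_async + add_callback:
    delivers `rows` to `callback` on a background daemon thread."""
    def worker():
        time.sleep(0.1)  # simulate waiting for the server's response
        callback(rows)
    t = threading.Thread(target=worker)
    t.daemon = True      # a daemon thread does not keep the interpreter alive
    t.start()

done = threading.Event()
seen = []

def print_row_count(rows, label="Async"):
    seen.extend(rows)
    print("{}: processed {} rows".format(label, len(rows)))
    done.set()           # signal the main thread that the callback has run

fake_execute_async(list(range(200)), print_row_count)

# Without this wait the script would reach its end and the interpreter
# could exit before print_row_count ever runs, so nothing would be printed.
done.wait(timeout=5)
```

With the real driver you would also register an errback that sets the same Event; otherwise a failed query leaves the main thread waiting until the timeout.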


Comments:

OK, it makes sense that the main thread exits, because those callbacks run on another thread. I'm not sure why "Async: processed 200 rows" is printed only once when I add just the line block_future_res = future_res.result(), but five times when I also add the next line, print_row_count(block_future_res, 'BlockFuture'). This tells me that either I should just use a PagedResult, or I am still missing how to use execute_async properly with multiple pages of results.
What I'm really saying is that I expected there would be a way to page through results using execute_async. Or should I stick to execute and use the PagedResult class to handle paging? Thanks
The LIMIT 1000 clause limits the result to 1000 rows. If you want multiple pages of results, maybe you can start another async query from the callback of the previous one, e.g. future_res.add_callback(lambda rows: session.execute_async("...")). The queries then form a chain: each one continues only if the previous one succeeded; otherwise print_err is executed.
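A minimal sketch of that chaining idea, under assumptions: each query's success callback starts the next query, so a failure anywhere stops the chain and goes to the error handler. add_callback expects a callable, which is why the next query is started from inside a function rather than passed in directly. To stay runnable without a cluster, fake_query is a hypothetical stand-in for session.execute_async whose returned object supports add_callbacks(callback, errback) and runs them on a daemon thread; run_chain is likewise a hypothetical helper.

```python
import threading

def fake_query(name, fail=False):
    """Hypothetical stand-in for session.execute_async(name).

    Returns an object with add_callbacks(callback, errback) whose
    callback or errback runs on a background daemon thread.
    """
    class _FakeFuture(object):
        def add_callbacks(self, callback, errback):
            def run():
                if fail:
                    errback(RuntimeError("{} failed".format(name)))
                else:
                    callback(["row from {}".format(name)])
            t = threading.Thread(target=run)
            t.daemon = True
            t.start()
    return _FakeFuture()

log = []
done = threading.Event()

def on_error(exc):
    log.append("error: {}".format(exc))
    done.set()

def run_chain(queries):
    """Run `queries` one after another, starting each from the previous callback."""
    def step(index):
        def on_success(rows):
            log.append("{}: {} row(s)".format(queries[index][0], len(rows)))
            if index + 1 < len(queries):
                step(index + 1)  # chain the next query only on success
            else:
                done.set()
        name, fail = queries[index]
        fake_query(name, fail).add_callbacks(on_success, on_error)
    step(0)

run_chain([("q1", False), ("q2", True), ("q3", False)])
done.wait(timeout=5)
print(log)  # → ['q1: 1 row(s)', 'error: q2 failed']  (q3 never runs)
```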
