I have written a piece of code that uses multiprocessing. It connects to Cassandra and runs 32 queries to fetch data. I have tried to parallelize the fetch using Python's multiprocessing library. The code looks like this:

    import json
    import multiprocessing as mp
    from functools import reduce

    from cassandra.cluster import Cluster
    cluster = Cluster(['xyz'])
    session = cluster.connect()

    query = session.prepare('SELECT stuff')
    session.default_timeout = 600000
    session.default_fetch_size = 100
    queries = [
        session.execute_async(query, ['2021-10-19'] + [i])
        for i in range(32)
    ]
    pool = mp.Pool(32)
    inter_obj = pool.map_async(compute, queries)
    inter_obj.wait()
    res = inter_obj.get()

    pool.close()
    pool.join()
    final_response = reduce(aggregate, res)
    resp = json.dumps(final_response, sort_keys=True, indent=4).encode("utf-8")
    print("RESPONSE", resp)

When I run the program, it fails with the following error (the traceback points at inter_obj.get()):

    Traceback (most recent call last):
      File "/usr/local/bin/date-run", line 8, in <module>
        sys.exit(main())
      File "/usr/local/lib/python3.8/dist-packages/sc_eol/run_stuff.py", line 75, in main
        res = inter_obj.get()
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 768, in get
        raise self._value
      File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
        put(task)
      File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    TypeError: cannot pickle '_thread.RLock' object

1 Answer

execute_async() returns a ResponseFuture object. Rather than handing those objects to a process pool, you're better off building a list of futures with:

    futures = []
    query = ...
    for ... :
        futures.append(session.execute_async(query, ...))

This approach executes the queries concurrently. You can then iterate over the results with:

    for future in futures:
        rows = future.result()
        # insert processing here

The call to result() blocks until the request has either returned a result or raised an error.
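
To illustrate both outcomes of result(), here is a minimal sketch that uses the standard library's concurrent.futures as a stand-in for the driver, so it runs without a cluster. fake_query is a hypothetical placeholder for a real query, and the driver's ResponseFuture is a different class; only the pattern of collecting futures first and calling result() afterwards is meant to carry over.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_query(shard):
    # Hypothetical stand-in for a Cassandra query; shard 3 fails on purpose.
    if shard == 3:
        raise RuntimeError(f"shard {shard} failed")
    return [shard, shard * 10]


results, errors = {}, {}
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit everything first so the calls can overlap.
    futures = {shard: executor.submit(fake_query, shard) for shard in range(5)}
    for shard, future in futures.items():
        try:
            # Blocks until this call finishes, then returns its value
            # or re-raises the exception it ended with.
            results[shard] = future.result()
        except RuntimeError as exc:
            errors[shard] = str(exc)
```

Handling the exception per future means one failed query does not throw away the rows already fetched by the others.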

For details, see the Cassandra Python driver Getting Started guide. Cheers!
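
As for the TypeError in the question: pool.map_async() pickles each argument to send it to a worker process, and the traceback shows that pickling the futures reaches a _thread.RLock, which cannot be pickled. The sketch below reproduces that failure with a hypothetical FakeFuture class (not the driver's ResponseFuture) that simply holds a lock:

```python
import pickle
import threading


class FakeFuture:
    """Hypothetical stand-in for an object that keeps a thread lock internally."""

    def __init__(self):
        self._lock = threading.RLock()


error = None
try:
    # multiprocessing does the equivalent of this for every task argument.
    pickle.dumps(FakeFuture())
except TypeError as exc:
    error = str(exc)

print(error)
```

If you still need a process pool for CPU-heavy work, one option is to call result() in the parent process first and pass the workers plain, picklable data (for example lists or dicts built from the rows) rather than the future objects.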
