
To set the context: we have 4 tables in Cassandra. Out of those 4, one is the data table and the remaining are search tables (let's assume DATA, SEARCH1, SEARCH2 and SEARCH3 are the tables).

We have an initial-load requirement of up to 15k rows in one request for the DATA table, and hence for the search tables to keep them in sync. We do it with batch inserts, each batch containing 4 queries (one to each table) to keep consistency.

But for every batch we need to read the data first: if the row exists, we update only the DATA table's lastUpdatedDate column; otherwise we insert into all 4 tables.

Below is a code snippet showing how we are doing it:

public List<Item> loadData(List<Item> items) {
    CountDownLatch latch = new CountDownLatch(items.size());
    ForkJoinPool pool = new ForkJoinPool(6);
    pool.submit(() -> items.parallelStream().forEach(item -> {
      BatchStatement batch = prepareBatchForCreateOrUpdate(item);
      batch.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
      ResultSetFuture future = getSession().executeAsync(batch);
      Futures.addCallback(future, new AsyncCallBack(latch), pool);
    }));

    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }

    // TODO: decide what to do with the failed items - retry, or remove them from the returned list
    return items;
}

private BatchStatement prepareBatchForCreateOrUpdate(Item item) {
    BatchStatement batch = new BatchStatement();
    Item existingItem = getExisting(item); // synchronous read
    if (null != existingItem) {
      // row already exists: touch only DATA.lastUpdatedDate
      existingItem.setLastUpdatedDateTime(new Timestamp(System.currentTimeMillis()));
      batch.add(existingItem);
      return batch;
    }

    // new row: insert into the data table and all three search tables
    batch.add(item);
    batch.add(convertItemToSearch1(item));
    batch.add(convertItemToSearch2(item));
    batch.add(convertItemToSearch3(item));

    return batch;
  }

class AsyncCallBack implements FutureCallback<ResultSet> {
    private final CountDownLatch latch;

    AsyncCallBack(CountDownLatch latch) {
      this.latch = latch;
    }

    // Count down the latch on both success and failure so that the thread
    // waiting on latch.await() knows when all the async calls have completed.
    @Override
    public void onSuccess(ResultSet result) {
      latch.countDown();
    }

    @Override
    public void onFailure(Throwable t) {
      LOGGER.warn("Failed async query execution, cause: {}: {}", t.getCause(), t.getMessage());
      latch.countDown();
    }
  }

The execution takes about 1.5 to 2 minutes for 15k items, considering the network round trip between the application and the Cassandra cluster (both reside on the same DNS but in different pods on Kubernetes).

We have ideas to make even the read call getExisting(item) async, but handling the failure cases is becoming complex. Is there a better approach to data loads for Cassandra (considering only async writes through the DataStax Enterprise Java driver)?

1 Answer


First thing: batches in Cassandra are a different thing than in relational DBs, and by using them you're putting more load on the cluster.

Regarding making everything async, I thought about the following possibility:

  1. make a query to the DB, obtain a Future, and add a listener to it that will be executed when the query finishes (override onSuccess);
  2. from that method, schedule execution of the next actions based on the result obtained from Cassandra (see the sketch after this list).
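
For illustration, a minimal sketch of that chaining with the driver 3.x API and Guava futures could look like this. The helpers selectExisting, updateLastUpdated and insertAllTables are hypothetical stand-ins for your existing statement-building code, not part of any API:

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.*;
import java.util.concurrent.Executor;

ListenableFuture<ResultSet> loadOneAsync(Session session, Item item, Executor executor) {
    // 1. issue the read asynchronously
    ResultSetFuture readFuture = session.executeAsync(selectExisting(item));
    // 2. once the read completes, schedule the matching write based on its result
    return Futures.transformAsync(readFuture, rs -> {
        Row existing = rs.one();
        Statement write = (existing != null)
            ? updateLastUpdated(item)   // row exists: touch only DATA.lastUpdatedDate
            : insertAllTables(item);    // new row: batch insert into all 4 tables
        return session.executeAsync(write);
    }, executor);
}

This way the read never blocks a thread, and the conditional write is scheduled only when the read result is actually available.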

One thing you need to make sure to check is that you don't issue too many simultaneous requests. In version 3 of the protocol, you can have up to 32k in-flight requests per connection, but in your case you may issue up to 60k (4x15k) requests. I'm using a wrapper around the Session class to limit the number of in-flight requests.
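
The original wrapper isn't reproduced here; a minimal sketch of the idea, assuming a Semaphore-based permit scheme on driver 3.x (the class name SessionLimiter echoes the comments below, and maxInFlight is illustrative), might look like this:

import com.datastax.driver.core.*;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Semaphore;

public class SessionLimiter {
    private final Session session;
    private final Semaphore semaphore;

    public SessionLimiter(Session session, int maxInFlight) {
        this.session = session;
        this.semaphore = new Semaphore(maxInFlight);
    }

    public ResultSetFuture executeAsync(Statement statement) throws InterruptedException {
        semaphore.acquire();  // block until a permit is available
        ResultSetFuture future = session.executeAsync(statement);
        // release the permit when the request completes, successfully or not
        future.addListener(semaphore::release, MoreExecutors.directExecutor());
        return future;
    }
}

With something like this, the caller swaps session.executeAsync(batch) for limiter.executeAsync(batch) and the number of concurrent requests stays bounded.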


6 Comments

Thanks for the answer Alex. Yes, batches in Cassandra are a different thing; that's why we are using individual batches for the 15k records, each batch with one query per table to keep the data in sync. I like your Session throttling example. :)
I realize this comment is late. However, assuming your inserts are idempotent, I would do individual async writes with QUORUM/ALL consistency and retry on failure. This should give you an acceptable degree of fault-tolerance as far as data consistency is concerned.
Yes. Idempotent queries + retries are useful
On a side note, the SessionLimiter code was nice -- we had to deal with the problem of limiting in-flight messages in our Cassandra cluster, which we solved via batching (primitive, but effective)
@user1694845 The main problem with batches is that they are very slow if the statements aren't in the same partition. SessionLimiter has the problem that it's not optimal: it doesn't know about the limit per connection, but throttles per cluster. Java driver 4.x has built-in throttling...
