To set the context: we have four tables in Cassandra. One is the data table and the other three are search tables (let's call them DATA, SEARCH1, SEARCH2 and SEARCH3).
We have an initial-load requirement of up to 15k rows per request for the DATA table, and the search tables must be kept in sync with it. We write with batch inserts, each batch containing four queries (one per table), to keep the tables consistent.
For every batch, however, we first need to read the existing data. If the row already exists, we update only the DATA table's lastUpdatedDate column; otherwise we insert into all four tables.
Below is a code snippet showing how we are doing it:
public List<Item> loadData(List<Item> items) {
CountDownLatch latch = new CountDownLatch(items.size());
ForkJoinPool pool = new ForkJoinPool(6);
pool.submit(() -> items.parallelStream().forEach(item -> {
BatchStatement batch = prepareBatchForCreateOrUpdate(item);
batch.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
ResultSetFuture future = getSession().executeAsync(batch);
Futures.addCallback(future, new AsyncCallBack(latch), pool);
}));
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
//TODO Consider what to do with the failed Items, Retry? or remove from the items in the return type
return items;
}
private BatchStatement prepareBatchForCreateOrUpdate(Item item) {
BatchStatement batch = new BatchStatement();
Item existingItem = getExisting(item); // synchronous read
if (null != existingItem) {
existingItem.setLastUpdatedDateTime(new Timestamp(System.currentTimeMillis()));
batch.add(existingItem);
return batch;
}
batch.add(item);
batch.add(convertItemToSearch1(item));
batch.add(convertItemToSearch2(item));
batch.add(convertItemToSearch3(item));
return batch;
}
class AsyncCallBack implements FutureCallback<ResultSet> {
private CountDownLatch latch;
AsyncCallBack(CountDownLatch latch) {
this.latch = latch;
}
// Count down the latch on either success or failure so that the thread waiting on latch.await() knows when all the async calls have completed.
@Override
public void onSuccess(ResultSet result) {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
LOGGER.warn("Failed async query execution, Cause:{}:{}", t.getCause(), t.getMessage());
latch.countDown();
}
}
The execution takes about 1.5 to 2 minutes for 15k items, including the network round trip between the application and the Cassandra cluster (both reside on the same DNS but in different pods on Kubernetes).
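As a rough sanity check on those numbers, here is a back-of-envelope calculation. It assumes the six ForkJoinPool workers are the bottleneck, since each one blocks on the synchronous getExisting read before it can submit its batch. The class and method names are hypothetical and only illustrate the arithmetic.

```java
class ThroughputEstimate {
    // Average wall-clock time each worker spends per item,
    // assuming the items are spread evenly across the workers.
    static double perItemMillis(int items, int workers, double totalSeconds) {
        double itemsPerWorker = (double) items / workers;
        return totalSeconds * 1000.0 / itemsPerWorker;
    }

    public static void main(String[] args) {
        // 15k items, 6 workers, 90 s to 120 s total (figures from the post).
        System.out.printf("%.0f ms to %.0f ms per item%n",
                perItemMillis(15_000, 6, 90), perItemMillis(15_000, 6, 120));
    }
}
```

Under that assumption each item costs a worker roughly 36 to 48 ms, which would suggest the blocking read, rather than the async write, sets the pace.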
We have considered making the read call getExisting(item) async as well, but handling the failure cases becomes complex. Is there a better approach for data loads into Cassandra (considering only async writes through the DataStax Enterprise Java driver)?
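To make the async-read idea concrete, here is a minimal sketch of one possible shape: chain the read into the write, cap the number of in-flight requests, and collect the ids that failed so they can be retried or reported. It is not driver code. existsAsync, writeAsync, the in-memory DATA map and the "bad" id prefix are all hypothetical stand-ins for the real executeAsync calls, and CompletableFuture is used in place of the driver's ResultSetFuture purely to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

class AsyncLoadSketch {
    // Hypothetical in-memory stand-in for the DATA table: item id -> lastUpdated millis.
    static final Map<String, Long> DATA = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous existence check (an async version of getExisting).
    static CompletableFuture<Boolean> existsAsync(String id) {
        return CompletableFuture.supplyAsync(() -> DATA.containsKey(id));
    }

    // Stand-in for the asynchronous batch write. Ids starting with "bad" fail on purpose.
    static CompletableFuture<Void> writeAsync(String id, boolean exists) {
        return CompletableFuture.runAsync(() -> {
            if (id.startsWith("bad")) {
                throw new IllegalStateException("simulated write failure for " + id);
            }
            // Real code would update only DATA when exists is true,
            // and insert into all four tables otherwise.
            DATA.put(id, System.currentTimeMillis());
        });
    }

    // Returns the ids whose read or write failed, so the caller can retry or report them.
    static List<String> load(List<String> ids, int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String id : ids) {
            permits.acquireUninterruptibly(); // cap the number of in-flight requests
            CompletableFuture<Void> done = existsAsync(id)
                    .thenCompose(exists -> writeAsync(id, exists))
                    .handle((ignored, error) -> {
                        if (error != null) {
                            failed.add(id); // a read or write failure ends up here
                        }
                        permits.release();
                        return (Void) null;
                    });
            pending.add(done);
        }
        // Wait for every chain to finish; failures were already absorbed by handle().
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) {
        List<String> failed = load(Arrays.asList("a", "b", "bad-1", "c"), 2);
        System.out.println("failed items: " + failed);
    }
}
```

Because the read and the write share one chain, a failure in either step lands in the same handle callback, so there is a single place to record the item. The semaphore replaces the fixed thread pool as the throttle, which means no thread sits blocked on a read. Whether this beats the current approach on a real cluster would need to be measured.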