I have an application that reads from four Cassandra tables. Each table is in a different cluster, so I create four sessions with the DataStax Java driver:
<dependency>
    <groupId>com.datastax.oss</groupId>
    <artifactId>java-driver-core</artifactId>
    <version>4.17.0</version>
</dependency>
I do single-row fetches (point queries) on the first table and use pagination on the other three tables. The result of the query on the second table is used in the queries on the third and fourth tables.
Table 4's primary key is (id, store): id is the partition key and store is the clustering key.
The query on Table 3 returns a set of stores.
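A minimal sketch of that data flow using plain `CompletableFuture`s, assuming each table query is wrapped in a function that returns a `CompletionStage`. The functions `table2`, `table3Stores` and `table4Rows`, and the `QueryFlow` class, are hypothetical stand-ins for `executeAsync` calls on the respective sessions. It also assumes the Table 4 rows are filtered down to the stores returned by Table 3, which is a guess at what the combination step does. The Table 3 and Table 4 futures are both started before either is awaited, so the two clusters are queried concurrently:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class QueryFlow {
    // Table 2's result feeds both Table 3 and Table 4; those two run in parallel.
    static CompletionStage<Map<String, String>> run(
            String id,
            Function<String, CompletionStage<String>> table2,                     // hypothetical
            Function<String, CompletionStage<Set<String>>> table3Stores,          // hypothetical
            Function<String, CompletionStage<Map<String, String>>> table4Rows) {  // hypothetical: store -> row
        return table2.apply(id).thenCompose(key -> {
            // Both requests are issued here, before either result is awaited.
            CompletableFuture<Set<String>> stores = table3Stores.apply(key).toCompletableFuture();
            CompletableFuture<Map<String, String>> rows = table4Rows.apply(key).toCompletableFuture();
            // Wait for both, then combine (assumed step: keep only stores listed by Table 3).
            return stores.thenCombine(rows, QueryFlow::keepListedStores);
        });
    }

    private static Map<String, String> keepListedStores(Set<String> stores, Map<String, String> rows) {
        Map<String, String> kept = new HashMap<>();
        for (Map.Entry<String, String> e : rows.entrySet()) {
            if (stores.contains(e.getKey())) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }
}
```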
The session configuration for the 4th cluster is:
DriverConfigLoader.programmaticBuilder()
        .withLong(DefaultDriverOption.REQUEST_PAGE_SIZE, 1000)
        .withLong(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 4)
        .withLong(DefaultDriverOption.CONNECTION_MAX_REQUESTS, 8192)
        .withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, true)
        .withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, true)
        .withBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED, false)
        .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
        .withString(DefaultDriverOption.REQUEST_CONSISTENCY, DefaultConsistencyLevel.LOCAL_ONE.toString())
        .build();
The code for the 4th table looks like this:
return cqlSession.executeAsync(ps.bind(id))
        .thenCompose(rs -> {
            Map<String, Set<String>> finalMap = new HashMap<>();
            return processRows(rs, finalMap, listedStores, limoEligStartTime);
        })
        .exceptionally(ex -> {
            // do something, record metrics
            return Collections.emptyMap();
        });
The processRows function looks like this:
private CompletionStage<Map<String, Set<String>>> processRows(AsyncResultSet rs, Map<String, Set<String>> finalMap, Set<String> thirdTableResult, long resultGetTime) {
    METRICS.recordExecutionTime("LimoEligibilityQueryTime", System.currentTimeMillis() - resultGetTime);
    // Rows of the page that has already been fetched
    Iterable<Row> iterable = rs.currentPage();
    for (Row row : iterable) {
        try {
            if (row != null) {
                // do something
            } else {
                // record metrics for row not found
            }
        } catch (IOException ex) {
            // record exception
        }
    }
    if (rs.hasMorePages()) {
        return rs.fetchNextPage()
                // Recurse with the newly fetched page (asyncRs), not the original rs
                .thenCompose(asyncRs -> processRows(asyncRs, finalMap, thirdTableResult, limoEligRowsProcessingCompletionTime))
                .exceptionally(ex -> {
                    // record exception
                    return Collections.emptyMap();
                });
    } else {
        return CompletableFuture.completedFuture(finalMap);
    }
}
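The recursive paging pattern in processRows can be checked in isolation with a small model. The `Page` interface below is hypothetical: it mirrors the three `AsyncResultSet` methods used above (`currentPage()`, `hasMorePages()`, `fetchNextPage()`), and `Paging.inMemory` is a test helper, not part of the driver. The property that matters is that each recursive call receives the page that `fetchNextPage()` just completed with; recursing with the original result set instead would keep asking for the page after the first one and never reach the end.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical minimal model of the AsyncResultSet paging contract.
interface Page<T> {
    Iterable<T> currentPage();
    boolean hasMorePages();
    CompletionStage<Page<T>> fetchNextPage();
}

final class Paging {
    // Consumes the current page, then recurses on the *next* page until none remain.
    static <T> CompletionStage<List<T>> collectAll(Page<T> page, List<T> acc) {
        for (T row : page.currentPage()) {
            acc.add(row);
        }
        if (!page.hasMorePages()) {
            return CompletableFuture.completedFuture(acc);
        }
        return page.fetchNextPage().thenCompose(next -> collectAll(next, acc));
    }

    // Test helper: serves `rows` in fixed-size pages starting at `offset`.
    static <T> Page<T> inMemory(List<T> rows, int pageSize, int offset) {
        int end = Math.min(offset + pageSize, rows.size());
        return new Page<T>() {
            public Iterable<T> currentPage() { return rows.subList(offset, end); }
            public boolean hasMorePages() { return end < rows.size(); }
            public CompletionStage<Page<T>> fetchNextPage() {
                return CompletableFuture.completedFuture(inMemory(rows, pageSize, end));
            }
        };
    }
}
```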
This produces the following exception:
Exception occurred in table future - java.util.concurrent.CompletionException: com.datastax.oss.driver.api.core.DriverTimeoutException:
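When a dependent stage (such as the one produced by `thenCompose` or `thenApply`) fails, `exceptionally` receives a `CompletionException` wrapping the original error, which is why the log shows `CompletionException` around `DriverTimeoutException`. A sketch of unwrapping it before recording metrics, using only the JDK; `Errors`, `withFallback` and the `recorder` callback are hypothetical names, and in the real handler the root cause would be the driver's exception:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

final class Errors {
    // Strips CompletionException/ExecutionException wrappers to reach the underlying cause.
    static Throwable unwrap(Throwable t) {
        Throwable cur = t;
        while ((cur instanceof CompletionException || cur instanceof ExecutionException)
                && cur.getCause() != null) {
            cur = cur.getCause();
        }
        return cur;
    }

    // Like the exceptionally(...) handlers above, but reports the root cause, not the wrapper.
    static <T> CompletionStage<T> withFallback(CompletionStage<T> stage, T fallback,
                                               Consumer<Throwable> recorder) {
        return stage.exceptionally(ex -> {
            recorder.accept(unwrap(ex));
            return fallback;
        });
    }
}
```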
I have tried tuning the page size, but that doesn't help. Tables 3 and 4 are queried in parallel, and I use pagination on Tables 2, 3 and 4.
The problems occur with the queries on Table 4.
Could the driver be timing out because queries to two different clusters are being made concurrently?
I tried the following, but nothing worked:
- Page size: tuned from 100 to 1000
- Connections per host: increased to 4
- Max requests per connection: tried 2048 to 8192
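For reference, the arithmetic behind those settings, as a small hypothetical helper (`PagingMath` is an illustrative name). The connection settings bound how many requests can be in flight to one node (pool size × max requests per connection, so 4 × 2048 = 8192 up to 4 × 8192 = 32768), while the page size only changes how many sequential round trips a partition needs (at least ⌈N / page size⌉ for N rows). Raising the first does not reduce the second.

```java
final class PagingMath {
    // Upper bound on concurrent requests to one node: connections per node x requests per connection.
    static long inFlightCapacityPerNode(int poolLocalSize, int maxRequestsPerConnection) {
        return (long) poolLocalSize * maxRequestsPerConnection;
    }

    // Lower bound on sequential page fetches needed to read `rows` rows at `pageSize` per page.
    static long minPageFetches(long rows, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // Even an empty result costs one request, so the minimum is 1.
        return Math.max(1L, (rows + pageSize - 1) / pageSize);
    }
}
```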
When I instead fetch single rows from Table 4, one per store returned by Table 3, the queries work fine and I get no exceptions. However, that increases the read throughput on the Cassandra cluster, which we want to avoid.
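A sketch of that single-row approach: one point query per `(id, store)` pair, all issued concurrently and gathered with `CompletableFuture.allOf`. The `lookup` function is a hypothetical stand-in for an `executeAsync` call on a prepared statement that restricts both `id` and `store`, and `PointLookups` is an illustrative name. The number of requests equals the number of stores, which is where the extra read throughput comes from.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class PointLookups {
    // Fans out one lookup per store, then keeps the stores whose row exists.
    static CompletableFuture<Map<String, String>> fetchStores(
            String id,
            Set<String> stores,
            BiFunction<String, String, CompletableFuture<Optional<String>>> lookup) { // hypothetical
        Map<String, CompletableFuture<Optional<String>>> pending = new HashMap<>();
        for (String store : stores) {
            pending.put(store, lookup.apply(id, store)); // every request starts immediately
        }
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // All futures are complete here, so join() does not block.
                    Map<String, String> found = new HashMap<>();
                    pending.forEach((store, f) -> f.join().ifPresent(row -> found.put(store, row)));
                    return found;
                });
    }
}
```

If the store set can be large, capping the number of in-flight lookups (for example with a `java.util.concurrent.Semaphore`) would be a natural extension; it is not shown here.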