I have an application that reads from 4 Cassandra tables. The 4 tables live in different clusters, so I create 4 sessions through the DataStax Java driver:

        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>4.17.0</version>
        </dependency>

I do single-row fetches (point queries) on the first table and use pagination on the other 3 tables. The result of the query on the second table is used in the queries for the 3rd and 4th tables.

Table 4 primary key: (id, store), where id is the partition key and store is the clustering key.

From Table 3, a set of stores is obtained.

The Cassandra session configuration for the 4th cluster is:

DriverConfigLoader.programmaticBuilder()
                .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 1000)
                .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 4)
                .withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, 8192)
                .withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, true)
                .withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, true)
                .withBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED, false)
                .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
                .withString(DefaultDriverOption.REQUEST_CONSISTENCY, DefaultConsistencyLevel.LOCAL_ONE.toString())
                .build();

The code for the 4th table looks like:

return cqlSession.executeAsync(ps.bind(id))
                .thenCompose(rs -> {
                    Map<String, Set<String>> finalMap = new HashMap<>();
                    return processRows(rs, finalMap, listedStores, limoEligStartTime);
                })
                .exceptionally(ex -> {
                    // do something, record metrics
                    return Collections.emptyMap();
                });

The processRows function looks like:

private CompletionStage<Map<String, Set<String>>> processRows(AsyncResultSet rs, Map<String, Set<String>> finalMap, Set<String> thirdTableResult, long resultGetTime) {
        METRICS.recordExecutionTime("LimoEligibilityQueryTime", System.currentTimeMillis() - resultGetTime);
        for (Row row : rs.currentPage()) {
            try {
                if (row != null) {
                    // do something
                } else {
                    // record metrics for row not found
                }
            } catch (IOException ex) {
                // record exception
            }
        }
        if (rs.hasMorePages()) {
            long fetchStartTime = System.currentTimeMillis();
            return rs.fetchNextPage()
                    // recurse on the freshly fetched page, not the old result set
                    .thenCompose(nextRs -> processRows(nextRs, finalMap, thirdTableResult, fetchStartTime))
                    .exceptionally(ex -> {
                        // record exception
                        return Collections.emptyMap();
                    });
        } else {
            return CompletableFuture.completedFuture(finalMap);
        }
    }

This returns the following exception:

Exception occurred in table future - java.util.concurrent.CompletionException: com.datastax.oss.driver.api.core.DriverTimeoutException:

I have tried tuning the page size, but that doesn't help. Tables 3 and 4 are queried in parallel, and I use pagination on Tables 2, 3, and 4.

With Table 4 I am facing issues with the queries.

Could the driver be timing out because queries to two different clusters are being made concurrently?

I tried the following, but nothing worked:

- Tuned the page size from 100 to 1000
- Increased the number of connections per host to 4
- Increased max requests per connection from 2048 to 8192

When I do single-row fetches from Table 4 using the results of Table 3 (the set of stores obtained there), the queries work completely fine and I get no exception, but that increases the read throughput on the Cassandra cluster, which is not something we want.
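A possible middle ground between paginating the whole partition and issuing one point query per store (a sketch, not from the original post; the statement and variable names are assumptions) is a single query per partition that restricts the clustering column with IN, so the driver fetches only the listed stores. For a modest number of stores, the results typically fit in one page:

```java
// Hypothetical sketch: binds the Table 3 store set into an IN clause on the
// clustering column, so only the needed rows are read. Table and column
// names are illustrative, not from the original post.
PreparedStatement storesPs = cqlSession.prepare(
        "SELECT * FROM table4 WHERE id = ? AND store IN ?");

CompletionStage<Map<String, Set<String>>> result =
        cqlSession.executeAsync(storesPs.bind(id, new ArrayList<>(listedStores)))
                .thenApply(rs -> {
                    Map<String, Set<String>> finalMap = new HashMap<>();
                    for (Row row : rs.currentPage()) {
                        // accumulate into finalMap as before
                    }
                    return finalMap;
                });
```

If the store set can be large, the result may still span multiple pages, in which case the same page-walking logic as processRows would be needed.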

2 Answers


DriverTimeoutException indicates that the cluster you are connecting to isn't able to serve your request in time. That is, it is overloaded and unable to respond to a request before the timeout passes.

Your Cassandra cluster is basically saying "I can't keep up".

It might be your code that is overloading the cluster, effectively creating a DDOS attack by sending a great number of requests, or it might be that your cluster is not feeling well for other reasons. Either way, the problem needs to be identified on the server side.

If it is your code that is hammering the node into submission (which is likely, considering you are running async), your options are to slow down your requests (for example by throttling on the server or client side) or to ensure your cluster can handle the number of requests you are sending.
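For client-side throttling, the driver ships built-in request throttlers that can be enabled through configuration alone. A sketch of how that could look with the programmatic loader already used in the question (the numeric limits are illustrative, not recommendations):

```java
// Hedged sketch: enable the driver's built-in rate-limiting throttler.
// Requests beyond the limit are queued client-side instead of hitting
// the cluster all at once; the numbers below are placeholders to tune.
DriverConfigLoader.programmaticBuilder()
        .withString(DefaultDriverOption.REQUEST_THROTTLER_CLASS,
                "RateLimitingRequestThrottler")
        .withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND, 1000)
        .withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, 10000)
        .withDuration(DefaultDriverOption.REQUEST_THROTTLER_DRAIN_INTERVAL,
                Duration.ofMillis(10))
        .build();
```

There is also a ConcurrencyLimitingRequestThrottler if capping in-flight requests fits better than capping the request rate.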



The DriverTimeoutException is a client-side error that is thrown when the driver doesn't get a response back from the request coordinator within the basic.request.timeout duration.

The DriverTimeoutException is distinct from other exceptions such as AllNodesFailedException, which is thrown when all the nodes the driver tried for a query failed, or ReadTimeoutException, which is a server-side exception thrown when the coordinator didn't get a response from the replica(s) within read_request_timeout.
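To make that distinction actionable in an exceptionally callback, the driver exception first has to be unwrapped from the CompletionException layer. A standalone sketch (the helper names are made up, and it matches on simple class names only so it compiles without the driver on the classpath):

```java
import java.util.concurrent.CompletionException;

public class FailureKind {
    // Unwrap CompletionException layers to reach the exception underneath.
    static Throwable rootCause(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Map the unwrapped exception to a coarse category by simple class name.
    static String classify(Throwable t) {
        switch (rootCause(t).getClass().getSimpleName()) {
            case "DriverTimeoutException":
                return "client-timeout";        // no reply within basic.request.timeout
            case "ReadTimeoutException":
                return "server-read-timeout";   // replicas missed read_request_timeout
            case "AllNodesFailedException":
                return "all-nodes-failed";
            default:
                return "other";
        }
    }
}
```

Recording a separate metric per category would show whether the failures are purely client-side timeouts or whether the coordinators themselves are reporting trouble.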

There isn't anything in your post that indicates pagination is the cause of the exception; the coordinator(s) are simply not responding in time.

Ideally you should always provide (1) the full error message, plus (2) the full stack trace since they are crucial to diagnosing the issue. Cheers!
