Now that I understand how to run queries and evaluate their metadata (see my previous question), I have run into a design flaw:
Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
QueryResult result = cluster.query(query);
The code above returns a QueryResult, and its row accessors (rowsAsObject(), rowsAs(...)) return the full result set as a List. It is only a matter of time before I run a query that returns more data than my JVM heap can hold.
The answer is to stream the data: read as much as you can digest, digest it, and forget it. Repeat until done.
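To make the "read, digest, forget" loop concrete independently of Couchbase, here is a minimal JDK-only sketch. The class `BatchStreaming`, the method `processInBatches` and the integer "rows" are hypothetical stand-ins, not SDK API: a lazily produced source is consumed in fixed-size batches, so at most one batch is held in memory at a time.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class BatchStreaming {

    // Pulls up to batchSize items from the source, hands the batch to the processor,
    // then drops the reference before reading further. Returns the number of batches.
    static <T> int processInBatches(Iterator<T> source, int batchSize, Consumer<List<T>> processor) {
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                processor.accept(batch);
                batches++;
                batch = new ArrayList<>(batchSize); // forget the processed batch
            }
        }
        if (!batch.isEmpty()) { // trailing partial batch
            processor.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a large result: 10 lazily generated "rows".
        Iterator<Integer> rows = IntStream.rangeClosed(1, 10).iterator();
        long[] sum = {0};
        int batches = processInBatches(rows, 4, batch -> batch.forEach(r -> sum[0] += r));
        System.out.println(batches + " batches, sum=" + sum[0]); // prints "3 batches, sum=55"
    }
}
```

Memory use is bounded by the batch size rather than by the size of the whole result, which is exactly what a List-returning API like QueryResult cannot offer.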
The Couchbase documentation suggests switching to the reactive and async APIs:
Reactive And Async APIs
[...]
Also, there is another reason you want to use the reactive API: streaming large results with backpressure from the application side. Both the blocking and async APIs have no means of signalling backpressure in a good way, so if you need it the reactive API is your best option.
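"Signalling backpressure" means the consumer tells the producer how many items it is ready to receive. The Couchbase reactive API returns Project Reactor types (Mono/Flux), which follow the Reactive Streams contract; as a JDK-only illustration of that same contract (not of the Couchbase API itself), this sketch uses java.util.concurrent.Flow with a subscriber that requests one item at a time. The class names and the "times 10" processing step are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class OneAtATime {

    // Subscriber that signals demand for exactly one item at a time:
    // it asks for the next item only after it has finished processing the current one.
    static class OneByOneSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        final CompletableFuture<Void> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: one item
        }

        @Override public void onNext(Integer item) {
            processed.add(item * 10); // "digest" the item (stand-in for real work)
            subscription.request(1);  // now ready for the next one
        }

        @Override public void onError(Throwable t) { done.completeExceptionally(t); }

        @Override public void onComplete() { done.complete(null); }
    }

    static List<Integer> run(int n) {
        OneByOneSubscriber sub = new OneByOneSubscriber();
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 1; i <= n; i++) {
                pub.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() issues onComplete after the already submitted items
        sub.done.join(); // wait for the asynchronous subscriber to finish
        return new ArrayList<>(sub.processed);
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints "[10, 20, 30, 40, 50]"
    }
}
```

The producer can only run ahead of the consumer by the publisher's buffer size; once that fills up, submit() blocks until the subscriber requests more.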
So I changed my code snippet to this:
Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
ReactiveCluster rc = couchbase.reactive();
rc.query(query,
        QueryOptions.queryOptions()
            // no readonly(true): the statement is a DELETE, which read-only mode would reject
            .scanConsistency(QueryScanConsistency.REQUEST_PLUS)
            .metrics(true))
    .flatMapMany(result -> result.rowsAs(JsonObject.class))
    .subscribe(
        row -> {
            // process one row
        },
        (Throwable t) -> log.error("Could not read data", t));
// subscribe() returns immediately and the rows arrive later on another thread,
// so the cluster must not be disconnected here (e.g. in a finally block);
// disconnect it when the application shuts down instead.
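Because subscribe() returns before any row has arrived, anything that depends on the end of the stream (disconnecting, or reading summary information such as metrics) has to be chained to stream completion rather than written after the call. A JDK-only sketch of that ordering, where the `Summary` record is a hypothetical stand-in for end-of-stream information and not a Couchbase type:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;

public class SummaryAfterCompletion {

    // Hypothetical summary that only exists once the stream has ended.
    record Summary(long rowCount) {}

    // Subscribes and returns immediately; the future completes only on onComplete.
    static CompletableFuture<Summary> consume(Flow.Publisher<String> rows) {
        CompletableFuture<Summary> summary = new CompletableFuture<>();
        AtomicLong count = new AtomicLong();
        rows.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override public void onNext(String row) {
                count.incrementAndGet(); // process one row
                subscription.request(1);
            }

            @Override public void onError(Throwable t) { summary.completeExceptionally(t); }

            @Override public void onComplete() { summary.complete(new Summary(count.get())); }
        });
        return summary;
    }

    public static void main(String[] args) {
        CompletableFuture<Summary> summary;
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            summary = consume(pub);
            // consume() has returned, but the publisher is still open, so no summary yet.
            System.out.println("done right after subscribe? " + summary.isDone()); // false
            for (String row : new String[] {"a", "b", "c"}) {
                pub.submit(row);
            }
        }
        // Follow-up work (logging, disconnecting) waits for completion instead of a finally block.
        System.out.println("rows: " + summary.join().rowCount()); // prints "rows: 3"
    }
}
```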
As in my last question: how can I get hold of the query metrics in this reactive setup?