Having worked out how to run queries and evaluate their metadata in an earlier question, I am now running into a design flaw:

Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
QueryResult result = cluster.query(query);

The code above returns a QueryResult, which only offers methods like rowsAs... that return the full data set at once. It is just a matter of time until I run a query that returns more data than my JVM heap can hold.

The answer is to stream the data: Read as much as you can digest, digest it and forget. Repeat until done.
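The "read, digest, forget, repeat" idea can be sketched with nothing but the JDK's java.util.concurrent.Flow interfaces. This is not the Couchbase SDK or Reactor. IteratorPublisher, BatchSubscriber and run are made-up names for illustration, and the publisher is deliberately simplified (single-threaded, no validation of the requested count). The point is only that the consumer decides when the next batch may arrive.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

final class BatchedStreamingSketch {

    /** Hypothetical synchronous source that emits only as many items as were requested. */
    static final class IteratorPublisher<T> implements Flow.Publisher<T> {
        private final Iterator<T> source;

        IteratorPublisher(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long pending;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    pending += n;
                    if (emitting) {
                        return; // re-entrant call from onNext; the running loop picks it up
                    }
                    emitting = true;
                    while (!done && pending > 0 && source.hasNext()) {
                        pending--;
                        subscriber.onNext(source.next());
                    }
                    emitting = false;
                    if (!done && !source.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests rows in small batches, digests each one, and keeps only a running total. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private Flow.Subscription subscription;
        private int seenInBatch;
        long sum;
        int batchesRequested;
        boolean completed;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        private void requestBatch() {
            batchesRequested++;
            subscription.request(batchSize);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override
        public void onNext(Integer row) {
            sum += row; // "digest" the row, then let it go
            if (++seenInBatch == batchSize) {
                seenInBatch = 0;
                requestBatch(); // ask for more only after the batch is processed
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    /** Streams the numbers 1..count through the subscriber and returns it for inspection. */
    static BatchSubscriber run(int count, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        new IteratorPublisher<>(IntStream.rangeClosed(1, count).boxed().iterator())
            .subscribe(subscriber);
        return subscriber;
    }
}
```

Calling BatchedStreamingSketch.run(10, 3) pushes ten numbers through in batches of three. Only the running total stays in memory, never the whole data set.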

The Couchbase documentation suggests switching to the reactive API for this:

Reactive And Async APIs

[...]

Also, there is another reason you want to use the reactive API: streaming large results with backpressure from the application side. Both the blocking and async APIs have no means of signalling backpressure in a good way, so if you need it the reactive API is your best option.

So I changed my code snippet to this:

Instant threshold = Instant.now().minus(retention);
String query = String.format("delete from %s.%s.%s where _lastmodified < \"%s\"", bucketName, scopeName, collectionName, threshold);
ReactiveCluster rc = couchbase.reactive();
try {
  rc.query(query,
      QueryOptions.queryOptions()
          .readonly(true)
          .scanConsistency(QueryScanConsistency.REQUEST_PLUS)
          .metrics(true)
  ).flux().flatMap(result -> {
    Flux<JsonObject> rows = result.rowsAs(JsonObject.class);
    return rows;
  }).subscribe(row -> {
    // process one row
  }, (Throwable t) -> {
    log.error("Could not read data", t);
  });
} finally {
  rc.disconnect();
}

As in my last question: How can I get hold of the query metrics in this async setup?

1 Answer

The QueryResult you get in the blocking API is really just a wrapper: it contains both the rows and the metadata (including metrics). In the reactive API this is split: you first get a ReactiveQueryResult, and from that you can pull both the rows and the metadata/metrics.

Your current code only consumes the rows (result.rowsAs(JsonObject.class)), but you can also call result.metaData() to access metrics, warnings, and the rest of the metadata. Since it’s asynchronous, metaData() returns a Mono<QueryMetaData>.

Here’s an example:

Instant threshold = Instant.now().minus(retention);
String query = String.format(
    "delete from %s.%s.%s where _lastmodified < \"%s\"",
    bucketName, scopeName, collectionName, threshold
);

ReactiveCluster rc = couchbase.reactive();

rc.query(query,
    QueryOptions.queryOptions()
        // readonly(true) is left out: the server rejects DELETE on read-only requests
        .scanConsistency(QueryScanConsistency.REQUEST_PLUS)
        .metrics(true)
).flatMapMany(result -> {
    // Consume the rows
    Flux<JsonObject> rows = result.rowsAs(JsonObject.class);

    // Also capture the metadata once the rows are done
    return rows
        .doOnComplete(() -> result.metaData().subscribe(
            // metrics() is an Optional, so only log when it is present
            meta -> meta.metrics().ifPresent(
                metrics -> log.info("Query metrics: {}", metrics)),
            t -> log.error("Could not read query metadata", t)));
}).subscribe(
    row -> {
        // process each row
    },
    t -> log.error("Could not read data", t)
);

Key points:

  • result.metaData() is a Mono<QueryMetaData>.

  • You need to subscribe to it separately (or chain it in a doOnComplete like above).

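  • QueryMetaData.metrics() returns an Optional<QueryMetrics>. It is empty unless metrics(true) was set on the query options, so check it with isPresent() or ifPresent() rather than calling get() unguarded.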
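
As a small illustration of the last point, here is a sketch of handling an Optional metrics value without risking a NoSuchElementException. It uses only java.util.Optional. FakeMetrics is a hypothetical stand-in for the SDK's metrics type so that the snippet compiles on its own, and describe is a made-up helper name.

```java
import java.util.Optional;

final class OptionalMetricsSketch {

    /** Hypothetical stand-in for the SDK's metrics type; not part of Couchbase. */
    static final class FakeMetrics {
        final long mutationCount;

        FakeMetrics(long mutationCount) {
            this.mutationCount = mutationCount;
        }
    }

    /** Builds a log line without ever calling get() on an empty Optional. */
    static String describe(Optional<FakeMetrics> metrics) {
        return metrics
            .map(m -> "mutations: " + m.mutationCount)
            .orElse("metrics not available");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.of(new FakeMetrics(42))));
        System.out.println(describe(Optional.empty()));
    }
}
```

The same map/orElse (or ifPresent) pattern should carry over to the value returned by meta.metrics() in the reactive chain.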

This way you keep streaming rows reactively and still get the query metrics once the query finishes.
