
What are the best practices for streaming large amounts of data with gRPC? I am sending a request to a gRPC server that streams data back. The data sent back can be on the order of 100 protobuf messages, or it can be several hundred thousand protobuf messages.

service CrossEngineSelector {
    rpc QueryDB (QueryRequest) returns (stream QueryResponse) {}
}

The server is a simple implementation that sends the protobuf messages in batches:

@Override
public void queryDB(QueryRequest request, StreamObserver<QueryResponse> responseObserver) {
    Iterables.partition(dataLoader.getData(), 1000).forEach(batch -> {
        responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build());
    });
    responseObserver.onCompleted();
}

The client side uses a blocking stub that calls this method (code generated by the protobuf/gRPC compiler):

public Iterator<QueryResponse> queryDB(QueryRequest request) {
    return ClientCalls.blockingServerStreamingCall(this.getChannel(),
            CrossEngineSelectorGrpc.getQueryDBMethod(),
            this.getCallOptions(), request);
}

Once the client calls this method, I simply iterate over the returned QueryResponse messages.

All of this works fine for streams that only send back a small number of messages. Once I try to stream 100,000 messages, the message size keeps growing until it exceeds the max-inbound-message-size limit, and I end up with an error: RESOURCE_EXHAUSTED: Compressed gRPC message exceeds maximum size 4194304: 4196022 bytes read

My current fix is to set max-inbound-message-size very high (over 1 GB). This is a hardcoded value, so it doesn't scale: the client has no idea how many messages the server will return, and I might run into use cases where even a 1 GB max-inbound-message-size is not enough.
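For reference, with plain grpc-java the limit is set per channel on the channel builder (Micronaut exposes the equivalent through its channel configuration). A minimal sketch; the target address and the 16 MB cap are illustrative choices, not values from the question:

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// A bounded, deliberate cap instead of a 1 GB escape hatch.
// maxInboundMessageSize applies to each message on the stream, not to the
// stream as a whole, so it only needs to cover the largest single batch.
ManagedChannel channel = ManagedChannelBuilder
        .forTarget("localhost:50051")            // illustrative target
        .maxInboundMessageSize(16 * 1024 * 1024) // 16 MB per-message cap
        .usePlaintext()
        .build();
```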

I hope I am making an implementation mistake. Is there a way to reset the message size for every stream message (onNext()) on the server, or is it normal that the message size keeps increasing?

I would assume that a single responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build()) sends a few MB, and that this alone would count as the message size, rather than the entire stream accumulating for as long as it runs.

I am using Micronaut for both server and client.

1 Answer


The maxInboundMessageSize limit protects the receiver from running out of memory if a malicious peer sends a very large payload. The real problem here is that a batch of 1000 rows can be larger than 4 MB (the default maxInboundMessageSize).

You can fix this in two ways:

  • fix the sender to limit batches by payload size, not by number of rows;
  • or give the receiver a maxInboundMessageSize large enough to handle a normal 1000 rows' worth of data.

The latter method is essentially what you implemented, though the limit should be far lower than 1 GB. gRPC won't allocate maxInboundMessageSize worth of memory up front; as mentioned above, it is purely a protection limit. (added later)
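The first option, batching by cumulative payload size rather than row count, can be sketched in plain Java. The byte-array rows and the `partitionBySize` helper are illustrative; with real protobuf rows you would measure each row with `getSerializedSize()` instead of `row.length`:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SizeBatcher {
    // Split rows into batches whose combined encoded size stays under maxBatchBytes.
    // A single row larger than the budget still gets its own one-element batch.
    static List<List<byte[]>> partitionBySize(List<byte[]> rows, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] row : rows) {
            if (!current.isEmpty() && currentBytes + row.length > maxBatchBytes) {
                batches.add(current);       // flush before the batch overflows
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(row);
            currentBytes += row.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> rows = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            rows.add("some row payload".getBytes(StandardCharsets.UTF_8)); // 16 bytes each
        }
        // 64-byte budget -> 4 rows per batch -> 25 batches
        List<List<byte[]>> batches = partitionBySize(rows, 64);
        System.out.println(batches.size()); // 25
    }
}
```

On the server you would then call `responseObserver.onNext(...)` once per batch, so each message stays safely under the receiver's limit regardless of how many total rows the stream carries.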


3 Comments

This is indeed how I would expect it to work. However, from my tests it looks like it takes the whole stream size as the max size. Example: streaming 10 MB in 1 MB chunks results in an error after the 4th message, unless you set max-inbound-message-size to 10 MB. In my case I can always stream about 8500 rows, and after that it crashes: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: gRPC message exceeds maximum size 4194304: 4335181
gRPC checks the frame size from the header of each HTTP/2 frame (see MessageDeframer#processHeader), so that is unlikely to be what is happening (I hope =p). If you can create a minimal repro, we can definitely take a look.
Thank you @creamsoup. After checking all the code again, I found a mistake on the server side that was growing the protobuf message with every iteration. All works as expected.
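The comment above doesn't say exactly what the server-side mistake was, but one plausible way to "grow the message with every iteration" is reusing a single builder or accumulator across loop iterations, so each onNext() message carries all rows seen so far. A plain-Java sketch of that hypothetical bug, using lists as stand-ins for the protobuf builder:

```java
import java.util.ArrayList;
import java.util.List;

public class BuilderReuseDemo {
    // Simulates sending one "message" per batch and returns each message's row count.
    // reuseBuilder = true reproduces the suspected bug: a single accumulator is
    // reused, so every message contains all rows seen so far and keeps growing.
    static List<Integer> messageSizes(List<List<Integer>> batches, boolean reuseBuilder) {
        List<Integer> sizes = new ArrayList<>();
        List<Integer> accumulator = new ArrayList<>();
        for (List<Integer> batch : batches) {
            if (!reuseBuilder) {
                accumulator = new ArrayList<>(); // fresh "builder" per message
            }
            accumulator.addAll(batch);           // addAllRows(batch)
            sizes.add(accumulator.size());       // stands in for onNext(message)
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = List.of(
                List.of(1, 2, 3), List.of(4, 5, 6), List.of(7, 8, 9));
        System.out.println(messageSizes(batches, true));  // buggy: [3, 6, 9]
        System.out.println(messageSizes(batches, false)); // fixed: [3, 3, 3]
    }
}
```

With the buggy pattern, message sizes grow linearly until one crosses the 4 MB limit, which matches the symptom of always failing after roughly the same number of rows.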

