Given an RSocket client that retrieves data from a server using stream semantics, I would like to use the Flux.retryWhen(retryBackoffSpec) mechanism to reconnect when the connection is lost. The streaming endpoint on the server requires a valid JWT token.

Initial connection works fine, however I'm having problems during reconnection:

  • if the reconnection happens while the JWT token is still valid, it succeeds,
  • if the reconnection happens after the JWT token has expired, it fails with:

last failure={RejectedSetupException (0x3): Jwt expired at 2022-12-13T10:26:39Z} error

I was not able to find a way to replace the JWT token sent as metadata to the streaming endpoint before a retry is performed. In my attempt, the original (now expired) JWT token from the initial request is reused, which of course causes the error:

this.rSocketRequester
        .route("stream")
        .metadata(getAccessToken(), AUTHENTICATION)
        .data(streamRequest)
        .retrieveFlux(GenericRecord.class)
        .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??

I'm looking for a way to replace the metadata sent during the retry with a newly obtained JWT token (via the getAccessToken() method).
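For context, the expired token is reused because getAccessToken() is an ordinary method argument in the snippet above: it is evaluated once, when the request is assembled, and a retry only resubscribes to the request that was already built. Below is a minimal plain-Java sketch of that behaviour, with no Reactor or RSocket involved. `EagerTokenDemo`, `requestWithValue` and the counter-based `getAccessToken()` are hypothetical stand-ins, not the real client code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class EagerTokenDemo {
    // Hypothetical stand-in for getAccessToken(): every call issues a new token.
    static final AtomicInteger issued = new AtomicInteger();

    static String getAccessToken() {
        return "token-" + issued.incrementAndGet();
    }

    // Models a request whose metadata was passed as a concrete value:
    // the value is captured once and replayed on every (re)subscription.
    static Supplier<String> requestWithValue(String token) {
        return () -> token;
    }

    public static void main(String[] args) {
        // getAccessToken() runs here, exactly once, before any attempt is made.
        Supplier<String> request = requestWithValue(getAccessToken());
        String first = request.get(); // initial attempt
        String retry = request.get(); // a retry reuses the same request
        System.out.println(first + " / " + retry); // prints "token-1 / token-1"
    }
}
```

Both attempts see the same token, which mirrors the expired JWT being sent again on retry.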

Client

Connection initialization:

private static final MimeType AUTHENTICATION = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());

public void connect() {
    final SocketAcceptor responder = RSocketMessageHandler.responder(rSocketStrategies, new ClientHandler());

    //@formatter:off
    retryBackoffSpec = Retry
            .fixedDelay(10, Duration.ofSeconds(10))
            .doBeforeRetry(signal -> log.warn("Connecting retry: {}", signal));

    final RSocketRequester.Builder rSocketRequesterTpl = rSocketRequesterBuilder
            .setupRoute("subscriber")
            .setupData(CLIENT_ID)
            .rsocketStrategies(builder -> {
                builder.encoders(encoders -> {
                    encoders.add(new BearerTokenAuthenticationEncoder());
                });
            })
            .rsocketConnector(connector -> {
                connector.acceptor(responder);
                connector.reconnect(retryBackoffSpec);
            });
    //@formatter:on

    this.rSocketRequester = rSocketRequesterTpl.tcp("localhost", 7000);
}

Stream subscription:

public void stream(final MessageCallback callback, final StreamRequest streamRequest) {
    final Flux<GenericRecord> streamFlux = this.rSocketRequester
            .route("stream")
            .metadata(getAccessToken(), AUTHENTICATION)
            .data(streamRequest)
            .retrieveFlux(GenericRecord.class)
            .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??

    streamFlux.publishOn(scheduler).subscribe(m -> callback.doOnMessage(m), t -> callback.doOnError(t));
}

Server controller


@ConnectMapping("subscriber")
void connectSubscriber(final RSocketRequester requester, @Payload final String client) {
    // removed for clarity
}


@PreAuthorize("isAuthenticated()")
@MessageMapping({ "stream" })
Flux<StreamMessage> stream(@AuthenticationPrincipal(expression = "@authenticationConverter.convert(#this)") final JwtAuthenticationToken user,
        @Payload(required = false) StreamRequest streamRequest) {
    return Flux.interval(Duration.ofMillis(100)).map(i -> {
        final StreamMessage message = new StreamMessage();
        message.setIndex(i);
        return message;
    });
}

Relevant dependencies:

spring-boot-starter-parent 2.7.6
rsocket-core 1.1.3

1 Answer

From the documentation of metadata() method:

The metadata value can be a concrete value or any producer of a single value that can be adapted to a Publisher via ReactiveAdapterRegistry.

So, you can provide updated JWT token like this:

// Flux.defer takes a Supplier, so the request is rebuilt on every (re)subscription
Flux.defer(() ->
    this.rSocketRequester
        .route("stream")
        .metadata(Mono.fromSupplier(this::getAccessToken), AUTHENTICATION)
        .data(streamRequest)
        .retrieveFlux(GenericRecord.class)
)
    .retryWhen(retryBackoffSpec);

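Edit: I updated the answer with working code. Passing only a supplier as the metadata value is not enough when errors occur; the request also has to be wrapped in Flux.defer so that it is rebuilt on each retry.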
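To illustrate why deferring helps, here is a rough plain-Java analogy of retrying with a request that is rebuilt on each attempt. It is only a sketch and does not use Reactor or RSocket. `DeferredRetryDemo`, `call`, `retry` and the integer "tokens" are hypothetical names made up for the example, and the simple loop stands in for retryWhen without any backoff delay.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DeferredRetryDemo {
    // Hypothetical server check: tokens older than 'validFrom' are rejected as expired.
    static String call(int token, int validFrom) {
        if (token < validFrom) {
            throw new IllegalStateException("Jwt expired: " + token);
        }
        return "stream opened with token " + token;
    }

    // Retries up to maxAttempts times and asks the supplier for a token on EVERY
    // attempt, which is the role Flux.defer plays for the real request.
    static String retry(Supplier<Integer> tokens, int validFrom, int maxAttempts) {
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call(tokens.get(), validFrom);
            } catch (IllegalStateException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("retries exhausted", last);
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // Attempt 1 gets token 1 (expired), attempt 2 gets a fresh token 2 (accepted).
        System.out.println(retry(counter::incrementAndGet, 2, 3));
        // prints "stream opened with token 2"
    }
}
```

If the supplier always returned the same stale value, for example `() -> 1`, every attempt would be rejected and the retries would be exhausted, which corresponds to the behaviour described in the question.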
