4

I am new to spring web flux, I have a client application that consumes server-sent events, The events are published by the server randomly there is not fixed delay. But consumer throws io.netty.handler.timeout.ReadTimeoutException: null after 60 secs if there no event

Server-side events consumer code

webClient.get()
        .uri("http://localhost:8080/events")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(type)
        .subscribe(event -> process(event));

I need the client to be connected even if there is no event for a long time.

Full Exception

[36mr.netty.http.client.HttpClientConnect   [...] The connection observed an error

io.netty.handler.timeout.ReadTimeoutException: null

reactor.Flux.MonoFlatMapMany.1    onError(org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException)
reactor.Flux.MonoFlatMapMany.1     

org.springframework.web.reactive.function.client.WebClientRequestException: nested exception is io.netty.handler.timeout.ReadTimeoutException
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.5.jar:5.3.5]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):

3 Answers 3

2

Webflux use a default timeout fallback that will eventually show io.netty.handler.timeout.ReadTimeoutException: null. It is possible to prevent this error by passing a custom timeout fallback to the timeout method(s):

public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback);

Additionally, you can use methods like onErrorContinue, onErrorReturn, ... to properly handle exceptions in Flux, example:

return webClient.get().uri(url).retrieve().bodyToFlux(String.class)
    .timeout(timeout, Mono.error(new ReadTimeoutException("Timeout")))
    .onErrorContinue((e, i) -> {
        // Log the error here.
    });

If you want to disable all these logs by default it is possible adding this line into file application.properties:

logging.level.reactor.netty.http.client.HttpClient=OFF
Sign up to request clarification or add additional context in comments.

Comments

2

In my case the problem was the HttpClient response timeout, I just increased it

HttpClient.create(...).responseTimeout(Duration.ofMillis(5000));

Comments

0

In the Mozilla description for server sent events there is a note:

A colon as the first character of a line is in essence a comment, and is ignored. Note: The comment line can be used to prevent connections from timing out; a server can send a comment periodically to keep the connection alive.

So periodically sending comments can keep the connection alive. So how do we send a comment?

Well spring has the class ServerSentEvent that has the function ServerSentEvent#comment. So if we use this class in combination with for instance Flux#interval we can merge in events containing only the comments keep alive.

Here is an example from a project i built a while back

@Bean
public RouterFunction<ServerResponse> foobars() {
    return route()
            .path("/api", builder -> builder
                .GET("/foobar/{id}", accept(TEXT_EVENT_STREAM), request -> ok()
                        .contentType(MediaType.TEXT_EVENT_STREAM)
                        .header("Cache-Control", "no-transform")
                        .body(Flux.merge(foobarHandler.stream(request.pathVariable("id")),
                                Flux.interval(Duration.ofSeconds(15)).map(aLong -> ServerSentEvent.<List<FoobarResponse>>builder()
                                        .comment("keep alive").build())), new ParameterizedTypeReference<ServerSentEvent<List<FoobarResponse>>>(){}))
            .build();
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.