I have this sink configured in my app:

createdSink = Sinks.many().multicast().onBackpressureBuffer(4096);

public void activityCreated(ActivityResource createdActivity) {
    try {
        var notification = new ActivityCreatedNotification(createdActivity);
        createdSink.emitNext(notification, getFailureHandler());
    } catch (Exception e) {
        log.error("Emit activity created notification for activity id {} failed", createdActivity.getId(), e);
    }
}

private static Sinks.EmitFailureHandler getFailureHandler() {
    return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(5));
}

Unfortunately, I'm not sure I understand what my failure handler does, or how to configure it to do what I want, so I was hoping to get some help here.

I sometimes observe an overflow exception like this:

2025-05-05T05:46:01.312Z ERROR [] [reactor-http-epoll-25]
[00000000-0000-4000-a000-000000000000][][][TRACE_ID:b42fa551661d9427][][c.e.d.p.a.e.ActivityExternalNotificationService.lambda$prepareExternalNotifications$3(79)]
 - Error while processing activity notification     
reactor.core.Exceptions$OverflowException: Backpressure overflow during Sinks.Many#emitNext
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)

This happens even though I have a backpressure buffer configured, so it probably means the buffer was already full and elements kept being emitted to the sink while one of the subscribers could not keep up, right?

What I would like in that case is to drop elements and prevent the sink from terminating.

Basically, this hot publisher (the sink) must be infinite; it can't stop working.

For dropping elements, all I found was the directBestEffort() method in Reactor:

Sinks.many().multicast().directBestEffort()

If I understood correctly, this would just drop elements when my subscribers can't handle the rate at which elements are emitted to the sink. So that could be a solution, but it seems it does not let me also have a backpressure queue like I have today.

So I guess the question is: How can I configure a backpressure queue like the one I have today which is nice to have (buffer some elements instead of dropping right away) but to also avoid the overflow error and to drop instead?

Is there a way to configure this using the EmitFailureHandler, or by different means? Thanks!

1 Answer

"How can I configure a backpressure queue like the one I have today which is nice to have (buffer some elements instead of dropping right away) but to also avoid the overflow error and to drop instead?"

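As far as I know, Sinks.many().multicast().onBackpressureBuffer(4096) does not drop on overflow. When the buffer is full, tryEmitNext() returns FAIL_OVERFLOW, and emitNext() turns that result into an OverflowException that terminates the sink unless the failure handler asks for a retry. busyLooping() only retries on FAIL_NON_SERIALIZED, so it does not help with overflow.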
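To make the overflow behavior concrete, here is a minimal sketch, assuming reactor-core 3.4 or later on the classpath. The class name, the buffer size of 16, and the subscriber that never requests anything are all made up for illustration; the subscriber stands in for a consumer that cannot keep up.

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

public class OverflowTerminatesSink {

    /** Overfills a small buffered sink via emitNext, then reports what the sink answers afterwards. */
    public static EmitResult emitUntilOverflow() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer(16);

        // Hypothetical slow consumer: it subscribes but never requests any element.
        sink.asFlux().subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // intentionally request nothing
            }
        });

        for (int i = 0; i < 100; i++) {
            // FAIL_FAST never retries, so the first FAIL_OVERFLOW is converted
            // into an error signal on the sink instead of being dropped.
            sink.emitNext(i, Sinks.EmitFailureHandler.FAIL_FAST);
        }

        // Probe the sink after the overflow happened.
        return sink.tryEmitNext(-1);
    }

    public static void main(String[] args) {
        System.out.println(emitUntilOverflow());
    }
}
```

In this sketch the probe at the end should report that the sink has already terminated, which is the "collapsing" described in the question: once emitNext() hits an overflow that the handler does not retry, the sink is done for all subscribers.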

If you want to stay with multicast, you have to handle the failure explicitly. I would use tryEmitNext() rather than emitNext() and check whether the result is a failure. With that in mind, the body of activityCreated() would look something like this:

public void activityCreated(ActivityResource createdActivity) {
    var notification = new ActivityCreatedNotification(createdActivity);
    var result = createdSink.tryEmitNext(notification);
    if (result.isFailure()) {
        // The element is dropped here; a fallback handler could go in this branch instead.
        System.out.println("Dropping activity created event due to backpressure");
    }
}
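Another option, sketched here under assumptions, is to move the buffer from the sink to each subscriber: use directBestEffort() for the sink and put Flux#onBackpressureBuffer with BufferOverflowStrategy.DROP_LATEST on the subscriber side. That operator requests an unbounded amount from the sink, so the sink itself should not see a slow consumer, while each subscriber keeps its own bounded queue and drops the newest element once it is full. The class name, the buffer size of 4, and the no-demand subscriber are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class PerSubscriberDropBuffer {

    public final List<Integer> received = new CopyOnWriteArrayList<>();
    public final List<Integer> dropped = new CopyOnWriteArrayList<>();

    public void run() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();

        // Each subscriber gets its own bounded buffer (4 elements in this sketch).
        // When it is full, the incoming element is handed to dropped::add and discarded.
        Flux<Integer> buffered = sink.asFlux()
                .onBackpressureBuffer(4, dropped::add, BufferOverflowStrategy.DROP_LATEST);

        // Hypothetical slow consumer: no demand until we explicitly ask for it.
        BaseSubscriber<Integer> slow = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // intentionally request nothing yet
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };
        buffered.subscribe(slow);

        // Emit more elements than the per-subscriber buffer can hold.
        for (int i = 0; i < 10; i++) {
            sink.tryEmitNext(i);
        }

        // The consumer catches up: whatever was buffered is delivered now.
        slow.requestUnbounded();
    }

    public static void main(String[] args) {
        PerSubscriberDropBuffer demo = new PerSubscriberDropBuffer();
        demo.run();
        System.out.println("received=" + demo.received + " dropped=" + demo.dropped);
    }
}
```

The trade-off is that the buffer is per subscriber rather than shared, so memory use grows with the number of subscribers, and one slow subscriber only loses its own elements instead of affecting the others.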

2 Comments

Thanks for the reply! I tried your suggestion but I'm getting FAIL_NON_SERIALIZED. It seems that because I'm emitting concurrently from multiple threads, tryEmitNext is not the way to go. Do you have a suggestion for how I can avoid the overflow exception?
Have you tried calling .serialize() after .onBackpressureBuffer(4096) when creating the sink?
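For the concurrent-emitter case raised in these comments, here is a minimal sketch that keeps tryEmitNext() but retries briefly on FAIL_NON_SERIALIZED, similar in spirit to what busyLooping() does for emitNext(), and drops on any other failure. I am not aware of a serialize() method on Sinks.Many; as far as I know the sinks from Sinks.many() already detect concurrent use and answer with FAIL_NON_SERIALIZED rather than queueing. DroppingEmitter, its constructor parameters, and the dropped counter are hypothetical names, and autoCancel = false is an assumption made so the sink survives the last subscriber cancelling.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

/** Hypothetical helper (not part of Reactor): buffer first, then drop, without failing the sink. */
public final class DroppingEmitter<T> {

    private final Sinks.Many<T> sink;
    private final long maxSpinNanos;
    private final AtomicLong dropped = new AtomicLong();

    public DroppingEmitter(int bufferSize, Duration maxSpin) {
        // Assumption: autoCancel = false keeps the sink usable after the last subscriber cancels.
        this.sink = Sinks.many().multicast().onBackpressureBuffer(bufferSize, false);
        this.maxSpinNanos = maxSpin.toNanos();
    }

    public Flux<T> asFlux() {
        return sink.asFlux();
    }

    public long droppedCount() {
        return dropped.get();
    }

    /** Returns true if the value was accepted by the sink, false if it was dropped. */
    public boolean emit(T value) {
        long start = System.nanoTime();
        EmitResult result = sink.tryEmitNext(value);
        // Another thread is emitting right now: retry for a bounded time.
        while (result == EmitResult.FAIL_NON_SERIALIZED
                && System.nanoTime() - start < maxSpinNanos) {
            Thread.onSpinWait();
            result = sink.tryEmitNext(value);
        }
        if (result.isFailure()) {
            // FAIL_OVERFLOW and every other failure are counted and dropped;
            // no error is pushed into the sink, so it keeps running.
            dropped.incrementAndGet();
            return false;
        }
        return true;
    }
}
```

With something like this, activityCreated() could call emit(notification) and log when it returns false. The spin is a busy wait, so the time limit should stay small when emitting from event-loop threads such as reactor-http-epoll.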
