
My project is a fully synchronous web application, and I'm trying to use the Netty server to save resources, since it uses an event loop to handle requests.

Now I need to make a synchronous (blocking) external API call, and for that I'm using WebClient with .block() at the end. But I'm getting the error below:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-6

Here is my code


Mono<Map<String,Object>> resMono=wc.post().body(Mono.just(bodyStr), String.class)
  .retrieve().bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {})
  .subscribeOn(Schedulers.boundedElastic())
  .publishOn(Schedulers.boundedElastic())
//.subscribeOn(Schedulers.fromExecutor(webClientExecutor))
  .timeout(Duration.ofSeconds(10)).log("info");


 resMono.share().block();
// resMono.block();

I've tried exchangeToMono, and subscribeOn/publishOn with Schedulers.boundedElastic() or custom executors, but nothing helps.

resMono.share().block() works up to a certain threshold, but once concurrent hits reach around 5 to 10, requests start to hang (I assume the event loop is full at that point).

I'm aware that the same WebClient code with .block() at the end works fine on Tomcat, since it is a synchronous server. But I'm trying to make a blocking call with WebClient in a Netty application.

Thanks in advance.

  • Just move it out of reactor-http-nio-6. Obviously, you cannot block inside that thread because it could freeze the whole app. Commented Jan 19, 2024 at 14:23
  • Sorry, I didn't get you. Do you mean move to Tomcat? reactor-http-nio-6 is just an arbitrary thread, and it will be different for each request. I tried a separate thread pool using a custom executor. @IgorArtamonov Commented Jan 19, 2024 at 18:16
  • You have to have another thread pool that is not part of Reactor (or any other IO, actually). That's part of your business logic, and how exactly you do that depends very much on your application architecture. But the rule is that you do not block a Reactor thread, because then it stops working. Commented Jan 19, 2024 at 20:43
  • Thanks for the clarification. I had tried a separate thread pool, but I missed NioEventLoopGroup and ReactorResourceFactory. That's why it kept creating threads in the common Reactor thread pool. I'll post the complete code in an answer. Commented Jan 20, 2024 at 2:31
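
To make the "another thread pool" advice concrete, here is a minimal sketch using only java.util.concurrent. It is not Reactor code: EVENT_LOOP, BLOCKING_POOL, blockingCall() and handleRequest() are hypothetical names invented for illustration, and the single-threaded executor only stands in for a reactor-http-nio thread. The point is that the event-loop thread hands the blocking call to a dedicated pool and never waits on it. In Reactor, the usual counterpart is wrapping the blocking call in Mono.fromCallable(...) and applying subscribeOn(Schedulers.boundedElastic()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadBlockingSketch {
    // Creates a named daemon thread so the JVM can exit without an explicit shutdown.
    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        return t;
    }

    // Hypothetical stand-in for an event-loop thread (the role of reactor-http-nio-N).
    static final ExecutorService EVENT_LOOP =
            Executors.newSingleThreadExecutor(r -> daemon(r, "event-loop-1"));

    // Separate pool reserved for blocking work; never used for request I/O.
    static final ExecutorService BLOCKING_POOL =
            Executors.newFixedThreadPool(4, r -> daemon(r, "blocking-worker"));

    // Simulated blocking external call; returns the name of the thread it ran on.
    static String blockingCall() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Starts on the event loop, then delegates the blocking call to BLOCKING_POOL.
    // The event-loop thread only schedules the work; it does not wait for the result.
    static CompletableFuture<String> handleRequest() {
        return CompletableFuture
                .supplyAsync(() -> "request", EVENT_LOOP)
                .thenCompose(req -> CompletableFuture.supplyAsync(
                        OffloadBlockingSketch::blockingCall, BLOCKING_POOL))
                .thenApply(worker -> "handled on " + worker);
    }

    public static void main(String[] args) {
        // Only this outer, non-event-loop thread waits for the result.
        System.out.println(handleRequest().join()); // prints "handled on blocking-worker"
    }
}
```

The only place that waits is main(), which is the kind of "own thread" where blocking is acceptable.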

1 Answer


With the help of @IgorArtamonov and inspiration from an SO answer.

Here is the complete solution:

1: Create a NioEventLoopGroup with a custom Executor (number of threads, thread name prefix, blocking queue)

// Declared as fields so that RESOURCE is visible to the resource factory in step 2
static final int THREADS = 10;
static final Executor EXECUTOR = new ThreadPoolExecutor(THREADS, THREADS, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10), new CustomizableThreadFactory("ThreadNamePrefix-"));
static final NioEventLoopGroup RESOURCE = new NioEventLoopGroup(THREADS, EXECUTOR);

2: Create the resource factory

public ReactorResourceFactory getReactorResourceFactory() {
    ReactorResourceFactory rf = new ReactorResourceFactory();
    rf.setLoopResources(new LoopResources() {
        @Override
        public EventLoopGroup onServer(boolean b) {
            return RESOURCE;
        }
    });
    rf.setConnectionProvider(ConnectionProvider.create("Custom-WebClient-Name"));
    return rf;
}

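3: Create the ReactorClientHttpConnector (with an SSL context that allows insecure sites)

private ClientHttpConnector getCustomReactorHttpConnector() {
    try {
        // Trusts every certificate; acceptable only for test or internal endpoints
        SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        return new ReactorClientHttpConnector(getReactorResourceFactory(),
                httpClient -> httpClient.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
    } catch (Exception e) {
        // Rethrow: an empty catch leaves the method without a return value and hides the cause
        throw new IllegalStateException("Could not create the WebClient connector", e);
    }
}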

4: Build the WebClient (custom exchange strategies set the max in-memory size)

WebClient wc = WebClient.builder().clientConnector(getCustomReactorHttpConnector())
                    .baseUrl(endpoint.getHost() + endpoint.getUrl()).exchangeStrategies(getExchangeStrategies()).build();

5: Call the API with block()

Map<String,Object> resMap = wc.post().body(Mono.just(bodyStr), String.class)
        .retrieve().bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {})
        .share().block();

11 Comments

You are just hiding the problem here by replacing the default executor with one that simply doesn't check whether there is any blocking code. Even if it doesn't show you the error (because you specifically configured it not to check for errors), it still breaks Reactor and makes it ineffective. The solution is simply not to have .block() in a reactive IO thread; just move it outside to a main thread.
Oh, okay. I'm new to Reactor and multithreading, so I can't understand "move outside to main thread". Can you help with a snippet or example?
Also, when I log the thread name, I see that the request runs on reactor-http-nio- and the WebClient runs on "wc-" threads from the custom executor. That's why I assumed I was doing it correctly.
How do you call that code? Or, in general, how do you start your application? Where is this wc.post()... called from? You always have something that is not part of Reactor, for example the code that starts the reactor. That is your own thread, and that's where you can block.
It's a Spring Boot application, and I run it as a jar with the Netty server. The flow is: controller -> service class -> make a blocking call (the API provider is synchronous) only if a certain business condition is met -> perform business logic -> return response (the consumer is synchronous). If I run the application with Tomcat, there are no issues, since it has a servlet container and a thread-based model. But here I'm trying to do it with Netty, where I can't block. Technically, I'm planning to merge the reactive and non-reactive worlds.
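
The first comment's point about the custom executor "not checking" can be illustrated with a small stdlib-only sketch. As far as I know, Reactor decides whether block() is allowed by testing whether the current thread implements its NonBlocking marker interface, and threads produced by an ordinary thread factory do not carry that marker. The names below (NonBlockingMarker, EventLoopThread, guardedBlock, runOn) are hypothetical and only imitate that mechanism; this is not Reactor's actual code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class BlockingGuardSketch {
    // Hypothetical marker, playing the role of Reactor's NonBlocking interface.
    interface NonBlockingMarker {}

    // A thread type that declares itself as "must never block".
    static final class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable r) {
            super(r, "marked-event-loop");
            setDaemon(true);
        }
    }

    // Guarded blocking operation: refuses to run on a marked thread.
    static String guardedBlock() {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + current.getName());
        }
        return "blocked on " + current.getName();
    }

    // Runs guardedBlock() on one thread built by the given factory and reports the outcome.
    static String runOn(ThreadFactory factory) {
        ExecutorService pool = Executors.newSingleThreadExecutor(factory);
        Callable<String> task = BlockingGuardSketch::guardedBlock;
        try {
            return pool.submit(task).get();
        } catch (ExecutionException e) {
            return "rejected: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Marked thread: the guard fires.
        System.out.println(runOn(EventLoopThread::new));
        // Unmarked thread doing the same job: the guard stays silent.
        System.out.println(runOn(r -> new Thread(r, "plain-custom")));
    }
}
```

The second call passes only because the thread is unmarked. If that thread is in fact serving as an event loop, blocking it still stalls everything scheduled on it, which is why the error disappearing does not mean the problem is solved.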