1

I need to implement the communication between two Java environments. The recipient is a SpringBoot reactive application and the snippet to handle the communication is as follows (I'll skip the configuration of the beans)

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive() // <- Step 0
        .map(message -> {
            log.info("Step 1");
            return message.getPayloadAsText();
        })
        .map(message -> {
            log.info("Step 2");
            return webSocketSession.textMessage(this.receiveMessage(message));
        }));
}

The client part is implemented using the Http API from java 11

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    }).join();


webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));

Using debug, I can notice that once the join() is completed and the WebSocket instance is returned, the method at Step 0 of receiver is executed and the Mono<Void> instance is returned.

But the problem is that even if I send some text, Steps 1 and 2 are never executed!

If I try the reverse communication (sending something from the SpringBoot application to the Sender app) the messages are received.

Finally, this is the log from the onClose callback execution after the sendClose statement.

Closed with status 1002, reason: Server internal error

1 Answer 1

1

SOLUTION

Since buildAsync method return an instance of CompletableFuture<WebSocket>, we need the chain the send of the messages before flush the messages queue using join()

Here the solution

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    })
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
    .thenCompose(ws -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))
    .join();

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.