
I started a project using Vert.x and RxJava, and I have run into a problem I cannot find a solution for.

I have an Observable that emits a WebSocketFrame for each incoming communication. Each WebSocketFrame is composed of a payload (a ByteBuffer) and flags that indicate whether it is the first or the last frame of the message.

I want to apply an operation to this Observable that transforms it into an Observable emitting ByteBuffers, each containing all the frames of one message.

I tried the buffer method, but it seems to be designed to regroup items by an external criterion (time or another Observable) rather than by a property of the items themselves.

Another way seems to be to use compose to subscribe to the WebSocketFrame Observable, append to a buffer on each non-final frame, and "feed" the ByteBuffer Observable on each final frame. But I don't know how to create and feed a buffer manually.

So if someone has already seen this issue (which IMHO seems pretty common) and has enough knowledge of RxJava to propose an implementation, I would be most grateful.

Thank you for reading.

1 Answer

I guess you have to use the buffer operator for this (maybe you could do with the simpler buffer, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page for more discussion. Hope this helps you!
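Independently of which operator ends up driving it, the "create and feed a buffer manually" step from the question could be sketched in plain Java as below. This is only a minimal sketch under assumptions: `Frame` is a hypothetical stand-in for the real WebSocketFrame type (a payload plus a "last frame" flag), and `FrameAssembler` and `accept` are illustrative names, not part of Vert.x or RxJava.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-in for WebSocketFrame: a payload plus a "last frame" flag.
final class Frame {
    final ByteBuffer payload;
    final boolean isFinal;

    Frame(ByteBuffer payload, boolean isFinal) {
        this.payload = payload;
        this.isFinal = isFinal;
    }

    // Convenience factory used only by the demo below.
    static Frame of(String text, boolean isFinal) {
        return new Frame(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)), isFinal);
    }
}

// Accumulates payloads until a final frame arrives, then returns the whole message.
// Not thread-safe: it assumes frames are delivered one at a time and in order.
final class FrameAssembler {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    Optional<ByteBuffer> accept(Frame frame) {
        // Read through a duplicate so the caller's buffer position is left untouched.
        ByteBuffer view = frame.payload.duplicate();
        byte[] chunk = new byte[view.remaining()];
        view.get(chunk);
        pending.write(chunk, 0, chunk.length);

        if (!frame.isFinal) {
            return Optional.empty(); // message still incomplete, nothing to emit
        }
        ByteBuffer message = ByteBuffer.wrap(pending.toByteArray());
        pending.reset(); // start fresh for the next message
        return Optional.of(message);
    }

    public static void main(String[] args) {
        FrameAssembler assembler = new FrameAssembler();
        Frame[] frames = {
            Frame.of("Hel", false), Frame.of("lo", true), Frame.of("Bye", true)
        };
        for (Frame f : frames) {
            // Prints one line per completed message.
            assembler.accept(f).ifPresent(msg ->
                System.out.println(StandardCharsets.UTF_8.decode(msg)));
        }
    }
}
```

Because the final frame itself completes the message, the assembled ByteBuffer is available as soon as that frame is processed, with no need to wait for the next one. Inside an Rx pipeline, one assembler per subscription could be fed from whichever operator you choose, forwarding only the non-empty results.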


2 Comments

Very good answer. I have implemented most of the functionality, but I still have a problem: the closingObservable is notified first by the publisher, whereas I need the item to be added before the buffer emits (I cannot wait for the next frame to emit the previous message).
After many attempts, I found a solution. The problem comes from buffer, which subscribes to the closing observable before subscribing to the producer. My solution is to chain .delaySubscription(1, TimeUnit.NANOSECONDS) onto the closing observable to force it to subscribe afterwards, and hence to receive the notification afterwards.
