I started a project using Vert.x and RxJava, and I have a problem that I can't find a solution for.
I have an Observable that emits a WebSocketFrame for each incoming communication. Each WebSocketFrame is composed of a payload (a ByteBuffer) and flags that indicate whether it is the first or the last frame of the message.
I want to transform this Observable into an Observable that emits ByteBuffers, each containing all the frames of one message.
I tried the buffer method, but it seems to be designed to regroup items by an arbitrary criterion (time or another Observable).
Another way seems to be to use compose to subscribe to the WebSocketFrame Observable, append to a buffer on each non-final frame, and "feed" the ByteBuffer Observable on the final frame. But I don't know how to create and feed a buffer manually.
So if someone has already seen this issue (which IMHO seems pretty common) and has enough knowledge of RxJava to propose an implementation, I would be most grateful.
Thank you for reading.
I guess you have to use the buffer operator for this (maybe you could do with the simpler buffer, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page for more discussion. Hope this helps you!
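
For what it's worth, the "add to a buffer on non-final frames, emit on the final frame" step from the question can be sketched in plain Java, without any RxJava dependency. This is only a minimal sketch: FrameAssembler and its accept method are hypothetical names, and the isFinal parameter is assumed to stand for the frame's "last frame" flag, however your WebSocketFrame exposes it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/** Hypothetical helper: accumulates frame payloads until the final frame arrives. */
final class FrameAssembler {
    // Bytes of the message currently being assembled.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /**
     * Feeds one frame payload. Returns the whole message once isFinal is true,
     * and an empty Optional for every frame before that.
     */
    Optional<ByteBuffer> accept(ByteBuffer payload, boolean isFinal) {
        // Read through a duplicate so the caller's position is left untouched.
        ByteBuffer view = payload.duplicate();
        byte[] chunk = new byte[view.remaining()];
        view.get(chunk);
        pending.write(chunk, 0, chunk.length);

        if (!isFinal) {
            return Optional.empty();
        }
        // toByteArray() copies, so the result is independent of later frames.
        ByteBuffer message = ByteBuffer.wrap(pending.toByteArray());
        pending.reset(); // start clean for the next message
        return Optional.of(message);
    }

    public static void main(String[] args) {
        FrameAssembler assembler = new FrameAssembler();
        // First frame of a two-frame message: nothing is emitted yet.
        assembler.accept(ByteBuffer.wrap("Hel".getBytes(StandardCharsets.UTF_8)), false);
        // Final frame: the concatenated payload comes out.
        assembler.accept(ByteBuffer.wrap("lo".getBytes(StandardCharsets.UTF_8)), true)
                 .ifPresent(m -> System.out.println(StandardCharsets.UTF_8.decode(m))); // prints "Hello"
    }
}
```

On the RxJava side, one possible way to plug this in (an assumption, not something I have tested against your setup) is to create one assembler per subscription and call accept from concatMap, returning Observable.just(message) when the Optional is present and Observable.empty() otherwise. The assembler is stateful and not thread-safe, so it relies on frames arriving sequentially.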