RxJava Observable buffer based on content

Posted 2019-07-09 20:10

Question:

I started a project using Vert.x and RxJava, and I have a problem for which I can't find a solution.

I have an Observable that emits a WebSocketFrame for each incoming communication. Each WebSocketFrame is composed of a payload (a ByteBuffer) and flags that indicate whether it is the first or the last frame of the message.

I want to apply an operation to this Observable to transform it into an Observable that emits ByteBuffers, each containing all the frames of one message.

I tried the buffer method, but it seems to be designed to regroup items by an arbitrary criterion (time or another Observable).

Another way seems to be to use compose to subscribe to the WebSocketFrame Observable, append to a buffer on each non-ending frame, and "feed" the ByteBuffer Observable on the ending frame. But I don't know how to create and feed a buffer manually.

So if someone has already seen this issue (which IMHO seems pretty common) and has enough knowledge of RxJava to propose an implementation, I would be most grateful.

Thank you for reading.

Answer 1:

I guess you have to use the buffer operator for this (maybe you could do with the simpler buffer, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page for more discussion. Hope this helps you!
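To make the "add to a buffer on non-ending frames, emit on the ending frame" idea from the question concrete, here is a minimal sketch of just the accumulation step in plain Java. It is not tied to any Vert.x or RxJava API: `Frame`, `FrameAssembler` and `accept` are hypothetical names, and `Frame` is only a stand-in for the WebSocketFrame described in the question (a payload plus a "last frame" flag).

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical stand-in for the WebSocketFrame described in the question:
// a payload plus a flag marking the last frame of a message.
final class Frame {
    final ByteBuffer payload;
    final boolean last;

    Frame(ByteBuffer payload, boolean last) {
        this.payload = payload;
        this.last = last;
    }
}

// Collects frame payloads and hands back one ByteBuffer per complete message.
// Not thread-safe: assumes frames arrive sequentially, one instance per connection.
final class FrameAssembler {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Returns the whole message when `frame` is the last one, otherwise empty.
    Optional<ByteBuffer> accept(Frame frame) {
        ByteBuffer src = frame.payload.duplicate(); // leave the caller's position untouched
        byte[] chunk = new byte[src.remaining()];
        src.get(chunk);
        pending.write(chunk, 0, chunk.length);

        if (!frame.last) {
            return Optional.empty();
        }
        ByteBuffer message = ByteBuffer.wrap(pending.toByteArray());
        pending.reset(); // start fresh for the next message
        return Optional.of(message);
    }
}
```

One possible way to plug this into an Observable chain is to call `accept` inside `flatMap`, returning `Observable.just(message)` when a message is complete and `Observable.empty()` otherwise. Because the assembler holds state, it should probably be created once per subscription rather than shared between subscribers.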