I'm currently working on a project which requires TCP communication between an external system and an application which I will write (in Java). As we all know, this can easily be achieved using regular NIO. However, as part of this new project I'm working on, I have to use Vert.x to provide the TCP communication. Please refer to the image below:
On the right, I have my application which runs as a TCP server waiting for a connection from the external system, on the left. I've read that to create a TCP and listen for connections you simple do something like this:
NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});
However, the bit I can't figure out is how to handle when the external system connects to my application and sends EchoRequestMessages via TCP. My application has to take the received Buffer of bytes, decode it into a EchoRequestMessage POJO and then encode the EchoResponseMessage into a Buffer of bytes to send back to the external system.
How do i use vert.x-rx to perform reactive programming of the receipt of the EchoRequestMessage, its decoding, the encoding of the EchoResponseMessage, and then sending that back to the external system, all in one builder pattern type setup. I've read about Observables and subscribing, but i can't figure out what to observe or what to subscribe to. Any help would be greatly appreciated.
To read data from the socket you can use a
RecordParser
. On a socket connection, data is often separated by newline characters:A
RecordParser
is a Vert.xReadStream
so it can be transformed into aFlowable
:Now if an
EchoRequestMessage
can be created from aBuffer
:And an
EchoResponseMessage
converted to aBuffer
:You can use RxJava operators to implement an echo server flow:
[EDIT] If in your protocol messages are not line separated but length-prefixed, then you can create a custom
ReadStream
:And transform it to a
Flowable
:I find rather complicated to use the RecordParser.fixedSizeMode() method for aggregating incoming packets to one message.
I use RecordParser.newDelimited. If you can manipulate the data sent to your TCP receiver, then instead of prefixing the 4 byte message length to every message sent, you can suffix a unique delimiter to the end of it.
code sample: (I'm not using the RxJava syntax)
sending:
reading: