This is variant of my question How to implement simple echo socket service in Spring Integration DSL. A good working solutions was introduced but I would like to explore alternatives. Particularly I am interested in solution based on using inbound and outbound channels explicitly, in client and server implementations. Is that possible?
So far I was able to come up with:
HeartbeatClientConfig
...
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory clientConnectionFactory,
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(outboundChannel)
.handle(Tcp.outboundGateway(clientConnectionFactory))
.channel(inboundChannel)
.get();
}
...
HeartbeatClient
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in scheduled intervals in loop
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
}
The client part seems to be working fine. Problem is on the server side.
HeartbeatServer
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in some kind of loop
Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
...
outboudChannel.send(new GenericMessage<>("OK"));
...
}
HeartbeatServerConfig
Here comes the most tricky part where I am sure I am wrong. I just don't know what I should do. Here I naively use inverse approach from client implementation, where it seems to be working; inverse in sense of switching inbound and outbound channels in the Flow definition.
...
@Bean
public IntegrationFlow heartbeatServerFlow(
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(inboundChannel)
.handle(Tcp.inboundGateway(Tcp.netServer(7777)))
.channel(outboundChannel)
.get();
}
...
The server does not work, throwing cryptic exception about Found ambiguous parameter type [class java.lang.Boolean] for method match ...
followed by a long list of Spring and Spring Integration methods.
You can't start the server-side flow with a channel.
The flow starts with the gateway; it handles all the socket communication. When is receives a message it sends it to a channel.
You could do this...
But I would ask why you would want to because now you have to manage your own thread to receive from the request channel and write to the reply channel. In order for this to work, the
replyChannel
header from the request message must be copied to the reply message. In fact, you don't really need a reply channel; you can send the reply to thereplyChannel
header directly (that's what happens internally, we bridge the reply channel to the header channel).It's much simpler to handle the request on the gateway's thread.
Just to complement Gary's perfect answer, here is the full code if someone is interested.
I had to specify
TcpNetServerConnectionFactory
explicitly, to setByteArrayLengthHeaderSerializer
as serializer/deserializer. It did NOT work without it.HeartbeatServerConfig full code
HeartbeatServer full code
The crucial bit is of course getting the outbound channel from request message itself as:
MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()
Full code can be found here.