I am trying to build a module to be deployed on multiple nodes using Spring boot. Due to time constraints of the specific application, I have to use UDP and cannot rely on the easier-to-use REST facilities that Spring provides.
I have to be able to send datagrams to a set of nodes that may vary in time (i.e. the set may grow or shrink, or some nodes may move to new ip/port "coordinates"). Communication must be unicast.
I have been reading the official documentation about TCP and UDP support TCP and UDP support, but it is rather... compact, and opaque. The javadocs on the org.springframework.integration classes are also rather brief for that matter.
For what I could understand, an "inbound" channel is used to send a packet, while an outbound channel is used to receive packets.
I haven't been able so far to find an answer to the following issues for inbound (i.e. "send" channels, if I understood well):
- How can I create more channels at runtime, to send packets to multiple destinations?
- If a host gets moved, should I just destroy the channel and set up a new one, or may I change a channel's parameters (destination ip/port) at runtime?
For outbound channels ("receive" channels if I understood well), I have similar questions to the above, as in:
- How do I set up multiple channels at runtime?
- How do I change destination for an existing channel at runtime, not to have to tear it down and set it up anew?
- Should I just open/close "raw" UDP sockets instead?
You have inbound and outbound reversed.
Here's an example that should provide you with what you need; it uses a pub/sub channel to broadcast...
@SpringBootApplication
public class So48213450Application {
private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
public static void main(String[] args) {
SpringApplication.run(So48213450Application.class, args);
}
@Bean
public PublishSubscribeChannel channel() {
return new PublishSubscribeChannel();
}
@Bean
public ApplicationRunner runner(PublishSubscribeChannel channel) {
return args -> {
makeANewUdpAdapter(1234);
makeANewUdpAdapter(1235);
channel.send(MessageBuilder.withPayload("foo\n").build());
registrations.values().forEach(r -> {
r.stop();
r.destroy();
});
};
}
@Autowired
private IntegrationFlowContext flowContext;
public void makeANewUdpAdapter(int port) {
System.out.println("Creating an adapter to send to port " + port);
IntegrationFlow flow = IntegrationFlows.from(channel())
.handle(Udp.outboundAdapter("localhost", port))
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
}
result:
$ nc -u -l 1234 &
[1] 56730
$ nc -u -l 1235 &
[2] 56739
$ jobs
[1]- Running nc -u -l 1234 &
[2]+ Running nc -u -l 1235 &
$ foo
foo
You can't change parameters at runtime, you would have to create new ones.
EDIT
In response to your comments below...
You can't mix and match spring integration jars (2.1.x and 5.0.x); they must all be with the same version. My example above used Boot 2.0.0.M7 (boot 2 is scheduled to be released next month).
The Udp factory class was added to spring-integration-ip in 5.0.0.
Here is a similar example (which also adds receiving adapters) for boot 1.5.9 and spring integration 4.3.13...
@SpringBootApplication
public class So482134501Application {
private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
@Autowired
private IntegrationFlowContext flowContext;
public static void main(String[] args) {
SpringApplication.run(So482134501Application.class, args);
}
@Bean
public PublishSubscribeChannel channel() {
return new PublishSubscribeChannel();
}
@Bean
public ApplicationRunner runner(PublishSubscribeChannel channel) {
return args -> {
makeANewUdpInbound(1234);
makeANewUdpInbound(1235);
makeANewUdpOutbound(1234);
makeANewUdpOutbound(1235);
Thread.sleep(5_000);
channel.send(MessageBuilder.withPayload("foo\n").build());
this.registrations.values().forEach(r -> {
r.stop();
r.destroy();
});
this.registrations.clear();
};
}
public void makeANewUdpOutbound(int port) {
System.out.println("Creating an adapter to send to port " + port);
IntegrationFlow flow = IntegrationFlows.from(channel())
.handle(new UnicastSendingMessageHandler("localhost", port))
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
public void makeANewUdpInbound(int port) {
System.out.println("Creating an adapter to receive from port " + port);
IntegrationFlow flow = IntegrationFlows.from(new UnicastReceivingChannelAdapter(port))
.<byte[], String>transform(String::new)
.handle(System.out::println)
.get();
IntegrationFlowRegistration registration = flowContext.registration(flow).register();
registrations.put(port, registration);
}
}
result:
GenericMessage [payload=foo
, headers={ip_packetAddress=localhost/127.0.0.1:54881, ip_address=127.0.0.1, id=db7dae61-078c-5eb6-dde4-f83fc6c591d1, ip_port=54881, ip_hostname=localhost, timestamp=1515764556722}]
GenericMessage [payload=foo
, headers={ip_packetAddress=localhost/127.0.0.1:54880, ip_address=127.0.0.1, id=d1f79e79-569b-637b-57c5-549051f1b031, ip_port=54880, ip_hostname=localhost, timestamp=1515764556722}]