From my application, I need to configure multiple client connections that needs to connect to a single server. To do this, I create a variable amount of beans with the ApplicationContext Beanfactory, based on how many clients I have configured. Here is the code for 2 clients:
//setup beans;
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.scan("pkg");
ConnectionFactory factory = new ConnectionFactory();
int clients = 2; //TODO read this value from file
ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();
for (int count = 1; count <= clients; count++) {
TcpNetClientConnectionFactory connectionFactory = factory.createClientConnectionFactory("127.0.0.1", 6680);
//connection factory
beanFactory.registerSingleton("connectionFactory_" + String.valueOf(count), connectionFactory);
//inbound gateway
MessageChannel input = new DirectChannel();
MessageChannel output = new DirectChannel();
TcpInboundGateway gateway = factory.createInboundGateway(connectionFactory, beanFactory, input, output, 10000, 20000);
beanFactory.registerSingleton("gateway_" + String.valueOf(count), gateway);
//message transformation and handling
IntegrationFlow flow = factory.createFlow(input);
beanFactory.registerSingleton("flow_" + String.valueOf(count), flow);
}
ctx.refresh();
//open connections
for(int count = 1; count <= clients; count++) {
TcpInboundGateway gateway = ctx.getBean("gateway_" + count, TcpInboundGateway.class);
//necessary for the client to connect
gateway.retryConnection();
}
Here is my factory methods:
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class ConnectionFactory {
public TcpNetClientConnectionFactory createClientConnectionFactory(String ip, int port) {
TcpNetClientConnectionFactory factory = new TcpNetClientConnectionFactory(ip, port);
factory.setSingleUse(false);
factory.setSoTimeout(10000);
factory.setSerializer(new ByteArrayLfSerializer());
factory.setDeserializer(new ByteArrayLfSerializer());
return factory;
}
public TcpInboundGateway createInboundGateway(
AbstractConnectionFactory factory,
BeanFactory beanFactory,
MessageChannel input,
int replyTimeout,
int retryInterval) {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setRequestChannel(input);
gateway.setConnectionFactory(factory);
gateway.setClientMode(true);
gateway.setReplyTimeout(replyTimeout);
gateway.setRetryInterval(retryInterval);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
gateway.setTaskScheduler(scheduler);
gateway.setBeanFactory(beanFactory);
return gateway;
}
public IntegrationFlow createFlow(MessageChannel input) {
IntegrationFlowBuilder builder = IntegrationFlows.from(input);
builder.transform(Transformers.objectToString()).handle(System.out::println);
return builder.get();
}
}
When I run my program, both clients connects to my server. However, as soon as the server sends its first payload to each client I get the following exception (one for each client):
Exception sending message: GenericMessage [payload=byte[5], headers={ip_tcp_remotePort=6680, ip_connectionId=localhost:6680:33372:e26b9973-a32e-4c28-b808-1f2556576d01, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4443ca34-fb53-a753-7603-53f6d7d82e11, ip_hostname=localhost, timestamp=1464098102462}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182) ~[spring-integration-ip-4.2.5.RELEASE.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
... 14 common frames omitted
The idea was that the data would be read, sent through the channels I configured for my InboundGateway to the transformer, which will then transform the data to a String after which it will be printed out.
Why does the framework not know which channel to put the data? As far as I can see, I did create a unique channel for each client in the inbound gateway factory method. Can someone please have a look at my configuration and let me know what I missed, as I am absolutely stumped by this one.
Here is the working simplified solution:
Beans.java
IntegrationTest.java
Basically there were a couple of things wrong with my initial attempt. Here is what I changed to make it work:
1) When creating the AnnotationConfigApplicationContext, it must be created with a configuration class as parameter that is marked with the @EnableIntegration annotation. If not, then a component must be scanned by the context that contains this annotation. I did do this in my first attempt but called refresh too late, it should be called directly after ctx.scan. Because my ctx.refresh() was after my beanfactory registrations, @EnableIntegration was actually not set when the integration beans were created. Moving ctx.refresh() directly below ctx.scan() solves the problem.
2) Each bean registered into the context must also be initialized by the beanfactory. This is to ensure that the BeanPostProcessors are run (this is not done automatically by registerSingleton).
3) ctx.start() then needs to be called to enable the beans that were created after ctx.refresh().
There is no one who is going to consume message from your
gateway.setReplyChannel(output);
.At least we don't see anything like:
In most cases we have
Dispatcher has no subscribers
if some yourSubscribableChannel
is without any subscribers: not configured or stopped.EDIT
Forget my previous expression. It is for the
outbound
case.Your
TcpInboundGateway
is good. Although you don't needsetReplyChannel()
because you always can rely on the default built-inTemporaryReplyChannel
to wait for some result from downstream flow.Your
IntegrationFlow
also looks good. And that's correct that the.transform()
doesn't send anything to any other channel. It just relies on theTemporaryReplyChannel
in headers.I think your problem is that you don't specify
@EnableIntegraiton
for any of your@Configuration
class: http://docs.spring.io/spring-integration/reference/html/overview.html#_configurationEDIT 2
See the GH issue on the matter.
So, what you need in addition to your code is:
beanFactory.initializeBean();
for each your manualregisterSingleton()
. Because see JavaDocs of the last one:Do that already after
ctx.refresh()
to let to be registered all necessaryBeanPostProcessor
s including one for Spring Integration Java DSL parsing.Invoke
ctx.start()
to start all theLifecycle
s. Because these new manually added haven't been visible by the regularctx.refresh()
process.