I'm trying to run same project as packaged jar file and as debug config from intellij. Intellij works fine, but in packaged .jar, it seems integration fails to initialize some channels, leading to Dispatcher has no subscribers for channel 'unknown.channel.name'.
This is DEBUG output from working case (you can see that Starting beans in phase 1073741823 is after everything is configured):
INFO [2018-02-08 11:54:19,450] org.eclipse.jetty.server.handler.ContextHandler: Started i.d.j.MutableServletContextHandler@3fa50a24{/,null,AVAILABLE}
INFO [2018-02-08 11:54:19,463] org.eclipse.jetty.server.AbstractConnector: Started application@26f480c6{HTTP/1.1,[http/1.1]}{0.0.0.0:9070}
INFO [2018-02-08 11:54:19,465] org.eclipse.jetty.server.AbstractConnector: Started admin@2567c091{HTTP/1.1,[http/1.1]}{0.0.0.0:9071}
INFO [2018-02-08 11:54:19,465] org.eclipse.jetty.server.Server: Started @6044ms
INFO [2018-02-08 11:54:19,881] org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler: Initializing ExecutorService 'taskScheduler'
INFO [2018-02-08 11:54:19,917] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
INFO [2018-02-08 11:54:20,132] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase -2147483648
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualNack.serviceActivator} as a subscriber to the 'manualNackChannel' channel
INFO [2018-02-08 11:54:20,133] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.manualNackChannel' has 1 subscriber(s).
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualNack.serviceActivator
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualAck.serviceActivator} as a subscriber to the 'manualAckChannel' channel
INFO [2018-02-08 11:54:20,133] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.manualAckChannel' has 1 subscriber(s).
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualAck.serviceActivator
INFO [2018-02-08 11:54:20,133] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 0
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
INFO [2018-02-08 11:54:20,133] org.springframework.integration.channel.PublishSubscribeChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.errorChannel' has 1 subscriber(s).
INFO [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started _org.springframework.integration.errorLogger
INFO [2018-02-08 11:54:20,143] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Attempting to connect to: [10.200.32.172, 10.200.32.160]
INFO [2018-02-08 11:54:20,371] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Created new connection: connectionFactory#5bb0cfb4:0/SimpleConnection@6ef23b4b [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 3485]
INFO [2018-02-08 11:54:20,375] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.archiveMatchUpdate) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO [2018-02-08 11:54:20,375] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.settlements) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO [2018-02-08 11:54:26,576] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started matchUpdateInboundAdapter
INFO [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {json-to-object-transformer} as a subscriber to the 'matchUpdateFlow.channel#0' channel
INFO [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#0' has 1 subscriber(s).
INFO [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
INFO [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {filter} as a subscriber to the 'matchUpdateFlow.channel#1' channel
INFO [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#1' has 1 subscriber(s).
INFO [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
INFO [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {transformer} as a subscriber to the 'matchUpdateFlow.channel#2' channel
INFO [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#2' has 1 subscriber(s).
INFO [2018-02-08 11:54:26,577] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
INFO [2018-02-08 11:54:27,176] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {json-to-object-transformer} as a subscriber to the 'settlementFlow.channel#0' channel
INFO [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.settlementFlow.channel#0' has 1 subscriber(s).
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {filter} as a subscriber to the 'settlementFlow.channel#1' channel
INFO [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.settlementFlow.channel#1' has 1 subscriber(s).
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {aggregator} as a subscriber to the 'amqpInputChannel' channel
INFO [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.amqpInputChannel' has 1 subscriber(s).
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {amqp:outbound-channel-adapter} as a subscriber to the 'aggregatingFlow.channel#0' channel
INFO [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.aggregatingFlow.channel#0' has 1 subscriber(s).
INFO [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#6
INFO [2018-02-08 11:54:27,177] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 1073741823
And this is output from failing case (notice that Starting beans in phase 1073741823 is before integration is run):
INFO [2018-02-08 11:52:27,923] org.eclipse.jetty.server.handler.ContextHandler: Started i.d.j.MutableServletContextHandler@40f1aa95{/,null,AVAILABLE}
INFO [2018-02-08 11:52:27,939] org.eclipse.jetty.server.AbstractConnector: Started application@4aebee4b{HTTP/1.1,[http/1.1]}{0.0.0.0:9070}
INFO [2018-02-08 11:52:27,942] org.eclipse.jetty.server.AbstractConnector: Started admin@18d47df0{HTTP/1.1,[http/1.1]}{0.0.0.0:9071}
INFO [2018-02-08 11:52:27,942] org.eclipse.jetty.server.Server: Started @3432ms
INFO [2018-02-08 11:52:28,116] org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler: Initializing ExecutorService 'taskScheduler'
INFO [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase -2147483648
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualNack.serviceActivator} as a subscriber to the 'manualNackChannel' channel
INFO [2018-02-08 11:52:28,203] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.manualNackChannel' has 1 subscriber(s).
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualNack.serviceActivator
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualAck.serviceActivator} as a subscriber to the 'manualAckChannel' channel
INFO [2018-02-08 11:52:28,203] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.manualAckChannel' has 1 subscriber(s).
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualAck.serviceActivator
INFO [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 0
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
INFO [2018-02-08 11:52:28,203] org.springframework.integration.channel.PublishSubscribeChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.errorChannel' has 1 subscriber(s).
INFO [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started _org.springframework.integration.errorLogger
INFO [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 1073741823
INFO [2018-02-08 11:52:28,220] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Attempting to connect to: [10.200.32.172, 10.200.32.160]
INFO [2018-02-08 11:52:28,452] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Created new connection: connectionFactory#21cf8ba8:0/SimpleConnection@7aab9c72 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 3435]
INFO [2018-02-08 11:52:28,454] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.archiveMatchUpdate) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO [2018-02-08 11:52:28,454] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.settlements) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO [2018-02-08 11:52:34,774] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started matchUpdateInboundAdapter
INFO [2018-02-08 11:52:35,357] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter
Hopefully somebody has some idea what could be wrong? I also noticed that line INFO [2018-02-08 11:54:19,917] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
is in working printout.
** UPDATE **
Here is config:
@Configuration
@EnableIntegration
@Import({ CommonSearchConfig.class })
public class SearchAggregatorComponentConfig {
private final org.slf4j.Logger logger = LoggerFactory.getLogger(SearchAggregatorComponentConfig.class);
private Param param;
@Resource
private CommonSearchConfig commonConfig;
public SearchAggregatorComponentConfig(Param param) {
this.param = param;
}
@Bean
public SearchAggregatorComponent searchAggregatorComponent() {
return new SearchAggregatorComponent(param, commonConfig.metricsServlet());
}
@Bean
public Exchange matchExchange() {
return new TopicExchange("phoenix.fixture.ex.match");
}
@Bean
public FanoutExchange settlementExchange() {
return new FanoutExchange("phoenix.lcoo.ex.searchservice.settlements");
}
@Bean
public Queue matchUpdateQueue() {
return QueueBuilder.nonDurable(RABBIT_PREFIX + "q.archiveMatchUpdate")
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
.withArgument("x-max-length", 10_000)
.withArgument("x-expires", TimeUnit.MINUTES.toMillis(10))
.build();
}
@Bean
public Queue settlementQueue() {
return QueueBuilder.nonDurable(RABBIT_PREFIX + "q.settlements")
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
.withArgument("x-max-length", 10_000)
.withArgument("x-expires", TimeUnit.MINUTES.toMillis(10))
.build();
}
@Bean
public Binding matchUpdateBinding() {
return BindingBuilder
.bind(matchUpdateQueue())
.to(matchExchange())
.with("finished")
.noargs();
}
@Bean
public Binding settlementsBinding() {
return BindingBuilder
.bind(settlementQueue())
.to(settlementExchange());
}
@Bean
public IntegrationFlow matchUpdateFlow() {
return IntegrationFlows
.from(matchUpdateInboundAdapter())
.transform(new JsonToObjectTransformer(Match.class))
.filter((Match m) -> m.getGuthMatchId() != null, f -> f.discardChannel(manualNackChannel()))
.transform((Match m) -> { m.setEventUrn(new UrnImpl(EventUrnType.GUTH_MATCH, m.getGuthMatchId())); return m; })
.channel(amqpInputChannel())
.get();
}
@Bean
public IntegrationFlow settlementFlow() {
return IntegrationFlows
.from(settlementInboundAdapter())
.transform(new JsonToObjectTransformer(LcooSettlementInfo.class))
.filter((LcooSettlementInfo s) -> s.getEventId() != null && s.getEventId().isGuthMatch(), f -> f.discardChannel(manualNackChannel())) //allow only guthmatch urns
.channel(amqpInputChannel())
.get();
}
@Bean
public IntegrationFlow aggregatingFlow() {
return IntegrationFlows
.from(amqpInputChannel())
.aggregate(a -> a //if declared as AggregatingMessageHandler @Bean, we can use handle()
.outputProcessor(messageProcessor())
.messageStore(messageGroupStore())
.correlationStrategy(correlationStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeout(TimeUnit.SECONDS.toMillis(10))
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
)
.handle(amqpOutboundEndpoint())
.get();
}
@Bean
public SimpleMessageListenerContainer matchUpdateListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(commonConfig.connectionFactory());
listenerContainer.setQueues(matchUpdateQueue());
listenerContainer.setConcurrentConsumers(50);
listenerContainer.setMessageConverter(jackson2JsonConverter());
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setConsumerTagStrategy(consumerTagStrategy());
return listenerContainer;
}
@Bean
public SimpleMessageListenerContainer settlementListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(commonConfig.connectionFactory());
listenerContainer.setQueues(settlementQueue());
listenerContainer.setConcurrentConsumers(4);
listenerContainer.setMessageConverter(jackson2JsonConverter());
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setConsumerTagStrategy(consumerTagStrategy());
listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
return listenerContainer;
}
@Bean
public ConsumerTagStrategy consumerTagStrategy() {
return queue -> RABBIT_PREFIX + "aggregator." + UUID.randomUUID();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel manualAckChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel manualNackChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "manualNackChannel")
public void manualNack(@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag, @Payload Object payload) throws IOException {
channel.basicNack(tag, false, false);
logger.warn(String.format("NACK-ED message with tag %s and payload %s", tag, payload));
}
@ServiceActivator(inputChannel = "manualAckChannel")
public void manualAck(@Header(MANUAL_ACK_PAIRS) List<ManualAckPair> manualAckPairs) {
manualAckPairs.forEach(ManualAckPair::basicAck);
}
@Bean
public AmqpInboundChannelAdapter matchUpdateInboundAdapter() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(matchUpdateListenerContainer());
adapter.setRecoveryCallback(new ErrorMessageSendingRecoverer(manualNackChannel(), nackStrategy()));
adapter.setRetryTemplate(commonConfig.retryTemplate());
return adapter;
}
private ErrorMessageStrategy nackStrategy() {
return (throwable, attributes) -> {
Message inputMessage = (Message)attributes.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
return new ErrorMessage(throwable, inputMessage.getHeaders());
};
}
@Bean
public AmqpInboundChannelAdapter settlementInboundAdapter() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(settlementListenerContainer());
adapter.setRecoveryCallback(new ErrorMessageSendingRecoverer(manualNackChannel(), nackStrategy()));
adapter.setRetryTemplate(commonConfig.retryTemplate());
return adapter;
}
@Bean
public MatchAggregatorService aggregator() {
return new MatchAggregatorService();
}
@Bean
public CorrelationStrategy correlationStrategy() {
InOutLambdaCorrelationStrategy correlationStrategy = new InOutLambdaCorrelationStrategy();
Function<LcooSettlementInfo, String> correlator1 = (in) -> aggregator().correlate(in);
Function<Match, String> correlator2 = (in) -> aggregator().correlate(in);
correlationStrategy.addFunction(correlator1);
correlationStrategy.addFunction(correlator2);
return correlationStrategy;
}
@Bean
public MessageGroupProcessor messageProcessor() {
InOutLambdaMessageGroupProcessor<ElasticMatch> processor = new InOutLambdaMessageGroupProcessor<>(ElasticMatch.class);
BiConsumer<ElasticMatch, LcooSettlementInfo> aggregator1 = (out, in) -> aggregator().aggregate(out, in);
BiConsumer<ElasticMatch, Match> aggregator2 = (out, in) -> aggregator().aggregate(out, in);
processor.addConsumer(aggregator1);
processor.addConsumer(aggregator2);
return processor;
}
@Bean
public MessageGroupStore messageGroupStore() {
return new SimpleMessageStore();
}
@Bean
public RabbitTemplate ackTemplate() {
RabbitTemplate ackTemplate = new RabbitTemplate(commonConfig.connectionFactory());
ackTemplate.setMessageConverter(jackson2JsonConverter());
return ackTemplate;
}
@Bean
public MessageConverter jackson2JsonConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public PartitionMapper partitionMapper() {
return new PartitionMapper();
}
private FunctionExpression<Message<? extends ElasticMatch>> routingKeyExpression() {
return new FunctionExpression<>(m -> partitionMapper().mapToRoutingKey(m.getPayload().getEventUrn()));
}
@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
outboundEndpoint.setConfirmAckChannel(manualAckChannel());
outboundEndpoint.setConfirmCorrelationExpressionString("#root");
outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
return outboundEndpoint;
}
}
** UPDATE 2 **
INFO [2018-02-08 19:19:47,688] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter
WARN [2018-02-08 19:30:54,888] com.oddsarbitrage.phoenix.searchservice.infrastructure.components.aggregator.SearchAggregatorComponentConfig: NACK-ED message with tag 1 and payload org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[214], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.fixture.ex.match, amqp_deliveryTag=1, amqp_consumerQueue=phoenix.searchservice3.q.archiveMatchUpdate, amqp_channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://betradar@10.200.32.172:5672/betradarMQ,5), conn: Proxy@4b49d16 Shared Rabbit Connection: SimpleConnection@72385bd9 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 30706], amqp_redelivered=false, amqp_receivedRoutingKey=finished, amqp_contentEncoding=UTF-8, json__TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, id=1e4eb74f-aa90-495c-f12c-8ac7d375a31a, amqp_consumerTag=phoenix.searchservice3.aggregator.b01d7f4d-ea00-40c0-926b-7013f9c8d363, contentType=application/json, __TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, timestamp=1518118254876}], failedMessage=GenericMessage [payload=byte[214], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.fixture.ex.match, amqp_deliveryTag=1, amqp_consumerQueue=phoenix.searchservice3.q.archiveMatchUpdate, amqp_channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://betradar@10.200.32.172:5672/betradarMQ,5), conn: Proxy@4b49d16 Shared Rabbit Connection: SimpleConnection@72385bd9 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 30706], amqp_redelivered=false, amqp_receivedRoutingKey=finished, amqp_contentEncoding=UTF-8, json__TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, id=1e4eb74f-aa90-495c-f12c-8ac7d375a31a, amqp_consumerTag=phoenix.searchservice3.aggregator.b01d7f4d-ea00-40c0-926b-7013f9c8d363, contentType=application/json, __TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, timestamp=1518118254876}]
and other place
! org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
! at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
! at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
! at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
! ... 34 common frames omitted
! Causing: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[123], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.searchservice2.ix.refreshreq, amqp_deliveryTag=4, amqp_consumerQueue=phoenix.searchservice2.q.archive.refresh, amqp_redelivered=false, amqp_receivedRoutingKey=archive, amqp_contentEncoding=UTF-8, json__TypeId__=x.RefreshRequest, id=2233fa47-146a-9644-7784-98ab3a3d1183, amqp_consumerTag=amq.ctag-isa8OgreltRjHj0kll9Lxg, contentType=application/json, __TypeId__=x.RefreshRequest, timestamp=1518022018505}]
! at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
! at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
! at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
! at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
! at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
! at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
! at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
If you are using the maven shade plugin, it won't work out of the box because the shade plugin doesn't know how to merge the
META-INF/spring.factories
files which are used to bootstrap certain features such as the DSL.See my answer here.
You either have to fix up the
spring.factories
file in the uber jar or move to Spring Integration 5.0.1 where the DSL is now part of core and the initializer is in the corespring.factories
file.The answer includes a work-around if you can't move to Spring Integration 5 for some reason.