Spring integration bootstrap - intellij in debug w

2019-06-11 09:39发布

I'm trying to run same project as packaged jar file and as debug config from intellij. Intellij works fine, but in packaged .jar, it seems integration fails to initialize some channels, leading to Dispatcher has no subscribers for channel 'unknown.channel.name'.

This is DEBUG output from working case (you can see that Starting beans in phase 1073741823 is after everything is configured):

INFO  [2018-02-08 11:54:19,450] org.eclipse.jetty.server.handler.ContextHandler: Started i.d.j.MutableServletContextHandler@3fa50a24{/,null,AVAILABLE}
INFO  [2018-02-08 11:54:19,463] org.eclipse.jetty.server.AbstractConnector: Started application@26f480c6{HTTP/1.1,[http/1.1]}{0.0.0.0:9070}
INFO  [2018-02-08 11:54:19,465] org.eclipse.jetty.server.AbstractConnector: Started admin@2567c091{HTTP/1.1,[http/1.1]}{0.0.0.0:9071}
INFO  [2018-02-08 11:54:19,465] org.eclipse.jetty.server.Server: Started @6044ms
INFO  [2018-02-08 11:54:19,881] org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler: Initializing ExecutorService  'taskScheduler'
INFO  [2018-02-08 11:54:19,917] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
INFO  [2018-02-08 11:54:20,132] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase -2147483648
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualNack.serviceActivator} as a subscriber to the 'manualNackChannel' channel
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.manualNackChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualNack.serviceActivator
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualAck.serviceActivator} as a subscriber to the 'manualAckChannel' channel
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.manualAckChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualAck.serviceActivator
INFO  [2018-02-08 11:54:20,133] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 0
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.channel.PublishSubscribeChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.errorChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:54:20,133] org.springframework.integration.endpoint.EventDrivenConsumer: started _org.springframework.integration.errorLogger
INFO  [2018-02-08 11:54:20,143] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Attempting to connect to: [10.200.32.172, 10.200.32.160]
INFO  [2018-02-08 11:54:20,371] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Created new connection: connectionFactory#5bb0cfb4:0/SimpleConnection@6ef23b4b [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 3485]
INFO  [2018-02-08 11:54:20,375] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.archiveMatchUpdate) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO  [2018-02-08 11:54:20,375] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.settlements) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started matchUpdateInboundAdapter
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {json-to-object-transformer} as a subscriber to the 'matchUpdateFlow.channel#0' channel
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#0' has 1 subscriber(s).
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {filter} as a subscriber to the 'matchUpdateFlow.channel#1' channel
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#1' has 1 subscriber(s).
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {transformer} as a subscriber to the 'matchUpdateFlow.channel#2' channel
INFO  [2018-02-08 11:54:26,576] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.matchUpdateFlow.channel#2' has 1 subscriber(s).
INFO  [2018-02-08 11:54:26,577] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
INFO  [2018-02-08 11:54:27,176] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {json-to-object-transformer} as a subscriber to the 'settlementFlow.channel#0' channel
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.settlementFlow.channel#0' has 1 subscriber(s).
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {filter} as a subscriber to the 'settlementFlow.channel#1' channel
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.settlementFlow.channel#1' has 1 subscriber(s).
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {aggregator} as a subscriber to the 'amqpInputChannel' channel
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.amqpInputChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {amqp:outbound-channel-adapter} as a subscriber to the 'aggregatingFlow.channel#0' channel
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@50a638b5.aggregatingFlow.channel#0' has 1 subscriber(s).
INFO  [2018-02-08 11:54:27,177] org.springframework.integration.endpoint.EventDrivenConsumer: started org.springframework.integration.config.ConsumerEndpointFactoryBean#6
INFO  [2018-02-08 11:54:27,177] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 1073741823

And this is output from failing case (notice that Starting beans in phase 1073741823 is before integration is run):

INFO  [2018-02-08 11:52:27,923] org.eclipse.jetty.server.handler.ContextHandler: Started i.d.j.MutableServletContextHandler@40f1aa95{/,null,AVAILABLE}
INFO  [2018-02-08 11:52:27,939] org.eclipse.jetty.server.AbstractConnector: Started application@4aebee4b{HTTP/1.1,[http/1.1]}{0.0.0.0:9070}
INFO  [2018-02-08 11:52:27,942] org.eclipse.jetty.server.AbstractConnector: Started admin@18d47df0{HTTP/1.1,[http/1.1]}{0.0.0.0:9071}
INFO  [2018-02-08 11:52:27,942] org.eclipse.jetty.server.Server: Started @3432ms
INFO  [2018-02-08 11:52:28,116] org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler: Initializing ExecutorService  'taskScheduler'
INFO  [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase -2147483648
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualNack.serviceActivator} as a subscriber to the 'manualNackChannel' channel
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.manualNackChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualNack.serviceActivator
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {service-activator:searchAggregatorComponentConfig.manualAck.serviceActivator} as a subscriber to the 'manualAckChannel' channel
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.channel.DirectChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.manualAckChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started searchAggregatorComponentConfig.manualAck.serviceActivator
INFO  [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 0
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.channel.PublishSubscribeChannel: Channel 'org.springframework.web.context.support.AnnotationConfigWebApplicationContext@47d384ee.errorChannel' has 1 subscriber(s).
INFO  [2018-02-08 11:52:28,203] org.springframework.integration.endpoint.EventDrivenConsumer: started _org.springframework.integration.errorLogger
INFO  [2018-02-08 11:52:28,203] org.springframework.context.support.DefaultLifecycleProcessor: Starting beans in phase 1073741823
INFO  [2018-02-08 11:52:28,220] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Attempting to connect to: [10.200.32.172, 10.200.32.160]
INFO  [2018-02-08 11:52:28,452] org.springframework.amqp.rabbit.connection.CachingConnectionFactory: Created new connection: connectionFactory#21cf8ba8:0/SimpleConnection@7aab9c72 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 3435]
INFO  [2018-02-08 11:52:28,454] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.archiveMatchUpdate) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO  [2018-02-08 11:52:28,454] org.springframework.amqp.rabbit.core.RabbitAdmin: Auto-declaring a non-durable, auto-delete, or exclusive Queue (phoenix.searchservice3.q.settlements) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
INFO  [2018-02-08 11:52:34,774] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started matchUpdateInboundAdapter
INFO  [2018-02-08 11:52:35,357] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter

Hopefully somebody has some idea what could be wrong? I also noticed that line INFO [2018-02-08 11:54:19,917] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring is in working printout.

** UPDATE **

Here is config:

@Configuration
@EnableIntegration
@Import({ CommonSearchConfig.class })
public class SearchAggregatorComponentConfig {
    private final org.slf4j.Logger logger = LoggerFactory.getLogger(SearchAggregatorComponentConfig.class);
    private Param param;

    @Resource
    private CommonSearchConfig commonConfig;

    public SearchAggregatorComponentConfig(Param param) {
        this.param = param;
    }

    @Bean
    public SearchAggregatorComponent searchAggregatorComponent() {
        return new SearchAggregatorComponent(param, commonConfig.metricsServlet());
    }

    @Bean
    public Exchange matchExchange() {
        return new TopicExchange("phoenix.fixture.ex.match");
    }

    @Bean
    public FanoutExchange settlementExchange() {
        return new FanoutExchange("phoenix.lcoo.ex.searchservice.settlements");
    }

    @Bean
    public Queue matchUpdateQueue() {
        return QueueBuilder.nonDurable(RABBIT_PREFIX + "q.archiveMatchUpdate")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-max-length", 10_000)
                .withArgument("x-expires", TimeUnit.MINUTES.toMillis(10))
                .build();
    }

    @Bean
    public Queue settlementQueue() {
        return QueueBuilder.nonDurable(RABBIT_PREFIX + "q.settlements")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-max-length", 10_000)
                .withArgument("x-expires", TimeUnit.MINUTES.toMillis(10))
                .build();
    }

    @Bean
    public Binding matchUpdateBinding() {
        return BindingBuilder
                .bind(matchUpdateQueue())
                .to(matchExchange())
                .with("finished")
                .noargs();
    }

    @Bean
    public Binding settlementsBinding() {
        return BindingBuilder
                .bind(settlementQueue())
                .to(settlementExchange());
    }

    @Bean
    public IntegrationFlow matchUpdateFlow() {
        return IntegrationFlows
                .from(matchUpdateInboundAdapter())
                .transform(new JsonToObjectTransformer(Match.class))
                .filter((Match m) -> m.getGuthMatchId() != null,  f -> f.discardChannel(manualNackChannel()))   
                .transform((Match m) -> { m.setEventUrn(new UrnImpl(EventUrnType.GUTH_MATCH, m.getGuthMatchId())); return m; }) 
                .channel(amqpInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow settlementFlow() {
        return IntegrationFlows
                .from(settlementInboundAdapter())
                .transform(new JsonToObjectTransformer(LcooSettlementInfo.class))
                .filter((LcooSettlementInfo s) -> s.getEventId() != null && s.getEventId().isGuthMatch(), f -> f.discardChannel(manualNackChannel()))  //allow only guthmatch urns
                .channel(amqpInputChannel())
                .get();
    }

    @Bean
    public IntegrationFlow aggregatingFlow() {
        return IntegrationFlows
                .from(amqpInputChannel())
                .aggregate(a -> a  //if declared as AggregatingMessageHandler @Bean, we can use handle()
                        .outputProcessor(messageProcessor())
                        .messageStore(messageGroupStore())
                        .correlationStrategy(correlationStrategy())
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
                )
                .handle(amqpOutboundEndpoint())
                .get();
    }

    @Bean
    public SimpleMessageListenerContainer matchUpdateListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(commonConfig.connectionFactory());
        listenerContainer.setQueues(matchUpdateQueue());
        listenerContainer.setConcurrentConsumers(50);
        listenerContainer.setMessageConverter(jackson2JsonConverter());
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConsumerTagStrategy(consumerTagStrategy());
        return listenerContainer;
    }

    @Bean
    public SimpleMessageListenerContainer settlementListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(commonConfig.connectionFactory());
        listenerContainer.setQueues(settlementQueue());
        listenerContainer.setConcurrentConsumers(4);
        listenerContainer.setMessageConverter(jackson2JsonConverter());
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConsumerTagStrategy(consumerTagStrategy());
        listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
        return listenerContainer;
    }

    @Bean
    public ConsumerTagStrategy consumerTagStrategy() {
        return queue -> RABBIT_PREFIX + "aggregator." + UUID.randomUUID();
    }

    @Bean
    public MessageChannel amqpInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel manualAckChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel manualNackChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "manualNackChannel")
    public void manualNack(@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag, @Payload Object payload) throws IOException {
        channel.basicNack(tag, false, false);
        logger.warn(String.format("NACK-ED message with tag %s and payload %s", tag, payload));
    }

    @ServiceActivator(inputChannel = "manualAckChannel")
    public void manualAck(@Header(MANUAL_ACK_PAIRS) List<ManualAckPair> manualAckPairs) {
        manualAckPairs.forEach(ManualAckPair::basicAck);
    }

    @Bean
    public AmqpInboundChannelAdapter matchUpdateInboundAdapter() {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(matchUpdateListenerContainer());
        adapter.setRecoveryCallback(new ErrorMessageSendingRecoverer(manualNackChannel(), nackStrategy()));
        adapter.setRetryTemplate(commonConfig.retryTemplate());
        return adapter;
    }

    private ErrorMessageStrategy nackStrategy() {
        return (throwable, attributes) -> {
            Message inputMessage = (Message)attributes.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
            return new ErrorMessage(throwable, inputMessage.getHeaders());
        };
    }

    @Bean
    public AmqpInboundChannelAdapter settlementInboundAdapter() {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(settlementListenerContainer());
        adapter.setRecoveryCallback(new ErrorMessageSendingRecoverer(manualNackChannel(), nackStrategy()));
        adapter.setRetryTemplate(commonConfig.retryTemplate());
        return adapter;
    }

    @Bean
    public MatchAggregatorService aggregator() {
        return new MatchAggregatorService();
    }

    @Bean
    public CorrelationStrategy correlationStrategy() {
        InOutLambdaCorrelationStrategy correlationStrategy = new InOutLambdaCorrelationStrategy();
        Function<LcooSettlementInfo, String> correlator1 = (in) -> aggregator().correlate(in);
        Function<Match, String> correlator2 = (in) -> aggregator().correlate(in);
        correlationStrategy.addFunction(correlator1);
        correlationStrategy.addFunction(correlator2);
        return correlationStrategy;
    }

    @Bean
    public MessageGroupProcessor messageProcessor() {
        InOutLambdaMessageGroupProcessor<ElasticMatch> processor = new InOutLambdaMessageGroupProcessor<>(ElasticMatch.class);
        BiConsumer<ElasticMatch, LcooSettlementInfo> aggregator1 = (out, in) -> aggregator().aggregate(out, in);
        BiConsumer<ElasticMatch, Match> aggregator2 = (out, in) -> aggregator().aggregate(out, in);
        processor.addConsumer(aggregator1);
        processor.addConsumer(aggregator2);
        return processor;
    }

    @Bean
    public MessageGroupStore messageGroupStore() {
        return new SimpleMessageStore();
    }

    @Bean
    public RabbitTemplate ackTemplate() {
        RabbitTemplate ackTemplate = new RabbitTemplate(commonConfig.connectionFactory());
        ackTemplate.setMessageConverter(jackson2JsonConverter());
        return ackTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public PartitionMapper partitionMapper() {
        return new PartitionMapper();
    }

    private FunctionExpression<Message<? extends ElasticMatch>> routingKeyExpression() {
        return new FunctionExpression<>(m -> partitionMapper().mapToRoutingKey(m.getPayload().getEventUrn()));
    }

    @Bean
    public AmqpOutboundEndpoint amqpOutboundEndpoint() {
        AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
        outboundEndpoint.setConfirmAckChannel(manualAckChannel());
        outboundEndpoint.setConfirmCorrelationExpressionString("#root");
        outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
        outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
        return outboundEndpoint;
    }
}

** UPDATE 2 **

INFO  [2018-02-08 19:19:47,688] org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter: started settlementInboundAdapter
WARN  [2018-02-08 19:30:54,888] com.oddsarbitrage.phoenix.searchservice.infrastructure.components.aggregator.SearchAggregatorComponentConfig: NACK-ED message with tag 1 and payload org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[214], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.fixture.ex.match, amqp_deliveryTag=1, amqp_consumerQueue=phoenix.searchservice3.q.archiveMatchUpdate, amqp_channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://betradar@10.200.32.172:5672/betradarMQ,5), conn: Proxy@4b49d16 Shared Rabbit Connection: SimpleConnection@72385bd9 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 30706], amqp_redelivered=false, amqp_receivedRoutingKey=finished, amqp_contentEncoding=UTF-8, json__TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, id=1e4eb74f-aa90-495c-f12c-8ac7d375a31a, amqp_consumerTag=phoenix.searchservice3.aggregator.b01d7f4d-ea00-40c0-926b-7013f9c8d363, contentType=application/json, __TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, timestamp=1518118254876}], failedMessage=GenericMessage [payload=byte[214], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.fixture.ex.match, amqp_deliveryTag=1, amqp_consumerQueue=phoenix.searchservice3.q.archiveMatchUpdate, amqp_channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://betradar@10.200.32.172:5672/betradarMQ,5), conn: Proxy@4b49d16 Shared Rabbit Connection: SimpleConnection@72385bd9 [delegate=amqp://betradar@10.200.32.172:5672/betradarMQ, localPort= 30706], amqp_redelivered=false, amqp_receivedRoutingKey=finished, amqp_contentEncoding=UTF-8, json__TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, id=1e4eb74f-aa90-495c-f12c-8ac7d375a31a, amqp_consumerTag=phoenix.searchservice3.aggregator.b01d7f4d-ea00-40c0-926b-7013f9c8d363, contentType=application/json, __TypeId__=com.oddsarbitrage.phoenix.fixtureservice.application.dto.Match, timestamp=1518118254876}]

and other place

! org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
! at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154)
! at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
! at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
! ... 34 common frames omitted
! Causing: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[123], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=phoenix.searchservice2.ix.refreshreq, amqp_deliveryTag=4, amqp_consumerQueue=phoenix.searchservice2.q.archive.refresh, amqp_redelivered=false, amqp_receivedRoutingKey=archive, amqp_contentEncoding=UTF-8, json__TypeId__=x.RefreshRequest, id=2233fa47-146a-9644-7784-98ab3a3d1183, amqp_consumerTag=amq.ctag-isa8OgreltRjHj0kll9Lxg, contentType=application/json, __TypeId__=x.RefreshRequest, timestamp=1518022018505}]
! at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
! at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
! at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
! at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
! at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
! at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
! at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)

1条回答
smile是对你的礼貌
2楼-- · 2019-06-11 10:01

If you are using the maven shade plugin, it won't work out of the box because the shade plugin doesn't know how to merge the META-INF/spring.factories files which are used to bootstrap certain features such as the DSL.

See my answer here.

You either have to fix up the spring.factories file in the uber jar or move to Spring Integration 5.0.1 where the DSL is now part of core and the initializer is in the core spring.factories file.

The answer includes a work-around if you can't move to Spring Integration 5 for some reason.

查看更多
登录 后发表回答