Spring Integration Aggregator with MongoDbMessageS

2019-08-05 04:14发布

I'm using Spring Integration 4.3.11 with Spring Data MongoDB 1.10.6. I have a message aggregator that uses MongoDbMessageStore. When a message from a message sequence comes in, it's persisted in MongoDB, but immediately after that (in o.s.i.store.AbstractMessageGroupStore#addMessageToGroup) an exception is thrown when the same message is read from MongoDB that

Failed to instantiate [org.springframework.messaging.support.GenericMessage]: No default constructor found

Full stacktrace:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.dsl.AggregatorSpec$InternalAggregatingMessageHandler#0]; nested exception is org.springframework.data.mapping.model.MappingInstantiationException: Failed to instantiate org.springframework.messaging.support.GenericMessage using constructor NO_CONSTRUCTOR with arguments 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:53)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$3.run(UnicastingDispatcher.java:129)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.data.mapping.model.MappingInstantiationException: Failed to instantiate org.springframework.messaging.support.GenericMessage using constructor NO_CONSTRUCTOR with arguments 
    at org.springframework.data.convert.ReflectionEntityInstantiator.createInstance(ReflectionEntityInstantiator.java:64)
    at org.springframework.data.convert.ClassGeneratingEntityInstantiator.createInstance(ClassGeneratingEntityInstantiator.java:83)
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:259)
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:239)
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:199)
    at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:195)
    at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.read(MongoDbMessageStore.java:520)
    at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.read(MongoDbMessageStore.java:529)
    at org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageReadingMongoConverter.read(MongoDbMessageStore.java:485)
    at org.springframework.data.mongodb.core.MongoTemplate$ReadDbObjectCallback.doWith(MongoTemplate.java:2324)
    at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1969)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1787)
    at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1770)
    at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:644)
    at org.springframework.data.mongodb.core.MongoTemplate.findOne(MongoTemplate.java:609)
    at org.springframework.integration.mongodb.store.MongoDbMessageStore.getMessageGroup(MongoDbMessageStore.java:245)
    at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:210)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:621)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:413)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    ... 9 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.messaging.support.GenericMessage]: No default constructor found; nested exception is java.lang.NoSuchMethodException: org.springframework.messaging.support.GenericMessage.<init>()
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:105)
    at org.springframework.data.convert.ReflectionEntityInstantiator.createInstance(ReflectionEntityInstantiator.java:61)
    ... 28 common frames omitted
Caused by: java.lang.NoSuchMethodException: org.springframework.messaging.support.GenericMessage.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getDeclaredConstructor(Class.java:2178)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:102)
    ... 29 common frames omitted

I've set up the aggregator with Spring Integration DSL as follows:

IntegrationFlows
.from(
        Amqp.inboundAdapter(connectionFactory, properties.getQueue())
                .errorChannel(errorChannel)
                .mappedRequestHeaders(MAPPED_AMQP_HEADERS)
                .taskExecutor(amqpConsumerExecutor)
                .concurrentConsumers(properties.getConcurrentConsumers())
                .messageConverter(new Jackson2JsonMessageConverter())
                .defaultRequeueRejected(false)
)
.channel(resultChannel)
.aggregate(aggregator -> aggregator
        .expireGroupsUponCompletion(true)
        .groupTimeout(properties.getGroupTimeout())
        .expireGroupsUponTimeout(true)
        .messageStore(new MongoDbMessageStore(mongoDbFactory))
        .outputProcessor(new ResponseAggregator())
)
// ...

The mongoDbFactory is autoconfigured by Spring Boot.

The message persisted in MongoDB:

{
    "_id" : ObjectId("59ca5c1349ac540d17911afc"),
    "_createdDate" : NumberLong(1506434067236),
    "_class" : "org.springframework.integration.mongodb.store.MongoDbMessageStore$MessageWrapper",
    "_groupId" : "39f292df-4b75-6531-e570-1950ef6bb8e2",
    "_messageType" : "org.springframework.messaging.support.GenericMessage",
    "payload" : {
        "_class" : "com.example.Response",
        "status" : 0
    },
    "headers" : {
        "amqp_receivedDeliveryMode" : "PERSISTENT",
        "sequenceNumber" : 1,
        "amqp_deliveryTag" : NumberLong(3),
        "sequenceSize" : 15,
        "amqp_consumerQueue" : "response",
        "amqp_redelivered" : false,
        "amqp_receivedRoutingKey" : "response",
        "amqp_contentEncoding" : "UTF-8",
        "json__TypeId__" : "com.example.Response",
        "correlationId" : "39f292df-4b75-6531-e570-1950ef6bb8e2",
        "id" : LUUID("fcf5ba73-30a0-1674-2767-d9980c09fb49"),
        "amqp_consumerTag" : "amq.ctag-A0tTqrEhhbEDxN5OEGs3hA",
        "contentType" : "application/json",
        "timestamp" : NumberLong(1506434067171)
    },
    "_group_timestamp" : NumberLong(1506434067179),
    "_group_update_timestamp" : NumberLong(1506434067179),
    "_last_released_sequence" : 0,
    "_group_complete" : false,
    "sequence" : 2
}

What am I doing wrong?

1条回答
Bombasti
2楼-- · 2019-08-05 04:31

You have to declare MongoDbMessageStore as a bean:

@Bean
MongoDbMessageStore messageStore() {
    return new MongoDbMessageStore(mongoDbFactory);
}

...

.messageStore(messageStore())

It has particular BeanFactoryAware logic including:

public void afterPropertiesSet() throws Exception {
    if (this.applicationContext != null) {
        this.converter.setApplicationContext(this.applicationContext);
    }
    this.converter.afterPropertiesSet();

Where that MessageReadingMongoConverter performs the logic like:

 public void afterPropertiesSet() {
        List<Object> customConverters = new ArrayList<Object>();
        customConverters.add(new UuidToDBObjectConverter());
        customConverters.add(new DBObjectToUUIDConverter());
        customConverters.add(new MessageHistoryToDBObjectConverter());
        customConverters.add(new DBObjectToGenericMessageConverter());
        customConverters.add(new DBObjectToMutableMessageConverter());
        DBObjectToErrorMessageConverter docToErrorMessageConverter = new DBObjectToErrorMessageConverter();
        if (MongoDbMessageStore.this.whiteListPatterns != null) {
            docToErrorMessageConverter.deserializingConverter
                    .addWhiteListPatterns(MongoDbMessageStore.this.whiteListPatterns);
        }
        customConverters.add(docToErrorMessageConverter);
        customConverters.add(new DBObjectToAdviceMessageConverter());
        customConverters.add(new ThrowableToBytesConverter());
        this.setCustomConversions(new CustomConversions(customConverters));
        super.afterPropertiesSet();
    }

Pay attention to the DBObjectToGenericMessageConverter. That is how GenericMessage will be instantiate instead of default constructor-based strategy.

查看更多
登录 后发表回答