Spring Integration - AMQP-backed message channels

Posted 2019-05-29 10:30

I am trying to use AMQP-backed message channels in my Spring Integration app, but I think I am fundamentally misunderstanding something, specifically around the Message<?> interface and how instances of GenericMessage<?> are written to, and read from, a RabbitMQ queue.

Given I have a Spring Integration app containing the following domain model object:

@Immutable
class Foo {
  String name
  long quantity
}

and I declare an AMQP-backed message channel called fooChannel as follows:

@Bean
public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
  AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
  factoryBean.setConnectionFactory(connectionFactory)
  factoryBean.setQueueName("foo")
  factoryBean.beanName = 'fooChannel'
  factoryBean.setPubSub(false)
  factoryBean
}

When I initially tried to send a message to my fooChannel I received a java.io.NotSerializableException. I realised this was caused by the RabbitTemplate used by my AMQP-backed fooChannel, which uses an org.springframework.amqp.support.converter.SimpleMessageConverter. That converter can only work with Strings, Serializable instances, or byte arrays, and my Foo model is none of those.

Therefore, I thought that I should use an org.springframework.amqp.support.converter.Jackson2JsonMessageConverter to ensure my Foo model is properly converted to/from an AMQP message. However, it appears that the message being added to the RabbitMQ queue which backs my fooChannel is of type org.springframework.messaging.support.GenericMessage. This means that when my AMQP-backed fooChannel tries to consume messages from the RabbitMQ queue it receives the following exception:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)

From looking at the GenericMessage class I see that it is designed to be immutable, which explains why the Jackson2JsonMessageConverter can't convert from JSON to the GenericMessage type. However, I am unsure what I should be doing in order to allow my fooChannel to be backed by AMQP and have the conversion of my Spring Integration messages containing my Foo model work correctly.

In terms of the flow of my application, I have the following Transformer component, which consumes Bar models from the (non-AMQP-backed) barChannel and places Foo models on the fooChannel as follows:

@Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
public Foo transform(Bar bar) {
  //transform logic removed for brevity
  new Foo(name: 'Foo1', quantity: 1)
}

I then have a ServiceActivator component which I wish to have consume from my fooChannel as follows:

@ServiceActivator(inputChannel = 'fooChannel')
void consumeFoos(Foo foo){
  // Do something with foo
} 

I am using spring-integration-core:4.2.5.RELEASE and spring-integration-amqp:4.2.5.RELEASE.

Can anyone please advise where I am going wrong with the configuration of my Spring Integration application?

If any further information is needed in order to clarify my question or problem, please let me know. Thanks

1 answer
骚年 ilove
#2 · 2019-05-29 11:06

Yes - amqp-backed channels are currently limited to Java serializable objects.
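
The limitation can be reproduced without a broker: the default converter falls back to Java serialization, and Java serialization rejects any object that does not implement Serializable. Below is a minimal JDK-only sketch of that behaviour. PlainFoo, SerializableFoo and canSerialize are hypothetical names introduced for illustration; they are not part of the question's code or of Spring.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Mirrors the shape of the Foo model in the question: no Serializable marker.
    static class PlainFoo {
        final String name;
        final long quantity;

        PlainFoo(String name, long quantity) {
            this.name = name;
            this.quantity = quantity;
        }
    }

    // Hypothetical variant of the same model that opts in to Java serialization.
    static class SerializableFoo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final long quantity;

        SerializableFoo(String name, long quantity) {
            this.name = name;
            this.quantity = quantity;
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // is rejected with NotSerializableException.
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainFoo serializable: " + canSerialize(new PlainFoo("Foo1", 1)));
        System.out.println("SerializableFoo serializable: " + canSerialize(new SerializableFoo("Foo1", 1)));
    }
}
```

If Java serialization of the payload is acceptable for your application, having Foo implement Serializable is probably the smallest change that lets the AMQP-backed channel work with the default converter.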

We should provide an option to map the Message<?> to a Spring AMQP Message (like the channel adapters do) rather than...

this.amqpTemplate.convertAndSend(this.getExchangeName(), this.getRoutingKey(), message);

...which converts the entire message.

You could use a pair of channel adapters (outbound/inbound) instead of a channel.

Since you are using Java config, you could wrap the adapter pair in a new MessageChannel implementation.
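
As a rough sketch of the adapter-pair approach (not wrapped in a custom MessageChannel), the configuration might look like the following. It is written in Java, which Groovy config can use as-is. The class name FooAmqpBridgeConfig, the bean names and the fooFromAmqpChannel channel are assumptions made for illustration. The sketch also assumes that the "foo" queue is already declared on the broker, that the application already enables Spring Integration, and that Foo is something Jackson can read and write (an immutable class may need extra Jackson configuration for that).

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class FooAmqpBridgeConfig {

    // JSON conversion is applied to the payload only, not to the whole Message<?>.
    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate fooRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter());
        return template;
    }

    // Plain in-memory channel that the transformer writes to.
    @Bean
    public MessageChannel fooChannel() {
        return new DirectChannel();
    }

    // Hypothetical channel on which Foos read back from RabbitMQ are delivered.
    @Bean
    public MessageChannel fooFromAmqpChannel() {
        return new DirectChannel();
    }

    // Outbound adapter: publishes each payload from fooChannel to the "foo" queue
    // via the default exchange (routing key equals the queue name).
    @Bean
    @ServiceActivator(inputChannel = "fooChannel")
    public AmqpOutboundEndpoint fooOutbound(RabbitTemplate fooRabbitTemplate) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(fooRabbitTemplate);
        endpoint.setRoutingKey("foo");
        return endpoint;
    }

    @Bean
    public SimpleMessageListenerContainer fooListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        return container;
    }

    // Inbound adapter: consumes from the "foo" queue and converts the JSON body
    // back into a payload before sending it to fooFromAmqpChannel.
    @Bean
    public AmqpInboundChannelAdapter fooInbound(SimpleMessageListenerContainer fooListenerContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(fooListenerContainer);
        adapter.setOutputChannel(fooFromAmqpChannel());
        adapter.setMessageConverter(jsonConverter());
        return adapter;
    }
}
```

With a layout like this, the consuming @ServiceActivator would listen on fooFromAmqpChannel rather than on fooChannel, since fooChannel is now only the entry point to the outbound adapter.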

I opened a JIRA Issue for this.
