I am trying to use AMQP-backed message channels in my Spring Integration app, but I think I am fundamentally misunderstanding something, specifically around the Message<?> interface and how instances of GenericMessage<?> are written to, and read from, a RabbitMQ queue.
Given I have a Spring Integration app containing the following domain model object:
    @Immutable
    class Foo {
        String name
        long quantity
    }
and I declare an AMQP-backed message channel called fooChannel as follows:
    @Bean
    public AmqpChannelFactoryBean deliveryPlacementChannel(CachingConnectionFactory connectionFactory) {
        AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true)
        factoryBean.setConnectionFactory(connectionFactory)
        factoryBean.setQueueName("foo")
        factoryBean.beanName = 'fooChannel'
        factoryBean.setPubSub(false)
        factoryBean
    }
When I initially tried to send a message to my fooChannel, I received a java.io.NotSerializableException. I realised this was because the RabbitTemplate used by my AMQP-backed fooChannel was using an org.springframework.amqp.support.converter.SimpleMessageConverter, which can only work with Strings, Serializable instances, or byte arrays, and my Foo model is none of those.
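
To illustrate the failure described above, here is a minimal sketch using only the JDK (no Spring or RabbitMQ involved). It assumes the converter ends up using plain Java serialization for the object it is given; the class name NotSerializableDemo and the Java version of Foo are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class NotSerializableDemo {

    // Hypothetical Java stand-in for the Foo model: note that it does NOT
    // implement java.io.Serializable.
    public static class Foo {
        public final String name;
        public final long quantity;

        public Foo(String name, long quantity) {
            this.name = name;
            this.quantity = quantity;
        }
    }

    // Returns true when Java serialization rejects the given object.
    public static boolean failsToSerialize(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return false;
        } catch (NotSerializableException e) {
            // Same exception type as the one reported in the question.
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToSerialize(new Foo("Foo1", 1)));  // Foo is rejected
        System.out.println(failsToSerialize("a plain String"));    // String is accepted
    }
}
```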
Therefore, I thought that I should use an org.springframework.amqp.support.converter.Jackson2JsonMessageConverter to ensure my Foo model is properly converted to and from an AMQP message. However, it appears that the message being added to the RabbitMQ queue backing my fooChannel is of type org.springframework.messaging.support.GenericMessage. This means that when my AMQP-backed fooChannel tries to consume messages from the RabbitMQ queue, it fails with the following exception:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class org.springframework.messaging.support.GenericMessage]: can not instantiate from JSON object (missing default constructor or creator, or perhaps need to add/enable type information?)
From looking at the GenericMessage class, I see that it is designed to be immutable, which explains why the Jackson2JsonMessageConverter can't convert from JSON to the GenericMessage type. However, I am unsure what I should be doing to allow my fooChannel to be backed by AMQP and have the conversion of my Spring Integration messages containing my Foo model work correctly.
In terms of the flow of my application, I have the following Transformer component, which consumes Bar models from the (non-AMQP-backed) barChannel and places Foo models on the fooChannel as follows:
    @Transformer(inputChannel = 'barChannel', outputChannel = 'fooChannel')
    public Foo transform(Bar bar) {
        // transform logic removed for brevity
        new Foo(name: 'Foo1', quantity: 1)
    }
I then have a ServiceActivator component which I wish to have consume from my fooChannel as follows:
    @ServiceActivator(inputChannel = 'fooChannel')
    void consumeFoos(Foo foo) {
        // Do something with foo
    }
I am using spring-integration-core:4.2.5.RELEASE and spring-integration-amqp:4.2.5.RELEASE.
Can anyone please advise where I am going wrong with the configuration of my Spring Integration application?
If any further information is needed to better clarify my question or problem, please let me know. Thanks.
Yes - amqp-backed channels are currently limited to Java serializable objects.
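
Given that limitation, one workaround is to make the payload itself Java serializable. The following is only a sketch with plain JDK serialization (no Spring involved); SerializableFooDemo, toBytes and fromBytes are hypothetical names for illustration, and Foo is a Java stand-in for the Groovy model in the question.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableFooDemo {

    // Stand-in for the Foo model, now implementing Serializable.
    public static class Foo implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public final long quantity;

        public Foo(String name, long quantity) {
            this.name = name;
            this.quantity = quantity;
        }
    }

    // Serializes the payload with standard Java serialization.
    public static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads an object back from bytes produced by toBytes.
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Foo copy = (Foo) fromBytes(toBytes(new Foo("Foo1", 1)));
        System.out.println(copy.name + " x " + copy.quantity);
    }
}
```

In the Groovy model from the question, this would amount to declaring the class as `class Foo implements Serializable`.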
We should provide an option to map the Message<?> to a Spring AMQP Message (like the channel adapters do) rather than......which converts the entire message.
You could use a pair of channel adapters (outbound/inbound) instead of a channel. Since you are using Java config, you could wrap the adapter pair in a new MessageChannel implementation. I opened a JIRA Issue for this.
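
A rough, untested sketch of what the adapter pair might look like in Java config follows. The class name, bean names and the fooOutChannel/fooInChannel channel names are assumptions made up for illustration, as is the use of the default exchange with routing key "foo". It also assumes that Foo can be deserialized by Jackson and that the "foo" queue is declared elsewhere.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class FooAmqpAdapterConfig {

    // JSON converter applied to the payload only, not to the whole Message<?>.
    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonConverter());
        return template;
    }

    // Hypothetical channel the transformer would write to instead of fooChannel.
    @Bean
    public MessageChannel fooOutChannel() {
        return new DirectChannel();
    }

    // Outbound adapter: publishes the payload to the "foo" queue via the default exchange.
    @Bean
    @ServiceActivator(inputChannel = "fooOutChannel")
    public AmqpOutboundEndpoint fooOutbound(RabbitTemplate jsonRabbitTemplate) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(jsonRabbitTemplate);
        endpoint.setRoutingKey("foo");
        return endpoint;
    }

    // Hypothetical channel the service activator would read from instead of fooChannel.
    @Bean
    public MessageChannel fooInChannel() {
        return new DirectChannel();
    }

    @Bean
    public SimpleMessageListenerContainer fooContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo");
        return container;
    }

    // Inbound adapter: consumes from "foo" and converts the JSON body back to a payload.
    @Bean
    public AmqpInboundChannelAdapter fooInbound(SimpleMessageListenerContainer fooContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(fooContainer);
        adapter.setOutputChannel(fooInChannel());
        adapter.setMessageConverter(jsonConverter());
        return adapter;
    }
}
```

With something along these lines, the Transformer would use outputChannel = 'fooOutChannel' and the ServiceActivator would use inputChannel = 'fooInChannel', so only the Foo payload is converted to and from JSON rather than the entire GenericMessage.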