I am trying to use RabbitMQ with Spring AMQP. Below is my configuration:
<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" routing-key="${rabbitmq.import.queue}"/>
<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" method="onMessage" />
</rabbit:listener-container>
This is a simple message listener class:
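public class ImportMessageListener {
    // Invoked via method="onMessage" with the converted payload;
    // the returned value is sent back to the producer as the reply.
    public String onMessage(String message) {
        System.out.println("consumer output: " + message);
        return message;
    }
}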
This is the producer (a Spring Batch ItemWriter):
import java.util.List;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.batch.item.ItemWriter;

public class ImportItemWriter<T> implements ItemWriter<T> {
    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            // Blocks until the reply for this item arrives
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }
}
When I run my Spring Batch job, each message is sent and handled one at a time, and I get the output below:
consumer output: 1
producer output: 1
consumer output: 2
producer output: 2
consumer output: 3
producer output: 3
consumer output: 4
producer output: 4
consumer output: 5
producer output: 5
It should send 5 messages and queue them up, and the 5 consumer threads (concurrency=5) should handle them concurrently and reply as soon as each one completes.
So the output should be:
consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5
producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5
I don't want the producer to wait for the reply to the first message before queuing the second message.
I tried convertAndSend, which is asynchronous (it doesn't wait for a reply), but how do I get the reply message in my ItemWriter the way I can with convertSendAndReceive?
If I change my template configuration to
<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
    routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
    <rabbit:reply-listener/>
</rabbit:template>
and use template.convertAndSend(item.toString()); then how can I get the reply message?
I can't attach my own message handler to this reply listener the way I can on the consumer side. For replies, it uses the default RabbitTemplate handler.
First of all, let me explain what's going on.
You are using the synchronous sendAndReceive operation. Before your message is sent, it is enriched with a temporary reply queue, and the sender (producer) thread blocks waiting for the reply from that reply queue. That's why all your messages are processed serially.
To go further, we need to know why you use that operation in a one-way ItemWriter producer.
Maybe RabbitTemplate#convertAndSend would be enough for you?
UPDATE
Maybe you just need RabbitTemplate.ReturnCallback together with convertAndSend to determine whether your message was delivered or not?
UPDATE 2
Another idea to meet your requirement is to use a TaskExecutor to send and receive in parallel. With that, you don't need to wait for the reply to the first message before sending the second. After that, you can do something with all the replies for the current items.
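The parallel send-and-receive idea could be sketched roughly as follows. This is only an illustration under assumptions, not the exact code for your setup: it uses a plain JDK ExecutorService in place of Spring's TaskExecutor, and the sendAndReceive function parameter is a hypothetical stand-in for template.convertSendAndReceive(...) so that the sketch runs without a broker. The class name, method name, and thread count are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ParallelSendAndReceive {

    // Submits one blocking request/reply call per item to a thread pool,
    // then collects the replies in the same order as the items.
    // "sendAndReceive" is a hypothetical stand-in for
    // template.convertSendAndReceive(payload).
    static List<Object> sendAll(List<?> items,
                                Function<String, Object> sendAndReceive,
                                int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Object>> pending = new ArrayList<>();
            for (Object item : items) {
                String payload = item.toString();
                // Each call blocks only its own worker thread, so the next
                // item is submitted without waiting for this reply.
                pending.add(CompletableFuture.supplyAsync(
                        () -> sendAndReceive.apply(payload), executor));
            }
            List<Object> replies = new ArrayList<>();
            for (CompletableFuture<Object> future : pending) {
                // join() waits for that particular reply to arrive.
                replies.add(future.join());
            }
            return replies;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Fake "broker" that just echoes the payload, for demonstration only.
        List<Object> replies = sendAll(Arrays.asList(1, 2, 3, 4, 5),
                payload -> "reply to " + payload, 5);
        for (Object reply : replies) {
            System.out.println("producer output: " + reply);
        }
    }
}
```

Inside ImportItemWriter.write, the call might look like sendAll(items, payload -> template.convertSendAndReceive(payload), 5). The returned list then holds all the replies for the current items, and a reply may be null if the request timed out.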