How to send messages asynchronously to queue them

Posted 2019-07-23 11:57

I am trying to use RabbitMQ with Spring AMQP. Below is my configuration.

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"  routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener"  method="onMessage" />
</rabbit:listener-container>

This is a simple message listener class:

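public class ImportMessageListener {

    // Invoked through the method="onMessage" attribute of <rabbit:listener>.
    // The returned value is sent back to the producer as the reply.
    public String onMessage(String message) {
        System.out.println("consumer output: " + message);
        return message;
    }

}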

This is the producer (an ItemWriter in Spring Batch):

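import java.util.List;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.batch.item.ItemWriter;

public class ImportItemWriter<T> implements ItemWriter<T> {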

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

When I run my Spring Batch job, each message is sent and handled one by one, and I get the output below:

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

It should send 5 messages and queue them up, and the 5 consumer threads (concurrency=5) should handle them concurrently, each responding as soon as it completes.

So the output should look like this:

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

I don't want the producer to wait for the reply to the first message before queuing the second message.

I tried using convertAndSend, which makes it asynchronous (it doesn't wait for a reply), but how do I get the reply message in my ItemWriter the way I can with convertSendAndReceive?

If I change my template configuration to

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
    routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
    <rabbit:reply-listener/>
</rabbit:template>

and if I use template.convertAndSend(item.toString()); then how can I get the reply message?

I can't attach my own message handler to this listener to get the reply message the way we can on the consumer side. For replies, it uses the default RabbitTemplate handler.

1 Answer
神经病院院长
#2 · 2019-07-23 12:02

First of all, let me explain what's going on.

You are using the synchronous sendAndReceive operation. Before your message is sent, it is enriched with a temporary reply queue (TemporaryReplyQueue), and the sender (producer) thread blocks while it waits for the reply from that reply queue. That's why all your messages are processed serially.

To go further, we need to know why you are using that operation in a one-way ItemWriter producer.

Maybe RabbitTemplate#convertAndSend would be enough for you?
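
To make the blocking behaviour concrete, here is a minimal sketch in plain Java, with no Spring AMQP or broker involved. Everything in it (BlockingRequestReplyDemo, Request, run) is a hypothetical stand-in that I made up for illustration: each request carries its own private reply queue, and the producer waits on it before sending the next request. Even with 5 consumer threads, only one message is ever in flight, so the output stays interleaved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simulation of a blocking request/reply round trip (not Spring AMQP code).
final class BlockingRequestReplyDemo {

    /** A request that carries its own private reply queue, like a temporary reply queue. */
    static final class Request {
        final String body;
        final BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);

        Request(String body) {
            this.body = body;
        }
    }

    /** Sends each item, blocks for its reply, and returns the combined output log. */
    static List<String> run(List<String> items, int consumerCount) {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        List<String> log = Collections.synchronizedList(new ArrayList<String>());

        // Start the consumers; each one echoes the request body back as the reply.
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        Request request = requestQueue.take();
                        log.add("consumer output: " + request.body);
                        request.replyQueue.put(request.body);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop signal from the producer
                }
            });
            consumer.setDaemon(true);
            consumer.start();
            consumers.add(consumer);
        }

        try {
            for (String item : items) {
                Request request = new Request(item);
                requestQueue.put(request);
                // The producer blocks here, so the next item is not queued yet.
                String reply = request.replyQueue.take();
                log.add("producer output: " + reply);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        } finally {
            for (Thread consumer : consumers) {
                consumer.interrupt();
            }
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        for (String line : run(java.util.Arrays.asList("1", "2", "3", "4", "5"), 5)) {
            System.out.println(line);
        }
    }
}
```

The log alternates between consumer and producer lines regardless of consumerCount, because the extra consumers never have a second message to pick up.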

UPDATE

Maybe you just need RabbitTemplate.ReturnCallback alongside convertAndSend to determine whether your message was delivered or not?

UPDATE 2

Another idea to meet your requirement is to use a TaskExecutor to send and receive in parallel. That way you don't need to wait for the reply to the first message before sending the second:

// Thread-safe list: replies are added from several executor threads.
final List<Object> replies = Collections.synchronizedList(new ArrayList<Object>());
ExecutorService executor = Executors.newCachedThreadPool();
for (final T item : items) {
    executor.execute(new Runnable() {

        public void run() {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
            replies.add(reply);
        }
    });
}
executor.shutdown();
// Wait up to one minute for all replies to arrive.
executor.awaitTermination(1, TimeUnit.MINUTES);

After that, you can do something with all the replies for the current items.
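
If you also need the replies in the same order as the items, a variation on the snippet above is to submit Callable tasks with ExecutorService#invokeAll and read the resulting Future objects in order. This is only a sketch under some assumptions: ParallelSendAndReceive and sendAll are hypothetical names, and the sendAndReceive function is a stand-in for template.convertSendAndReceive so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper; sendAndReceive stands in for template.convertSendAndReceive.
final class ParallelSendAndReceive {

    /** Runs one blocking send-and-receive per item in parallel; replies keep item order. */
    static <T> List<Object> sendAll(List<? extends T> items,
                                    Function<String, Object> sendAndReceive,
                                    int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Object>> tasks = new ArrayList<>();
            for (T item : items) {
                tasks.add(() -> sendAndReceive.apply(item.toString()));
            }
            List<Object> replies = new ArrayList<>();
            // invokeAll waits for all tasks and returns the futures in task order.
            for (Future<Object> future : executor.invokeAll(tasks)) {
                replies.add(future.get());
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for replies", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a send-and-receive call failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // The lambda fakes the consumer by building a reply from the request body.
        List<Object> replies = sendAll(Arrays.asList(1, 2, 3, 4, 5), body -> "reply to " + body, 5);
        System.out.println(replies);
    }
}
```

In the ItemWriter, the call would presumably look like sendAll(items, body -> template.convertSendAndReceive(body), 5), with the thread count matched to the listener concurrency.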
