如何以异步方式将消息发送给排队起来,而不在春天等待每个消息的回复AMQP在Java中使用Rabbit

2019-10-19 23:40发布

我试图用簧AMQP使用RabbitMQ的,下面是我的配置。

<rabbit:connection-factory id="rabbitConnectionFactory"
    port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
    connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"  routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener"  method="onMessage" />
</rabbit:listener-container>

这是简单的消息监听器类,

public class ImportMessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println("consumer output: " + message);
        return message;
    }

}

这是生产商(其是弹簧批次的itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> {

    private AmqpTemplate template;

    public AmqpTemplate getTemplate() {
        return template;
    }

    public void setTemplate(AmqpTemplate template) {
        this.template = template;
    }

    public void write(List<? extends T> items) throws Exception {
        for (T item : items) {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
        }
    }

}

当我运行我的春天批处理作业,每封邮件被发送,并通过一个处理一个,我gettig以下响应

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

它shoudl发送5个信息并预约和5个消费者线程(并发= 5)应同时处理他们,应尽快,因为它完成响应

So below should be the outout

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

我不想制片人等待第一条消息的队列第二消息的回复。

我试着用convertAndSend这使得异步(犯规等待答复),但我怎么在我的itemWriter就像我可以convertSendAndReceive得到回复消息?

如果我改变我的模板配置

<rabbit:template id="importAmqpTemplate"
        connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
        routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
        <rabbit:reply-listener/>
    </rabbit:template>

和,如果我使用template.convertAndSend(item.toString()); 那么怎样才能得到回复消息?

我不能我自己的消息处理程序附加到这个监听得到回复消息,我们可以在消费者端连接的方式。 对于回复,它需要默认RabbitmqTemplate处理程序。

Answer 1:

首先,让我解释一下这是怎么回事

您可以使用同步 sendAndReceive操作。 您的信息将被发送之前,它富含TemporaryReplyQueue和发送者(生产者)线程被阻塞等待从该答复replyQueue 。 这就是为什么你的所有邮件将被串行处理。

要继续前进,我们需要知道使用单向该操作的原因ItemWriter生产商。

也许RabbitTemplate#convertAndSend就足够了吗?

UPDATE

也许你只是需要RabbitTemplate.ReturnCallback一起与convertAndSend来确定,如果你的消息已被传递或没有?

更新2

另一种认为,实现您的要求使用TaskExecutor来发送和接收并行。 这样,您不必等待了赖以第一消息发送第二:

final List<Object> replies = new ArrayList<Object>();
ExecutorService executor = Executors.newCachedThreadPool();
for (T item : items) {
     executor.execute(new Runnable() {

         public void run() {
            Object reply = template.convertSendAndReceive(item.toString());
            System.out.println("producer output: " + reply);
            replies.add(reply);
         }
     });
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

在这之后,你可以做所有的事情replies当前items



文章来源: How to send messages asynchronously to queue them up without waiting for reply of each message in spring amqp using rabbitmq in java?