我们怎样才能使春天AMQP RabbitMQ的制片人发送的所有邮件后等待,在收到全部释放?(How

2019-10-28 08:51发布

我排队的所有消息的RabbitMQ队列和处理这些远程服务器上。 下面是同一类我的制片人,并回复处理程序。

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
        MessageListener {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;

    // Reply handler 
    @Override
    public void onMessage(Message message) {

        try {
            String corrId = new String(message.getMessageProperties()
                    .getCorrelationId(), "UTF-8");
            System.out.println("received " + corrId + " from " + this.replyQueue);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //Producer
    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {
            System.out.println(item);

            System.out.println("Queing " + item + " to " + this.queue);

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

            System.out.println("Queued " + item + " to " + this.queue);

        }

        // It should wait here untill we get all replies in onMessage, How can we do this ?

    }

我把所有消息都在写方法和取得的onMessage答复。 这是正常工作,但写犯规等待响应,它返回到调用者和弹簧批步骤标记为已完成。

但我想的过程中发送所有消息,直到我们得到的所有回复中的onMessage后等待答复。 我们应该怎么做 ?

Answer 1:

您可以使用任意数量的同步技术; 例如具有收听put的答复中LinkedBlockingQueue和具有发送者take (或poll与超时)从队列中,直到接收到所有的答复。

或者说,根本不使用监听器,只需使用相同的RabbitTemplatereceive()直到收到所有答复从应答队列。

然而, receive()返回null ,如果队列是空的,所以你必须睡觉之间发送和接收,避免纺CPU。



文章来源: How can we make producer in spring amqp rabbitmq waiting after sending all messages and release upon receiving all?