我排队的所有消息的RabbitMQ队列和处理这些远程服务器上。 下面是同一类我的制片人,并回复处理程序。
public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T>,
MessageListener {
protected String exchange;
protected String routingKey;
protected String queue;
protected String replyQueue;
protected RabbitTemplate template;
// Reply handler
@Override
public void onMessage(Message message) {
try {
String corrId = new String(message.getMessageProperties()
.getCorrelationId(), "UTF-8");
System.out.println("received " + corrId + " from " + this.replyQueue);
} catch (IOException e) {
e.printStackTrace();
}
}
//Producer
@Override
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
System.out.println(item);
System.out.println("Queing " + item + " to " + this.queue);
Message message = MessageBuilder
.withBody(item.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setReplyTo(this.replyQueue)
.setCorrelationId(item.toString().getBytes()).build();
template.send(this.exchange, this.routingKey, message);
System.out.println("Queued " + item + " to " + this.queue);
}
// It should wait here untill we get all replies in onMessage, How can we do this ?
}
我把所有消息都在写方法和取得的onMessage答复。 这是正常工作,但写犯规等待响应,它返回到调用者和弹簧批步骤标记为已完成。
但我想的过程中发送所有消息,直到我们得到的所有回复中的onMessage后等待答复。 我们应该怎么做 ?