我试图用簧AMQP使用RabbitMQ的,下面是我的配置。
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" routing-key="${rabbitmq.import.queue}"/>
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" method="onMessage" />
</rabbit:listener-container>
这是简单的消息监听器类,
public class ImportMessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
return message;
}
}
这是生产商(其是弹簧批次的itemWriter),
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template;
}
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}
}
当我运行我的春天批处理作业,每封邮件被发送,并通过一个处理一个,我gettig以下响应
consumer output: 1
producer output: 1
consumer output: 2
producer output: 2
consumer output: 3
producer output: 3
consumer output: 4
producer output: 4
consumer output: 5
producer output: 5
它shoudl发送5个信息并预约和5个消费者线程(并发= 5)应同时处理他们,应尽快,因为它完成响应
So below should be the outout
consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5
producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5
我不想制片人等待第一条消息的队列第二消息的回复。
我试着用convertAndSend这使得异步(犯规等待答复),但我怎么在我的itemWriter就像我可以convertSendAndReceive得到回复消息?
如果我改变我的模板配置
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
<rabbit:reply-listener/>
</rabbit:template>
和,如果我使用template.convertAndSend(item.toString()); 那么怎样才能得到回复消息?
我不能我自己的消息处理程序附加到这个监听得到回复消息,我们可以在消费者端连接的方式。 对于回复,它需要默认RabbitmqTemplate处理程序。