I am able to write a java program using Rabbit Java API's doing the following:
Client sends message over Rabbit MQ exchange/queue with correlation Id (Say UUID - "348a07f5-8342-45ed-b40b-d44bfd9c4dde").
Server receives the message.
Server sends response message over Rabbit MQ exchange/queue with the same correlation Id - "348a07f5-8342-45ed-b40b-d44bfd9c4dde".
Client received the correlated message only in the same thread as 1.
Below is the Send.java and Recv.java using Rabbit APIs. I need help to convert this sample to use Spring AMQP integration especially receiving part on step 4. I am looking for something like receive method which can filter message using correlation Id.
Send.java:
import java.util.UUID;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Send {
private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
String message = "Hello World!";
String cslTransactionId = UUID.randomUUID().toString();
BasicProperties properties = (new BasicProperties.Builder())
.correlationId(cslTransactionId)
.replyTo(RESPONSE_QUEUE).build();
channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes());
System.out.println("Client Sent '" + message + "'");
Channel responseChannel = connection.createChannel();
responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);
String correlationId = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String responseMessage = new String(delivery.getBody());
correlationId = delivery.getProperties().getCorrelationId();
System.out.println("Correlation Id:" + correlationId);
if (correlationId.equals(cslTransactionId)) {
System.out.println("Client Received '" + responseMessage + "'");
responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
break;
}
}
channel.close();
connection.close();
}
}
Recv.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Recv {
private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(REQUEST_QUEUE, true, consumer);
String correlationId = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
correlationId = delivery.getProperties().getCorrelationId();
System.out.println("Correlation Id:" + correlationId);
System.out.println("Server Received '" + message + "'");
if (correlationId != null)
break;
}
String responseMsg = "Response Message";
Channel responseChannel = connection.createChannel();
responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
BasicProperties properties = (new BasicProperties.Builder())
.correlationId(correlationId).build();
channel.basicPublish("", RESPONSE_QUEUE, properties,responseMsg.getBytes());
System.out.println("Server Sent '" + responseMsg + "'");
channel.close();
connection.close();
}
}
After running the Java configuration provided by gary, I am trying to move the configuration to XML format for server side adding listener. Below is the XML configuration:
server.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean
id="serviceListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queues" ref="requestQueue"/>
<property name="messageListener" ref="messageListenerAdaptor"/>
</bean>
<bean id="messageListenerAdaptor"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="pojoListener" />
</bean>
<bean
id="pojoListener"
class="PojoListener"/>
<bean
id="replyListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queues" ref="replyQueue"/>
<property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
</bean>
<!-- Infrastructure -->
<rabbit:connection-factory
id="connectionFactory"
host="localhost"
username="guest"
password="guest"
cache-mode="CHANNEL"
channel-cache-size="5"/>
<rabbit:template
id="fixedReplyQRabbitTemplate"
connection-factory="connectionFactory"
exchange="fdms.exchange"
routing-key="response.key"
reply-queue="RESPONSE.QUEUE">
<rabbit:reply-listener/>
</rabbit:template>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
<rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />
<rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
SpringReceive.java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringReceive {
/**
* @param args
*/
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
SimpleMessageListenerContainer serviceListenerContainer = context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
serviceListenerContainer.start();
}
}
You can use
RabbitTemplate.sendAndReceive()
(orconvertSendAndReceive()
) with a reply listener container (Docs here); the template will take care of the correlation for you.If you are using Spring Integration, use an outbound gateway with an appropriately configured rabbit template.