Spring integration with Rabbit AMQP for “Client Se

2019-02-20 12:57发布

问题:

I am able to write a Java program, using the RabbitMQ Java client API, that does the following:

  1. Client sends message over Rabbit MQ exchange/queue with correlation Id (Say UUID - "348a07f5-8342-45ed-b40b-d44bfd9c4dde").

  2. Server receives the message.

  3. Server sends response message over Rabbit MQ exchange/queue with the same correlation Id - "348a07f5-8342-45ed-b40b-d44bfd9c4dde".

  4. Client receives the correlated response message, in the same thread that sent the request in step 1.

Below are Send.java and Recv.java, written against the RabbitMQ client API. I need help converting this sample to Spring AMQP / Spring Integration, especially the receiving part in step 4. I am looking for something like a receive method that can filter messages by correlation ID.

Send.java:

import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Request/reply client: publishes one request to {@code REQUEST.QUEUE} tagged with a
 * fresh correlation id, then blocks on {@code RESPONSE.QUEUE} until a reply carrying
 * the same correlation id arrives.
 */
public class Send {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    /** Explicit charset so both sides encode/decode the body identically. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Sends the request and waits for the correlated reply.
     *
     * @param argv command-line arguments; not used
     * @throws Exception if the broker connection fails or the wait is interrupted
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            Channel responseChannel = connection.createChannel();

            channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
            // Declare the reply queue BEFORE publishing so a fast server can never
            // reply to a queue that does not exist yet.
            responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);

            String message = "Hello World!";
            String cslTransactionId = UUID.randomUUID().toString();
            BasicProperties properties = (new BasicProperties.Builder())
                .correlationId(cslTransactionId)
                .replyTo(RESPONSE_QUEUE).build();

            channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes(UTF_8));

            System.out.println("Client Sent '" + message + "'");

            // The consumer must be bound to the channel it is registered on and
            // acknowledged through; delivery tags are scoped to a channel.
            QueueingConsumer consumer = new QueueingConsumer(responseChannel);
            responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String correlationId = delivery.getProperties().getCorrelationId();
                System.out.println("Correlation Id:" + correlationId);

                // Constant on the left: a reply without a correlation id is simply
                // "not ours" instead of a NullPointerException.
                if (cslTransactionId.equals(correlationId)) {
                    String responseMessage = new String(delivery.getBody(), UTF_8);
                    System.out.println("Client Received '" + responseMessage + "'");
                    // multiple=false: acknowledge ONLY our reply. Replies that did not
                    // match stay unacknowledged and are requeued by the broker when
                    // this channel closes, so other clients can still receive them.
                    responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    break;
                }
            }
        } finally {
            // Closing the connection also closes every channel opened on it.
            connection.close();
        }
    }
}

Recv.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Request/reply server: consumes from {@code REQUEST.QUEUE} until a request carrying a
 * correlation id arrives, then publishes a single reply that echoes that correlation id.
 */
public class Recv {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    /** Explicit charset so both sides encode/decode the body identically. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Waits for one correlated request and answers it.
     *
     * @param argv command-line arguments; not used
     * @throws Exception if the broker connection fails or the wait is interrupted
     */
    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck=true: requests are acknowledged as soon as they are delivered.
            channel.basicConsume(REQUEST_QUEUE, true, consumer);

            String correlationId = null;
            String replyTo = null;
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), UTF_8);
                BasicProperties requestProperties = delivery.getProperties();
                correlationId = requestProperties.getCorrelationId();
                replyTo = requestProperties.getReplyTo();
                System.out.println("Correlation Id:" + correlationId);
                System.out.println("Server Received '" + message + "'");
                // Only a request with a correlation id can be answered.
                if (correlationId != null) {
                    break;
                }
            }

            // Honour the reply queue named by the client; fall back to the
            // well-known queue when the request did not specify one.
            if (replyTo == null) {
                replyTo = RESPONSE_QUEUE;
            }
            if (RESPONSE_QUEUE.equals(replyTo)) {
                // Make sure the well-known reply queue exists before publishing,
                // in case the client has not declared it yet.
                channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
            }

            String responseMsg = "Response Message";
            BasicProperties properties = (new BasicProperties.Builder())
                .correlationId(correlationId).build();

            // Default exchange ("") routes directly to the queue named by the routing key.
            channel.basicPublish("", replyTo, properties, responseMsg.getBytes(UTF_8));

            System.out.println("Server Sent '" + responseMsg + "'");
        } finally {
            // Closing the connection also closes every channel opened on it.
            connection.close();
        }
    }
}

After running the Java configuration provided by Gary, I am trying to move the configuration to XML and add a listener on the server side. Below is the XML configuration:

server.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--
        Spring AMQP wiring: a listener container for REQUEST.QUEUE, a RabbitTemplate
        with a fixed reply queue (RESPONSE.QUEUE), and the queue/exchange declarations.
        NOTE(review): the int, int-amqp and int-stream namespaces are declared above
        but no element in this file uses them.
    -->

    <!-- Consumes the queue referenced by requestQueue and hands each message to messageListenerAdaptor. -->
    <bean 
        id="serviceListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="queues" ref="requestQueue"/>
            <property name="messageListener" ref="messageListenerAdaptor"/>
    </bean>

    <!-- Adapter that forwards incoming messages to the pojoListener bean. -->
    <bean id="messageListenerAdaptor"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="pojoListener" />
    </bean>

    <!-- Application listener; the PojoListener class is not shown in this file. -->
    <bean 
        id="pojoListener"
        class="PojoListener"/>

    <!--
        Consumes the queue referenced by replyQueue and uses the template itself as the listener.
        NOTE(review): fixedReplyQRabbitTemplate below also contains a rabbit:reply-listener
        child element. This looks like two reply containers for the same queue and
        template; confirm whether both are intended.
    -->
    <bean
        id="replyListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queues" ref="replyQueue"/>
        <property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
    </bean>

    <!-- Infrastructure -->
    <!-- Broker connection: localhost with guest/guest credentials, channel caching with a cache size of 5. -->
    <rabbit:connection-factory 
        id="connectionFactory" 
        host="localhost" 
        username="guest" 
        password="guest" 
        cache-mode="CHANNEL" 
        channel-cache-size="5"/>

    <!-- Template that publishes to fdms.exchange with routing key response.key and uses RESPONSE.QUEUE as its reply queue. -->
    <rabbit:template 
        id="fixedReplyQRabbitTemplate" 
        connection-factory="connectionFactory"
        exchange="fdms.exchange"
        routing-key="response.key"
        reply-queue="RESPONSE.QUEUE">
        <rabbit:reply-listener/>
    </rabbit:template>

    <!-- Admin bean bound to the same connection factory as the queue and exchange declarations below. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations referenced by the two listener containers above. -->
    <rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
    <rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />

    <!-- Durable direct exchange; messages with routing key response.key are routed to RESPONSE.QUEUE. -->
    <rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

SpringReceive.java

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;


/**
 * Bootstraps the Spring application context and makes sure the request listener
 * container is running.
 */
public class SpringReceive {

    /**
     * Loads the XML application context and starts the {@code serviceListenerContainer} bean.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // NOTE(review): this loads "cslclient.xml"; confirm that this is the file
        // that defines the serviceListenerContainer bean.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
        // Close the context (stopping containers and releasing broker connections)
        // when the JVM shuts down; otherwise it is never closed.
        context.registerShutdownHook();

        SimpleMessageListenerContainer serviceListenerContainer =
            context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
        // The context may already have started the container during refresh;
        // only start it explicitly when it is not yet running.
        if (!serviceListenerContainer.isRunning()) {
            serviceListenerContainer.start();
        }
    }
}

回答1:

You can use RabbitTemplate.sendAndReceive() (or convertSendAndReceive()) with a reply listener container (Docs here); the template will take care of the correlation for you.

If you are using Spring Integration, use an outbound gateway with an appropriately configured rabbit template.