Implement Request-Reply pattern using ActiveMQ, Ca

2019-04-07 19:44发布

问题:

I am trying to implement the following functionality:

Read CSV file line-by-line then for every line:

  1. Build request based on the values that the line contains
  2. Send the request to the message queue
  3. Other component needs to pick up the message, process the request and send the response to another message queue (known to producer, so the producer could pick up the response).

I believe that the request-reply pattern fits the bill. I installed ActiveMQ, downloaded camel and tried to use their jms project.

After configuring the component, the queue and testing connection (worked), I tried to figure out how actually to implement request-reply? I failed to find any good examples

I have a RouteBuilder

The RouteBuilder

public class MyRouteBuilder extends RouteBuilder {
    public static void main(String[] args) throws Exception {
        new Main().run(args);
    }

    public void configure() {
        from("file:src/data?noop=true")
        .to("activemq:RequestQ");

        from("activemq:RequestQ?exchangePattern=InOut&timeToLive=5000") 
        .inOut("activemq:RequestQ", "bean:myBean?method=someMethod"); 
    }
}

camel-context.xml

<beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
         http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <package>org.apache.camel.example.spring</package>
    </camelContext>

    <bean id="jmsConnectionFactory" 
        class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" 
        class="org.apache.activemq.pool.PooledConnectionFactory" 
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" 
        class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" 
        class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

    <bean id="myBean" class="org.apache.camel.example.spring.MyBean"/>
</beans>

Questions:

  1. How can I read a file line-by-line construct and post a message based on line content?
  2. How to configure the route and how to configure the message header in order to get response in temporary queue that will be deleted after the response was picked up?
  3. What quick-start guides for the above can you recommend?

EDIT

I got the code below working. Now lets say that in the Processor I create the response. How can I send it back? How can I consume the response?

public class MyRouteBuilder extends RouteBuilder {

    public static void main(String[] args) throws Exception {
        new Main().run(args);
    }

    public void configure() {
        from("file:/Users/aviad/ws/integ/src/data?fileName=lines.txt&noop=true&idempotent=true")
        .split()
        .tokenize("\\n")
        .inOut("activemq:req");

        from("activemq:req")
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getIn().getBody(String.class));
                System.out.println("jmscorrelationid=" + exchange.getIn().getHeader("jmscorrelationid"));
                System.out.println("jmsdestination=" + exchange.getIn().getHeader("jmsdestination"));
            }
        });
    }
}

回答1:

I just had something similar around, so I alterd it and here it is. Note that the 2nd route does not need to be explicitly aware of a request/reply message, only the producer needs to know that. The 2nd route will reply if there is a reply to destination set (which is handled automagically by camel).

I don't know of any good example, but this doc page is really comprehensive with small examples.

 <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route>
          <from uri="file://c:/apps/in"/>
          <split>
            <tokenize token="\n"/>
            <to uri="activemq:req" pattern="InOut"/>
            <to uri="stream:out"/><!-- print Hello to console -->
          </split>
        </route>
        <route>
          <from uri="activemq:req"/>
            <transform>
              <simple>Hello ${in.body}</simple>
            </transform>
        </route>
    </camelContext>