spring boot configure multiple ActiveMQ instances

2019-02-05 01:02发布

问题:

I have a requirement to move messages from queues on one ActiveMQ instance to another ActiveMQ instance. Is there a way to connect to two different ActiveMQ instances using Spring Boot configuration?

Do I need to create multiple connectionFactories? If so then how does JmsTemplate know which ActiveMQ instance to connect to?

  /**
   * Exposes a single JMS connection factory built from {@code JMS_BROKER_URL}.
   */
  @Bean
    public ConnectionFactory connectionFactory() {
        final ConnectionFactory factory = new ActiveMQConnectionFactory(JMS_BROKER_URL);
        return factory;
    }

Any help and code examples would be useful.

Thanks in advance. GM

回答1:

In addition to @Chris's answer: you have to create separate BrokerService instances on different ports, a separate ConnectionFactory to connect to each broker, and a separate JmsTemplate built on each of those factories to send messages to the different brokers.

For example :

import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

/**
 * Spring configuration declaring two {@link BrokerService} beans, each with its own TCP
 * connector, plus one connection factory, one {@link JmsTemplate} and one listener
 * container factory per broker.
 *
 * <p>The connection factory and template for the first broker are marked {@link Primary};
 * the beans for the second broker carry a {@code 2} suffix and can be selected by name
 * with {@link Qualifier}.
 */
@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {
    /** Default destination name of the primary template. */
    public static final String LOCAL_Q = "localQ";
    /** Default destination name of the second template. */
    public static final String REMOTE_Q = "remoteQ";

    /** Connector URI of the first broker, also used by its connection factory. */
    private static final String FIRST_BROKER_URI = "tcp://localhost:5671";
    /** Connector URI of the second broker, also used by its connection factory. */
    private static final String SECOND_BROKER_URI = "tcp://localhost:5672";

    /** First broker, named {@code broker}. */
    @Bean
    public BrokerService broker() throws Exception {
        return embeddedBroker("broker", FIRST_BROKER_URI);
    }

    /** Second broker, named {@code broker2}. */
    @Bean
    public BrokerService broker2() throws Exception {
        return embeddedBroker("broker2", SECOND_BROKER_URI);
    }

    /** Primary connection factory, pointing at the first broker's connector URI. */
    @Bean
    @Primary
    public ConnectionFactory jmsConnectionFactory() {
        return new ActiveMQConnectionFactory(FIRST_BROKER_URI);
    }

    /** Connection factory pointing at the second broker's connector URI. */
    @Bean
    public QueueConnectionFactory jmsConnectionFactory2() {
        return new ActiveMQConnectionFactory(SECOND_BROKER_URI);
    }

    /** Primary template: first connection factory, default destination {@link #LOCAL_Q}. */
    @Bean
    @Primary
    public JmsTemplate jmsTemplate() {
        return templateFor(jmsConnectionFactory(), LOCAL_Q);
    }

    /** Second template: second connection factory, default destination {@link #REMOTE_Q}. */
    @Bean
    public JmsTemplate jmsTemplate2() {
        return templateFor(jmsConnectionFactory2(), REMOTE_Q);
    }

    /** Listener container factory configured with the unqualified (primary) connection factory. */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        return listenerFactoryFor(connectionFactory, configurer);
    }

    /** Listener container factory configured with the {@code jmsConnectionFactory2} bean. */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
            @Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        return listenerFactoryFor(connectionFactory, configurer);
    }

    /** Creates a broker with one connector on {@code connectorUri}, the given name, and JMX switched off. */
    private static BrokerService embeddedBroker(String name, String connectorUri) throws Exception {
        final BrokerService service = new BrokerService();
        service.addConnector(connectorUri);
        service.setBrokerName(name);
        service.setUseJmx(false);
        return service;
    }

    /** Creates a template bound to {@code factory} with {@code defaultDestination} as its default destination name. */
    private static JmsTemplate templateFor(ConnectionFactory factory, String defaultDestination) {
        final JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(factory);
        template.setDefaultDestinationName(defaultDestination);
        return template;
    }

    /** Creates a listener container factory and lets the Boot configurer apply its settings for {@code source}. */
    private static JmsListenerContainerFactory<?> listenerFactoryFor(ConnectionFactory source,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        configurer.configure(containerFactory, source);
        return containerFactory;
    }
}

To move messages from one AMQ instance to another instance you can use JmsBridgeConnectors :

Note that with the examples below you cannot have multiple consumers on the queue from which you want to forward the messages, because Camel or the JmsBridgeConnectors consume each message and forward it. If you want only a copy of the message to be forwarded, you have several options: 1- Convert your queue to a topic and handle offline consumers with durable subscriptions or retroactive consumers. 2- Convert your queue to a composite queue and use DestinationInterceptors to copy messages to another queue. 3- Use a NetworkConnector to form a network of brokers.

/**
 * Variant of the {@code broker} bean that also registers a JMS queue bridge: messages on
 * the local queue {@code LOCAL_Q} are bridged to the outbound queue {@code REMOTE_Q},
 * using {@code jmsConnectionFactory()} for the local side and
 * {@code jmsConnectionFactory2()} for the outbound side.
 */
@Bean
public BrokerService broker() throws Exception {
    final BrokerService service = new BrokerService();
    service.addConnector("tcp://localhost:5671");

    // One bridge: local queue -> outbound queue.
    final OutboundQueueBridge localToRemote = new OutboundQueueBridge();
    localToRemote.setLocalQueueName(LOCAL_Q);
    localToRemote.setOutboundQueueName(REMOTE_Q);

    final SimpleJmsQueueConnector queueConnector = new SimpleJmsQueueConnector();
    // Send retries are set to ReconnectionPolicy.INFINITE.
    queueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
    queueConnector.setOutboundQueueBridges(new OutboundQueueBridge[] { localToRemote });
    queueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
    queueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());

    service.setJmsBridgeConnectors(new JmsConnector[] { queueConnector });
    service.setBrokerName("broker");
    service.setUseJmx(false);
    return service;
}

Alternatively, you can use Camel, as shown below:

/**
 * Builds and starts a Camel context with a single route that reads from queue
 * {@code LOCAL_Q} on the {@code inboundQueue} component (tcp://localhost:5671) and writes
 * to queue {@code REMOTE_Q} on the {@code outboundQueue} component (tcp://localhost:5672).
 *
 * @return the started Camel context
 * @throws Exception if registering the route or starting the context fails
 */
@Bean
public CamelContext camelContext() throws Exception {
    CamelContext context = new DefaultCamelContext();
    // One ActiveMQ component per broker, each registered under its own URI scheme name.
    context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
    context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
    context.addRoutes(new RouteBuilder() {
        // @Override makes the compiler verify this really implements RouteBuilder.configure().
        @Override
        public void configure() {
            from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
        }
    });
    context.start();
    return context;
}

Your Producer must look like this in order to use the different JmsTemplates:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * Command-line runner that sends one sample message through each of the two injected
 * {@link JmsTemplate} beans.
 */
@Component
public class Producer implements CommandLineRunner {

    /** Injected without a qualifier. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Injected by bean name {@code jmsTemplate2}. */
    @Autowired
    @Qualifier("jmsTemplate2")
    private JmsTemplate jmsTemplate2;

    /** Sends the fixed sample message; {@code args} is not used. */
    @Override
    public void run(String... args) throws Exception {
        send("Sample message");
    }

    /**
     * Sends {@code msg} to the local queue through the first template, then to the remote
     * queue through the second template.
     *
     * @param msg payload handed to both templates
     */
    public void send(String msg) {
        final String localQueue = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q;
        final String remoteQueue = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q;
        jmsTemplate.convertAndSend(localQueue, msg);
        jmsTemplate2.convertAndSend(remoteQueue, msg);
    }
}

and Consumer :

import javax.jms.Session;

import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Listener on the remote queue, registered through the {@code jmsListenerContainerFactory2}
 * container factory.
 */
@Component
public class Consumer {

    /**
     * Prints the broker info of the connection behind {@code session}, then the message text.
     *
     * @param session JMS session of the delivery; cast to {@link ActiveMQSession}
     * @param text    message payload
     */
    @JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
    public void receiveQueue(Session session, String text) {
        final ActiveMQSession activeMqSession = (ActiveMQSession) session;
        System.out.println(activeMqSession.getConnection().getBrokerInfo());
        System.out.println(text);
    }
}


回答2:

You would need to instantiate multiple JmsTemplate instances as Beans in your application and then use a combination of @Qualifier and @Primary annotations to indicate which JmsTemplate instance should go where.

For example

// JmsTemplate bean registered under the name "queue1" and marked @Primary; it receives the
// ConnectionFactory qualified as "connectionFactory1". Body and remaining parameters are elided.
@Bean("queue1")
@Primary
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){
...
}

// JmsTemplate bean registered under the name "queue2"; it receives the ConnectionFactory
// qualified as "connectionFactory2". Deliberately NOT marked @Primary: only one bean of a
// given type may be primary, otherwise unqualified injection of JmsTemplate is ambiguous.
// Inject this one with @Qualifier("queue2"). Body and remaining parameters are elided.
@Bean("queue2")
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){
...
}

...

// Injection point that selects the JmsTemplate bean named "queue1" via @Qualifier.
@Autowired
@Qualifier("queue1")
private JmsTemplate queue1;
...

See here for more info.