JMS consume multiple topics

2020-04-20 21:04发布

问题:

I am new to Java and working on a project that consumes multiple (different) topics and sends it to another server. I was wondering what the best way to handling multiple topics is.

From what I understand each consumer is tied to one topic, so if I had to consume multiple topics I would need one consumer for each different topic. Since a consumer makes a blocking call I would need to invoke a thread per consumer to consume these topics in parallel.

And if I wanted to improve throughput further is it a good practice to have one boss thread per consumer (attached to a topic) and allow each boss thread to setup worker threads to improve performance accordingly?

Please advice if this a good practice and if not what are the other alternative options? And are there any well known design patterns to handle this problem

Why I chose a consumer model instead of a listener model?

I have one more constraint which is, after the consumer receives a message it needs to send the message to another receiving server. If the receiving server is down (during a new version push) then I have to pause consuming messages until the receiving server is up. In that case having a message listener wouldn't help because I wouldn't be able to pause the listener when the receiving server is down. Am I right when I say this or is there a way to pause the listener and stop consuming messages until the receiving server is up?

回答1:

The way I would go about it is to use the listener feature.

Your object implements the MessageListener interface and then you add to the consumer your message listener. In this case the client library will handle the threading for you in reading the messages from the queue and despatching them to the listeners.

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main() {
        try {

            MyMessageConsumer myMessageConsumer = new MyMessageConsumer();

            // This example is using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination1 = session.createQueue("MyTopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("MyTopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        // Handle my messages here
    }
}

Session Transactions

In this option, we use transacted messages and it will deliver the message if session.rollback() is called. You acknowledge() when your operation was successful or rollback() when it was not.

package io.bessel.test;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer implements MessageListener {

    public static void main(String ... arguments) {
        try {
            // This example is using the ActiveMQ client library
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("nio://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

            MyMessageConsumer myMessageConsumer = new MyMessageConsumer(session);

            Destination destination1 = session.createQueue("MyTopic1");
            MessageConsumer consumer1 = session.createConsumer(destination1);
            consumer1.setMessageListener(myMessageConsumer);

            Destination destination2 = session.createQueue("MyTopic2");
            MessageConsumer consumer2 = session.createConsumer(destination2);
            consumer2.setMessageListener(myMessageConsumer);

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }

    private final Session session;

    public MyMessageConsumer(Session session) {
        this.session = session;
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println(String.format("Received message: %s", text));
                this.session.rollback();

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

}


标签: java consumer