它是使用JMS临时队列同步使用一个好的做法?(Is it a good practice to us

2019-07-29 14:42发布

如果我们使用JMS请求/应答使用“临时队列”,将这些代码是可伸缩的机制?

截至目前,我们不知道我们是否会支持每秒100个请求,或每秒的请求1000。

下面的代码是什么,我想实现的。 这使得在“同步”的方式使用JMS。 关键的部分是在“消费者”被创建指向了“临时队列”这是为这个会话创建。 我只是想不通使用这种临时队列是否是一个可扩展的设计。

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);

更新:

我遇到了另一种模式,请参阅该博客的想法是用于发送和接收“常规”队列。 然而,对于“同步”电话,以获得所需的响应(即符合要求),您可以创建一个监听使用“选择”的接收队列中的消费者。

脚步:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
 final String correlationId = UUID.randomUUID().toString();
 final TextMessage textMessage = session.createTextMessage( msg );
 textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
           consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

因此,在这种模式不同的是,我们不会为每个新的请求一个新的临时队列。 相反,所有的反应都只有一个队列,而是使用“选择”以确保每个请求线程接收的只有大约关心的响应。

我觉得这里的缺点是,你必须使用一个“选择”。 我还不知道这是不太优选或比前面提到的模式更好。 思考?

Answer 1:

关于在您的文章更新 - 选择是,如果对邮件头进行,就像你与相关ID做非常有效的。 Spring集成内部也做这行实现JMS出站网关 。



Answer 2:

有趣的是,这种可扩展性实际上可能是什么其他的反应所描述的相反。

WebSphere MQ的保存和可再利用的动态队列对象在哪里。 所以,虽然使用动态队列是不是免费的,但是它很好地扩展因为队列释放出来,所有WMQ需要做的是句柄传递给请求新的队列实例的下一个线程。 在繁忙的QMGR,动态队列的数量将保持相对静态的,而手柄也会从线程传递给线程。 严格来说这是不太一样快,再利用一个单一的队列,但它是不坏。

在另一方面,即使在索引CORRELID是速度快,性能是相反的索引的消息数量。 这也使得一个区别,如果队列深度开始建立。 当应用程序变为一个GETWAIT一个空队列中没有延迟。 但是,在一个深刻的队列中,QMGR必须搜索现有邮件的索引来确定答复消息不在其中。 在你的榜样,这与次每秒一大指标1000个搜索空指数之间的差异。

其结果是,1000个动态队列与一个消息中的每个实际上可以比单个队列1000级的线程获得通过更快CORRELID ,这取决于应用和负载的特性。 我会建议提交到一个特定的设计之前,大规模测试这个。



Answer 3:

在共享队列上使用相关ID选择器将规模非常好与多个消费者。

1000个请求/秒然而,将一个不少。 您可能要划分负载不同实例之间有点如果性能真可谓是一个问题。

你可能想阐述一下请求VS客户端编号。 如果客户端的数量<10,并会留下相当静态的,并且请求数字是非常高的,最有活力和快速的解决方案可能是为每个客户端的静态回复队列。



Answer 4:

创建临时队列是不是免费的。 毕竟这是在经纪人(一个或多个)分配资源。 话虽如此,如果你有一个未知的(手)之前可能未绑定的客户端数量(多个JVM,每个JVM多个并发线程,等等),你可能没有选择的余地。 每分配客户队列,并将其分配给客户端会失控快。

肯定是你勾勒是最简单可行的解决方案。 如果你能得到真正的号码,交易量和它扩展不够,罚款。

我想看看避免临时队列之前,我想看看更多的限制客户端的数量,使客户长期生活。 也就是说建立在客户端一个客户端库,并在池中的客户创建临时队列,会话,连接等在启动时,重用他们在随后的请求,并撕裂下来就关机。 然后调整问题成为最大/最小尺寸之一的池,空闲时间就是修剪池,什么行为是(失败VS块)时,池刷爆。 除非你创建任意大量的短暂的JVM(在这种情况下,你已经得到了更大的扩展问题刚刚从JVM启动开销),那应该扩展以及任何东西。 毕竟,在这一点上你分配的资源反映系统的实际使用情况。 真的是没有机会用比少。

事情,以避免被创建和销毁的队列,会话,连接等设计在服务器端允许从一开始走流的大量无偿的数量。 然后,如果/当你需要汇集。 像不,任何东西不平凡的,你将需要。



Answer 5:

使用临时队列将耗资创建relyToProducers每次每一次。 而不是使用一个静态replyToQueue缓存的生产者,该方法createProducer将更加昂贵,抗冲击性能在高并发调用环境。



Answer 6:

我一直都面临着同样的问题,并决定集中连接自己无状态Bean里面。 一个客户端连接具有一个tempQueue和JMSMessageExchanger对象内部规定(其含有connectionFactory的,队列和tempQueue),其结合一种bean的实例。 香港专业教育学院在JSE / EE环境中进行了测试。 但是我真的不知道有关Glassfish的JMS池的行为。 它确实能接近JMS连接,豆方法后,“手动”获得结束?我做错什么可怕的错误?

也伊夫在客户端豆(TransactionAttributeType.NOT_SUPPORTED)关闭事务立即发送请求消息,请求队列。

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    //just get other message
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

同一个类中的客户端(无状态Bean)和服务器(@MessageDriven)使用。 RequestProducer和ResponseProducer都是接口:

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}
package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface  ResponseProducer extends RequestProducer{
    void setRequest(Message request);
    Message getRequest();
}

此外I`ve读到过AMQ请求-响应执行AMQ文章: http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html



Answer 7:

也许我太晚了,但我花了一些时间在本周得到同步请求/应答中JMS工作。 怎么样用超时延长QueueRequester。 我没有和至少测试一台机器(正在运行的代理,请求者和replyer)上显示,该方案优于讨论的。 在另一边这取决于使用一个QueueConnection,这意味着你可能会被迫打开多个连接。



文章来源: Is it a good practice to use JMS Temporary Queue for synchronous use?