使用JMS临时队列进行同步使用是一种好的实践吗?

24

如果我们使用“临时队列”来实现JMS请求/响应机制,这段代码是否可扩展?

目前为止,我们不知道是否需要支持每秒100个请求还是每秒数千个请求。

我正在考虑实现下面的代码。它以同步方式使用JMS。关键部分在于创建“消费者”以指向为此会话创建的“临时队列”。我只是无法确定使用这种临时队列是否是一种可伸缩的设计。

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);

更新:

我发现了另一种模式,请查看这篇博客。 这个想法是使用“普通”的队列来发送和接收消息。但是,对于“同步”调用,为了获得所需的响应(即匹配请求),您需要创建一个消费者,该消费者使用“选择器”侦听接收队列。

步骤:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
 final String correlationId = UUID.randomUUID().toString();
 final TextMessage textMessage = session.createTextMessage( msg );
 textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
           consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

因此,这种模式的区别在于我们不会为每个新请求创建一个新的临时队列。相反,所有响应都会发送到同一个队列,但使用“选择器”来确保每个请求线程只接收到它关心的响应。

我认为这里的缺点是你必须使用一个“选择器”。我还不知道这是比之前提到的模式更受欢迎还是不受欢迎。你有什么想法吗?


这个机制是我之前提出的一个问题中提出的:https://dev59.com/amgv5IYBdhLWcg3wMN-0我创建了这个新问题,只是想知道这是否是可扩展的设计。 - rk2010
你好,请问你最终决定使用哪种方法?在这两种方法中,tempq/selector 中的任何一种方法是否比另一种具有显著的性能/可伸缩性优势?谢谢。 - Borka
7个回答

6
关于您帖子中的更新 - 如果在消息头上执行选择器(如您使用相关ID所做的那样),则选择器非常高效。Spring Integration也在实现JMS出站网关时内部执行此操作。

JMSOutboundGateway是一个很棒的资源。感谢提供链接。 - rk2010
只是为了明确,这段代码最终会使用消息头,对吗?jmsRequest.setJMSCorrelationID(correlationId); messageSelector = "JMSCorrelationID = '" + correlationId + "'"; - rk2010
你有没有想过在什么情况下使用“临时队列”模式是一个好主意?我想知道为什么Websphere MQ的示例代码提供了这种模式。 - rk2010
实际上,我看到过一种组合被使用@rk2010 - 基本上,如果你的调用客户很少,那么你可以为每个客户创建一个临时响应队列,然后通过关联ID选择。这将分割响应队列并比为所有客户提供一个响应队列更快。但是,如果有很多客户而且你无法区分一个客户和另一个客户,则一个响应队列更好。然而,对于每个请求使用1个临时队列的方法是不可取的。 - Biju Kunjummen

4
有趣的是,此方案的可扩展性实际上可能与其他回答所描述的相反。
WebSphere MQ 保存和重复使用动态队列对象。因此,尽管使用动态队列不是免费的,但它确实具有良好的可扩展性,因为当队列被释放时,WMQ 只需要将句柄传递给请求新队列实例的下一个线程即可。在繁忙的 QMgr 中,动态队列的数量将保持相对静态,而句柄会从一个线程传递到另一个线程。严格来说,它不如重用单个队列快,但也不错。
另一方面,虽然 CORRELID 上的索引速度很快,但性能与索引中消息数量成反比。如果队列深度开始增加,则情况也有所不同。当应用程序对空队列进行带有 WAIT 的 GET 操作时,没有延迟。但是在深度队列上,QMgr 必须搜索现有消息的索引,以确定回复消息不在其中。在您的示例中,这就是每秒搜索空索引与大型索引之间差别的1,000次之多。
结果是,具有每个消息的1000个动态队列实际上可能比通过 CORRELID 获取的1000个线程的单个队列更快,具体取决于应用程序和负载的特性。我建议在进行特定设计之前进行规模测试。

2
在共享队列上使用关联ID选择器将与多个消费者的规模扩展非常好。
然而,每秒1000个请求是很多的。如果性能成为问题,您可能希望在不同实例之间稍微分配一下负载。
您可能需要详细说明请求与客户端数量之间的关系。如果客户端数量小于10且保持相对静态,并且请求数量非常高,则最具弹性和快速的解决方案可能是为每个客户端设置静态回复队列。

谢谢。是的,我们可能最终会有多个应用程序实例在运行。因此,每个节点可能不会每秒执行1000次。 - rk2010

1

创建临时队列并不是免费的。毕竟它会在代理服务器上分配资源。话虽如此,如果您有一个未知(事先)可能无限数量的客户端(多个JVM,每个JVM多个并发线程等),您可能别无选择。预分配客户端队列并将其分配给客户端会很快失控。

当然,您所描绘的是最简单的解决方案。如果您可以获得交易量的实际数字,并且它足够扩展,那就没问题。

在考虑避免临时队列之前,我会更多地考虑限制客户端数量并使客户端长期存在。也就是说,在客户端上创建一个客户端池,并让池中的客户端在启动时创建临时队列、会话、连接等,在后续请求中重用它们,并在关闭时将其关闭。然后,调整问题变成了池的最大/最小大小、修剪池的空闲时间以及池达到最大值时的行为(失败 vs 阻塞)。除非您正在创建任意数量的瞬态 JVM(在这种情况下,您从 JVM 启动开销方面面临更大的扩展问题),否则这应该与任何东西一样具有可伸缩性。毕竟,在那个点上,您分配的资源反映了系统的实际使用情况。真的没有机会使用少于这个。
要避免的是创建和销毁大量不必要的队列、会话、连接等。从一开始就设计服务器端以允许流式传输。然后在需要时进行池化。对于任何非微不足道的事情,你都需要这样做。

我认为临时队列必须与它创建的会话“绑定”: session.createTemporaryQueue();而且,由于会话无法连接到池中,所以我不知道是否可以使用会话池。 - rk2010
请务必查看我在原帖中的更新。我添加了一个新的模式。 - rk2010

0

我一直面临着同样的问题,决定在无状态bean内自己池化连接。一个客户端连接有一个tempQueue,并位于JMSMessageExchanger对象中(其中包含connectionFactory、Queue和tempQueue),该对象绑定到一个bean实例。我已在JSE/EE环境中进行了测试。但我不确定Glassfish JMS池行为是否会真正关闭“手动”获取的JMS连接,当bean方法结束时?我做错了什么吗?

此外,我在客户端bean中关闭了事务(TransactionAttributeType.NOT_SUPPORTED),以立即将请求消息发送到请求队列。

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    //just get other message
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

同一个类被用于客户端(无状态bean)和服务器端(@MessageDriven)。 RequestProducer和ResponseProducer是接口:

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}
package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface  ResponseProducer extends RequestProducer{
    void setRequest(Message request);
    Message getRequest();
}

我也阅读了有关在AMQ上实现请求-响应的文章: http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html


0
使用临时队列会导致每次都要创建 relyToProducers,这样成本很高。而使用缓存的生产者来处理静态 replyToQueue,方法 createProducer 在高并发调用环境中将更加昂贵并影响性能。

-1
也许我来晚了,但是这个星期我花了几个小时的时间在 JMS 中实现同步请求/回复。拓展 QueueRequester 以添加超时参数怎么样?我尝试过了,至少在一个单独的机器上(运行中的代理、请求方和回复方),测试结果表明这种解决方案要比讨论过的方案表现更佳。不过这取决于是否使用 QueueConnection,这意味着你可能需要强制开启多个连接。

QueueRequester,查看API,只是针对每个请求创建一个临时队列。 - Steve11235

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接