Weblogic JMS消息监听器的并发处理

8

我在进行JMS测试用例时发现处理是顺序的。当我向一个使用JMS发送消息的Servlet发送200个请求,并且接收器(messageListner)依次接收请求时,如何接收并发请求?我们有哪些参数需要设置?我阅读了JMS教程和API,发现在同一会话中消息是按顺序传递的,即使我为每个发送请求创建一个新会话并在接收端使用10个会话仍然是顺序处理。

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

TestJMSListener jms = new TestJMSListener();
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}

一个 Listner 的实现,在接收到消息后,我会睡眠(进行一些操作一秒钟)。

public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public TestJMSListener() {

    System.out.println("********* Consumer check **********");

    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection[] = null;
    Session session[] = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer[] = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    connection = new Connection[10];
    session = new Session[10];
    consumer = new MessageConsumer[10];
    for (int i = 0; i < 10; i++) {
        try {

            connection[i] = connectionFactory.createConnection();
            session[i] = connection[i].createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer[i] = session[i].createConsumer(dest);
            consumer[i].setMessageListener(this);
            connection[i].start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }
}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

 }

我使用的是Weblogic 11g,使用默认的ConnectionFactory和Queue配置。当我使用Topic时,它实际上只能每秒传递一条消息(即第一条消息完成后),而对于Queue,它每秒钟传递2到3条消息。如何使我的监听器支持并发处理。

最终解决方案

增加更多的监听器对象,而不是在监听器中使用多个会话/消费者,就可以解决这个问题。以下是修改后的代码。

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";
TestJMSListener listeners[] = new TestJMSListener[20];
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
        for(int i=0;i<listeners.length;i++ ){
        listeners[i]=new TestJMSListener(Integer.toString(i+1));    
        }

    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}


public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public String listnerNum = "";
public TestJMSListener(String listerNo) {
    super();
    System.out.println("********* Consumer check **********");
    listnerNum = listerNo;
    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    try{
            connection = connectionFactory.createConnection();
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(dest1);
            consumer.setMessageListener(this);
            connection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }


}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "+listnerNum+ " : "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

}

1
可能是重复的问题:java多线程JMS队列接收器 - Robin Green
如果不是注册监听器,而是使用MessageConsumer#receive手动触发10个线程来消费消息,那么消息消费会如何表现? - ewernli
Robin Green:附上的答案不清楚,能否请您澄清一下。我创建了多个会话,但处理仍然是顺序的。 - learner
3个回答

3
我看了你的解决方案,意识到你有几个并发会话、消费者等,但只使用一个队列来处理所有这些。队列是一个队列,在管道中所有东西都按顺序排队。如果您恰好只有其中一个,则在该点上只有一个执行线程,并且所有内容都是顺序进行的,因为队列不允许并发发生。
如果您在不同的线程中实现几个队列,则您的计算机能够同时处理多个调用。几个队列可能意味着使用不同的队列名称等,但是对于这种麻烦,您可以使用负载均衡器解决方案(例如Apache Camel)来为您实际选择队列。至少这个关闭的帖子让我明白了这种队列和线程的组合是可行的。
然后,负载均衡器为每个请求选择单独的队列,每个队列都会进行自己的顺序工作以处理请求。并发会话的数量则取决于配置。

2
复杂的解决方案。简单的解决方案是添加更多的监听器对象。当我添加了20个监听器时,它们可以同时快速处理。 - learner

3
在你的代码中,你只有一个监听器实例(在Servlet实例创建时创建),因此你将仅按顺序接收到消息,而不管你有多少个发送者会话。这只是队列。
如果你想要同时接收,那么你可能需要多个监听器,每次只有一个消息会被传递到任何一个监听器。
如果你想要并发处理消息,在按顺序传递后创建线程池并将进程委托给单独的线程,然后返回到监听模式。
注意**在此模式下,你可能无法正确处理Ack模式,因为你正在执行消息处理时进行Ack。

0

我对WebLogic JMS不是专家,但你的代码看起来没问题(除了你创建的多个连接,这是不必要的,多个会话应该足够。我甚至认为多个连接可能会产生负面影响,因为它们会消耗线程),它应该可以并发消费。由于你说使用队列,每秒接收2-3条消息,实际上你确实同时得到它们(因为每个监听器都睡眠一秒)。

既然你说每秒得到2-3条,我想你实际上得到4-6条,因为每秒第二条消息没有被打印出来(因为它不是TextMessage),因为生产者发送了一个TextMessage('producer.send(message)')和一个空消息('producer.send(session.createMessage())')。除此之外,我会检查ConnectionFactpry的服务器配置。我记得需要为MDB配置WorkManager线程,但不确定“手动”JMS客户端是否也需要。

所以我会这样处理:

  • 移除重复发送 ('producer.send(session.createMessage())')
  • 为生产者使用一个连接(但保留新会话)(而不是在每个请求上重新创建)
  • 在客户端上使用一个连接(但多个会话)
  • 正确关闭生产者会话(可能会消耗线程)
  • 检查已经生产的足够的消息数量
  • 检查管理限制

希望这有所帮助。

敬礼,

Messi


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接