使用严格顺序进行并发消息处理

10
在我的JavaEE Web应用程序中,我需要按照到达顺序严格处理传入的消息。我假设我的Web应用程序容器(Tomcat 6)保留了消息到达HTTP端口的顺序。
让我头疼的是,我内部处理这些消息的方式。为了改善工作量,我将每个消息的处理附加到线程池中,因为需要执行许多操作,例如XML解析,有时使用外部Web服务来增强数据。处理完成后,我将消息的Java表示推送到一个复杂的流处理引擎esper.codehaus.org中,该引擎是线程安全的。在这里,检查不同的模式,其中入口顺序是最高要求,例如现象的阈值超过。
我曾想过将每个已处理的消息插入到PriorityQueue中,并使用它们在Servlet中接收到的优先级ID(对于每个消息递增)进行排序。问题在于:
轮询队列元素(最低ID是队列头部)以将其插入esper的线程可能会跳过一个ID,因为它不会检查缺失的项目。我猜一个例子能更好地说明:

enter image description here

对于步骤(1)至(4),一切都按预期进行。但是在步骤(5)中,QueuePoller检索到元素6而不是元素4(后者在步骤(6)中插入)。这导致消息顺序变为:2;3;6;4。
我尝试的是修改轮询队列头部的实现方式,以遵循ID的严格顺序。也就是说,如果下一个ID的元素还没有插入到队列中,则在屏障处等待直到其到达。这似乎在前10分钟内有效,但后来挂起,可能是由于某个元素从未插入到队列中。
是否有人曾经遇到过类似的问题,并对我有一些提示?

你最好在队列项到达服务器时立即创建它们,然后只允许按顺序从队列中取出,并且一旦处理完成就可以了。 - Nick Wilson
我不理解的是,如果输出队列中元素的顺序必须与输入队列中的顺序匹配,那么无序处理它们有什么好处呢?也就是说,如果QueuePoller需要在6之前先处理4,那么为什么要先处理6呢? - aib
我也考虑过这个方案,不过我想它可能会导致相似的结果。如果我使用一个标志来指示处理已完成,那么如果处理过程中出现问题(例如服务器超时等问题,这也是元素未被插入的问题),同样的问题可能出现。因此,这个指示标志永远不会被设置为true。 - matthes
@aib:如果消息处理(XML解析,使用Web服务进行增强)成本高昂,则可以从多核CPU和硬件使用中受益。我对我的应用程序进行了一些压力测试,使用单线程解决方案表现出较低的性能。 - matthes
@matthes,你的轮询解决方案可能有效,如果你添加了轮询超时:如果队列在M毫秒后不包含N个有序项目,则取第一个有序项目块并处理它们(也许,在极限情况下,块只包含一个项目)。 - Victor Sorokin
4个回答

3
请查看Disruptor——一个高性能队列,具有严格的顺序(先进先服务)。Disruptor

该死,我正要说那个 :-) - dty
看起来 OP 的情况是项目以无序的方式进入处理线程,他需要按照项目键提供的顺序进行处理。Disruptor 如何解决这个问题? - Victor Sorokin
谢谢提供链接,我一定会去查看并回报,如果它解决了我的问题。快速阅读后,似乎仍然没有解决等待缺失元素的问题 - 或者我没有读够。 - matthes
事情是,disruptor分成两个动作:
1)当“生产者”线程“声明”一个数字时,它基本上是保留了它。 2)当“生产者”线程将其数据放入队列中时,它会提交并允许“消费者”进行处理。 Disruptor 保证消息按照声明的顺序进行处理。也就是说,如果某个“生产者”已经保留了 #5,那么即使队列中已经有 #6,消费者也会等待它准备好。
- Vladimir Sitnikov
好的,这看起来就是我要找的东西。我会试一下并回报的!谢谢。 - matthes
这真的很棒。伙计们,看看Disruptor!它用不到100行代码就解决了我所有的问题。花了我一段时间才明白它并不是存储对象,而只是将ID作为实际对象的键。这使它运行速度极快。我将其与aib和Victor建议的自编容器进行了比较。我的容器由于使用了多个屏障而慢得多。 - matthes

1

您可以立即将占位符添加到处理队列中以等待传入请求。该占位符由线程池在后台进行预处理,但主要处理程序需要等待预处理完成。我考虑的构造是Future


0

类库提供了一个灵活的线程池实现,以及一些有用的预定义配置。您可以通过调用Executors中的静态工厂方法之一来创建线程池:

根据您的需求,我认为Executors.newSingleThreadExecutor()是最好的选择。单线程执行器创建一个单个工作线程来处理任务,并在其意外死亡时进行替换。任务按照任务队列所施加的顺序(FIFO、LIFO、优先级顺序)保证按顺序进行处理。


这确实满足了排序限制,但牺牲了性能。 - Péter Török
是的,正如以上评论中提到的,如果其他方法都不起作用,这将是我实现的最后一个解决方案。在许多情况下,消息处理可能非常昂贵。 - matthes

0

从你的问题和需要一个图表来看(顺便加一分),优先队列并不是你想要的好构造。这是因为队列很乐意为你提供可用的6,而不是等待不可用的4。

我认为现在是时候自己编写同步容器了。


不应该比滥用优先队列更复杂 :D - aib

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接