JMS是否支持“公平排队”?

12

我需要实现一个公平队列系统,使得基于某些消息头的值,针对当前排队的所有具有该头部的值的消息,以轮询的方式处理消息。

系统中的消息自然地根据某些属性分组,其中可能有数千个可能的值,并且当前排队的消息的值集合会随时间变化而改变。一个类比是消息具有一个头部,该头部是消息创建时的毫秒部分。因此,该头部将具有0到999之间的值,并且该值将分布在当前所有排队的消息中。

我需要能够按照任何特定值不优先于其他值的顺序消耗消息。如果排队消息的标题值分布如下:

value | count
------|-------
  A   |   3
  B   |   3
  C   |   2

那么消费顺序将是A,B,C,A,B,C,A,B

如果队列中添加了另一个值的消息,则应自动将其添加到轮询序列中。

这意味着当前排队的一些消息需要一定的了解,但不要求消费者持有该知识;代理可能具有某种方式来排序交付。

可以接受在公平排队开始之前存在某些阈值。也就是说,如果阈值为10,则可按顺序处理具有相同值的10个消息,但是处理的第11个消息应该是序列中的下一个值。如果只有该值的排队消息,则下一个可能是相同的值。

可能的值的数量可能不允许简单地为每个值创建队列并迭代队列,尽管尚未测试。

我们正在使用HornetQ,但如果有提供这些语义的替代方案,我想知道。

消息是作业,标头值是用户ID。所寻找的是,在某些限制内,来自任何给定用户的作业不会不合理地延迟来自其他用户的作业;生产100万个作业的用户不会导致其他用户的后续作业等待处理这100万个作业。

HornetQ中的队列上的消费者按创建顺序进行评估,因此向队列添加选择性消费者不会阻止任何接收与过滤器匹配的消息的全捕获消费者。

JMS组似乎没有帮助,因为这将某个组(用户?)绑定到给定的消费者。

一种潜在的解决方案是基于需求(例如:来自同一用户的10个连续消息)在主题上创建选择性消费者,并有某些东西管理所有选择性消费者的生命周期以确保全捕获不会处理相同的消息。虽然可能,但这似乎具有一些烦琐的同步要求。


我从你的例子中推断FIFO不是默认选项?这是因为可能会有一个用户生成异常请求峰值吗?那个用户可预测吗?他们应该有自己的队列/主题进行单独处理吗?显然,消息没有隐含的顺序,因为你可以按任意顺序处理它们(例如按用户),所以多线程读取服务不起作用吗? - Omertron
4个回答

0

您希望JMS代理实现一种消息传递算法(公平排队),但据我所知,这不是JMS规范的一部分。也许可以找到一种方法使代理执行此操作,但我怀疑这点,并且您提出的任何解决方案都可能是特定于代理的。

相反,为什么不将所需的排队算法放在您自己的应用程序中呢?例如:编写一个“公平排队转发器(FQF)”应用程序,该应用程序订阅来自代理的所有消息,以任何顺序消耗它们。让这个FQF应用程序尽可能快地消费消息,以便JMS代理队列始终为空或接近为空。然后,FQF应用程序可以将消息存储在本地队列中,并根据您所需的排队算法确定的顺序逐个重新发布到最终消息处理应用程序订阅的队列或主题上。在此端,您可能需要使用事务或某种流量控制,以便FQF应用程序仅以最终系统可以处理的速率发布消息。

以你的例子来说,这些消息代表着要按照消息头中的用户ID属性确定的某种顺序处理的作业。因此,我建议你编写一个作业调度算法,使用任何你想要的排队算法将作业传递给作业处理器。

这样,你就可以完全控制消息处理顺序,而不必以某种方式“欺骗”代理程序来做你想要的事情。你只需将JMS用作消息传递机制,因此无需编写自己的消息传递协议。


0

我不确定我完全理解这个问题,但是你的属性有成千上万种可能的值并且随着时间的推移而改变是有问题的。这听起来像是一个典型的“哈希值拯救”问题。那么,如何创建该属性的哈希值,然后对该值进行模运算以得出一个公平但先前已知的值呢?

假设有100个消费者从100个队列(命名为Q0到Q99)中处理是可行的。那么您可以在JMS生产者中执行以下操作:

String queueName = "Q" + user.hashCode()%100;

这是生产者发送到的队列名称。用户值也作为属性添加。同一用户将进入相同的队列(如果有太多用户,则排队),并且用户几乎均匀分布在消费应用程序中。

现在,您仍然面临一个问题,即一个不良用户创建了一百万个作业。此解决方案的第二部分可以是一个在启动时为空的JMS选择器。一旦消耗了第一条消息,您就会计算每个用户的作业数量,一旦达到阈值(例如,来自同一用户的10个作业),您将通过添加诸如“user NOT LIKE user123”之类的选择器来暂时禁止此用户。如果有多个这样的用户,则使用“AND”累积选择。一旦消费者不再收到任何消息,您就将该消费者的选择设置为空,并重新开始处理队列。


0
首先考虑的选项是拥有一个多线程消费应用程序。假设每个会话/消费者一个线程,可以设置选择器的同步或异步接收。每个选择器将针对特定用户进行键控。
在假设JVM在线程分派方面相当公平(我很乐意假设)并且应用程序代码中没有死锁的情况下,我会断言要求将得到满足。一个线程可能会卡在一个用户的百万作业上,其他线程不会受到影响。
然而,如果需要单线程应用程序,则 JMS 规范本身无法帮助。当然可能会有供应商扩展可以帮助。但是另一种选择是让应用程序查看每个消息并将其放入特定用户 ID 的特定队列中。最终的消费应用程序本身将在这些队列之间“轮询”以获取工作。需要另一个应用程序,但您将拥有一个非常确定性的系统。

1
多线程是我所期望的,但是考虑到有成千上万的用户,我不能简单地为每个用户分配一个线程。尽管 JVM 可能能够处理数千个线程,但仅在线程堆栈上消耗 GB 的内存将是相当浪费的。对于每个用户使用队列是确保按用户排序交付的一种方式,但可能涉及大量迭代空队列。 - ptomli
一个选择可能是使用精心构建的主题树。也许能够将用户分组在一起,这样你就会冒着一组用户中的某个人引起问题的轻微风险,但总体上更加公平。如果您使用消息监听器,则取决于队列的实现,拥有大量空白队列可能不是问题。不断轮询空队列我同意是很差的,但如果有服务器到客户端的回调机制,那么它就可以了。 - Calanais

0

我建议使用消息优先级来实现,同时将consumerWindowSize设置为0,并始终让客户端从服务器中选择新的消息。

这样会增加消息的延迟,但您总是可以从服务器接收到消息。

请注意,您需要考虑竞争情况。如果您正在消费消息C,而消息B已经到达,那么您将先消费C再消费B。但由于C已经在消费中,所以没有太多可以做的。

您还可以考虑消息分组,但这将使消息组与负载均衡中的单个消费者绑定。


1
JMS 优先级是在消息提交时设置的,这意味着要实现公平排队,我们需要提交者能够看到队列的当前状态,但这不切实际。我现在正在测试基于消息分组的系统,但正如你所说,分组与特定消费者绑定存在一些问题。 - ptomli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接