我需要实现一个公平队列系统,使得基于某些消息头的值,针对当前排队的所有具有该头部的值的消息,以轮询的方式处理消息。
系统中的消息自然地根据某些属性分组,其中可能有数千个可能的值,并且当前排队的消息的值集合会随时间变化而改变。一个类比是消息具有一个头部,该头部是消息创建时的毫秒部分。因此,该头部将具有0到999之间的值,并且该值将分布在当前所有排队的消息中。
我需要能够按照任何特定值不优先于其他值的顺序消耗消息。如果排队消息的标题值分布如下:
value | count
------|-------
A | 3
B | 3
C | 2
那么消费顺序将是A,B,C,A,B,C,A,B
。
如果队列中添加了另一个值的消息,则应自动将其添加到轮询序列中。
这意味着当前排队的一些消息需要一定的了解,但不要求消费者持有该知识;代理可能具有某种方式来排序交付。
可以接受在公平排队开始之前存在某些阈值。也就是说,如果阈值为10,则可按顺序处理具有相同值的10个消息,但是处理的第11个消息应该是序列中的下一个值。如果只有该值的排队消息,则下一个可能是相同的值。
可能的值的数量可能不允许简单地为每个值创建队列并迭代队列,尽管尚未测试。
我们正在使用HornetQ,但如果有提供这些语义的替代方案,我想知道。
消息是作业,标头值是用户ID。所寻找的是,在某些限制内,来自任何给定用户的作业不会不合理地延迟来自其他用户的作业;生产100万个作业的用户不会导致其他用户的后续作业等待处理这100万个作业。
HornetQ中的队列上的消费者按创建顺序进行评估,因此向队列添加选择性消费者不会阻止任何接收与过滤器匹配的消息的全捕获消费者。
JMS组似乎没有帮助,因为这将某个组(用户?)绑定到给定的消费者。
一种潜在的解决方案是基于需求(例如:来自同一用户的10个连续消息)在主题上创建选择性消费者,并有某些东西管理所有选择性消费者的生命周期以确保全捕获不会处理相同的消息。虽然可能,但这似乎具有一些烦琐的同步要求。