RabbitMQ中的工作池和多租户队列

16
我工作的网络应用是基于云计算的多租户应用(许多客户,每个客户都有自己独立的“环境”,但所有客户共享一组硬件资源),我们正在引入用户批量处理功能。 批处理的类型并不重要,只是数量足够大,没有工作队列就不太实际。 我们选择RabbitMQ作为底层队列框架。
由于我们是多租户应用程序,我们不一定希望客户能够导致其他客户的队列处理时间过长,因此我们提出的一个想法是在每个客户端上创建一个队列,并将一个共享的工作池指向所有客户队列。 问题在于,据我所知,工人直接绑定到特定队列而不是交换机。 在我们理想的世界中,我们的客户队列仍将通过共享的工作池进行处理,而不会阻止其他客户端,我们可以通过启动更多工人或关闭闲置工人来根据需要增加或缩小工作池。 由于工人绑定到特定队列,这使我们在实际操作中无法做到这一点,因为我们经常会有很多工人在没有活动的队列上闲置。
有没有相对简单的方法来实现这一点? 我对RabbitMQ还比较陌生,还没有能够实现我们想要的功能。 我们也不想编写非常复杂的多线程消费程序,因为这会耗费时间在开发和测试中,而这可能是我们负担不起的。 我们的技术栈是基于Windows/.Net/C#,但我认为这不应该对问题有重大影响。
4个回答

6
您可以查看优先队列实现(在最初提出这个问题时尚未实现):https://www.rabbitmq.com/priority.html
如果这种方法不适用,您可以尝试其他方法来达到您想要的效果(这些方法适用于旧版本的RabbitMQ):
您可以有100个队列绑定到一个主题交换机,并将路由密钥设置为用户ID%100的哈希值,即每个任务的密钥都在1和100之间,同一用户的任务具有相同的密钥。每个队列与1到100之间的唯一模式绑定。现在,您有一组工作人员,他们从随机队列编号开始,然后在每个作业后增加该队列编号,再次对100取模以在队列100之后循环回到队列1。
现在,您的工作人员群体可以同时处理多达100个独特的用户,或者如果没有其他工作要做,则所有工作人员都可以关注单个用户。如果工人需要在每个作业之间循环通过所有100个队列,则在仅单个用户在单个队列上有大量作业的情况下,自然会在每个作业之间产生一些开销。较少的队列数量是处理此问题的一种方法。您还可以使每个工人保持对每个队列的连接,并从每个队列中消耗多达一个未确认的消息。只要未确认的消息超时设置得足够长,工人就可以更快地在内存中循环处理待处理消息。
或者,您可以创建两个交换机,每个交换机都有一个绑定队列。所有工作都发送到第一个交换机和队列,而一组工人则会消费它们。如果一个工作单元花费太长时间,那么工人可以取消它并将其推送到第二个队列。只有在第一个队列没有任何任务时,工人才会处理第二个队列。您可能还需要几个具有相反队列优先级的工人,以确保长时间运行的任务仍然在短任务不断到来时被处理,从而保证用户的批次最终总是被处理。这不会真正将您的工人群分配到所有任务上,但它将防止一个用户的长时间运行任务阻止您的工人执行同一用户或其他用户的短时间运行任务。这也假定您可以取消作业并稍后重新运行它而不会出现任何问题。它也意味着需要重新运行低优先级的超时任务会浪费资源,除非您能够提前识别出快速和慢速任务。
如果针对一个用户有100个缓慢的任务,然后另一个用户发布了一批任务,则这些任务在缓慢任务完成之前不会得到处理。如果这被证明是一个合法的问题,您可以将两个解决方案结合起来。

1

我不明白为什么你不使用RabbitMQ的虚拟主机,并让你的应用程序登录到RabbitMQ并为每个用户验证一个单独的连接。

这并不意味着你不能有一个工作进程监管者,将工作进程分配给一个或另一个用户。但这确实意味着每个用户的所有消息都由完全独立的交换和队列处理。


1

你可以让你的工作池都消费同一个独特的队列。这样工作就会在它们之间分配,你就能够增加/减少你的工作处理能力来扩大/缩小你的工作池。


2
我不是在询问如何将多个工作人员分配给同一队列,而是在询问相反的情况。我想要一个有限的工作人员池从大量(我们称之为约500)的队列中消费。 - bakasan
2
我亲身尝试过这种方法,结果并不理想:很难找到一个合适的启发式来处理所有这些队列。你是先处理最满的队列?还是先处理那些有较旧消息的队列?在这两种情况下,你都已经超出了AMQP协议范围,并且必须开始处理Rabbit管理API。然后你想:让我们与工作人员数量相同的队列数,并在500个Q和工作队列之间添加一些一致性哈希映射。然后你意识到,一个单独的队列和n个工作人员竞争就足够了。 - David Dossot
我有类似的需求,但是我想确保来自特定客户的消息按顺序处理。在创建联系人之前不会删除等等。是否有一些RabbitMQ的配置或设置可以做到这一点,同时在工作进程之间共享队列?(这是一个新的问题吗...?) - Aaron

0

工人被分配到0+个队列,而不是交换机。

每个工人从哪些队列中获取任务的逻辑是通过CELERYD_CONSUMER指示的类实现的,默认情况下为celery.worker.consumer.Consumer

您可以创建一个自定义的消费者类来实现您喜欢的任何逻辑。难点在于决定您想要使用的“公平性”算法的细节;但是一旦您决定了这一点,您就可以通过创建自定义消费者类并将其分配给适当的工人来实现它。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接