Redis - 使用BRPOPLPUSH的更可靠的处理队列清理方法

20

我们目前的设计

环境:Redis 2.8.17

我们实现了一个可靠队列,使用与 Redis 文档中描述的类似模式,在 RPOPLPUSH 下。

但是,为了考虑到其阻塞特性,我们使用BRPOPLPUSH,并使用LPUSH 来确保先进先出(FIFO)顺序。

生产者: 多个线程(来自多台服务器)使用LPUSH将项目推入队列。

消费者: 多个线程(来自多台服务器)使用BRPOPLPUSH来处理项目。

BRPOPLPUSH q processing-q

如文档所述,redis会从队列'q'中弹出项目,并将它们添加到'processing-q'中。

问题

由于我们的应用程序是多线程(异步)的,因此我们无法控制何时消费者将完成其处理。

因此,如果我们使用LREM(根据文档)从processing-q中删除已处理的元素,则这只会删除processing-q的顶部元素,而不保证已删除相应消费者处理的实际元素。

因此,如果我们什么都不做,processing-q将继续增长(占用内存),这在我看来非常糟糕。

有任何建议或想法吗?


1
这里有一份关于“Redis作为可靠工作队列”的好演示文稿链接,以及同一主题的演讲者博客文章。它解释了一个在Redis中建模的更高级的队列设计,比RPOPLPUSH文档中的队列设计更先进。同时也提到了故障恢复活动。 - saaj
4个回答

27

您只需要在调用 LREM 时包含要删除的作业即可。

LREM 的格式如下:

LREM queue count "object"

它将从 queue 中删除 "object" 相等的 count 项。因此,要删除您的消费者线程正在处理的特定作业,您可以执行以下操作。

LREM processing-q 1 "job_identifier"

了解更多详细信息请查看此处的文档:http://redis.io/commands/lrem

为了处理崩溃的消费者和遗弃的作业,您可以使用SETEX创建带有过期时间的锁,并周期性地检查没有锁的作业。

整个流程如下:

生产者

  1. RPUSH q "job_identifier"

消费者

  1. SETEX lock:processing-q:job_identifier 60(先设置锁以避免竞态条件)
  2. BRPOPLPUSH q processing-queue
  3. 处理作业
  4. LREM processing-queue "job_identifier"

过期作业监视器

  1. jobs = LRANGE processing-queue 0 -1
  2. 对于每个作业:lock = GET lock:processing-q:job_identifier
  3. 如果lock为null,则此作业已超时,因此从processing-q中删除 LREM processing-queue "job_identifier"
  4. 并重试 RPUSH q "job_identifier"

@NotAUser在此处发布了一个开源java实现:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq


1
这看起来比建议的被接受的答案更容易实现,而且效果更好。 - lucaswxp
1
是的!感谢您的跟进,这就是我最终采取的措施。 顺便说一下,我最终实现了这个:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq - NotAUser
5
我想我可能错过了一些显而易见的东西,但在获得作业 ID 之前如何使用作业 ID 设置锁定? - user0103
1
在过期作业监视器中,步骤4不应该在步骤3之前发生吗?最好是消费者接收任务多次,而不是从未接收。 - NeverEndingQueue
2
我不确定现在是否愚笨,但我仍然无法理解这个想法。生产者使用 RPUSH 添加带有 GENERATED_ID 和作业内容的字符串。然后,消费者使用完整的作业字符串或仅使用 GENERATED_ID(由生产者)运行 SETEX,但是消费者如何在首先轮询 q 之前发出该命令呢?除了此队列机制外,producerconsumer 之间是否有其他通信? - NeverEndingQueue
显示剩余9条评论

12

我会采用一个逐个消费者的处理队列(例如,processing-q:consumer-id)的方法。这将解决您当前的问题,但您仍然需要找到一种处理已崩溃的消费者的方法。为此,我建议您同时保存每个消费者最后弹出任务的时间,并定期检查超时情况。如果某个消费者达到了超时时间,将其任务移回主队列并删除其队列。


4
在类似的项目中,我正在使用工作进程的主机名和进程ID来备份队列。每个工作进程都有自己的备份队列,如果工作进程死亡,项目不会丢失。
有关详细信息,请查看READMEimplementation

4
当一名工人在处理过程中死亡并使用不同的进程ID重新启动时,会发生什么?新的工作进程如何查找其前任的备份队列? - skyork

0
除了提出的解决方案,您还可以使用ltrim来处理队列,使其达到适合您服务的数量。这将确保处理队列永远不会失控。
但是,如果达到修剪限制,您将开始丢失项目。这可能或可能不适用于您的用例。

http://redis.io/commands/ltrim


你说得对,在我的使用情况下,丢失一些项目是不可接受的。 - aspdeepak

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接