如何创建一个分布式的“去抖动”任务来清空Redis列表?

11

我有一个使用场景: 多个客户端向共享的 Redis List 推送数据。另外一个工作进程应当抽取 (处理和删除)这个列表。等待/多重执行已经就绪,以确保这一过程顺利进行。

出于性能方面的考虑,我不想立即调用“drain”进程,而是要在 x 毫秒之后,从第一个客户端推送到(然后为空的)列表开始计时。

这类似于分布式 underscore/lodash 的 debounce函数,其计时器从第一个项目进入时开始运行(即:“leading”而不是“trailing”)

我正在寻找可靠的容错方法来完成此操作。

目前我倾向于以下方法:

  1. 使用 Redis SetNXpx 方法。这样可以:
    • 仅在专用键空间中设置值(互斥),如果它尚不存在。这就是 nx 参数的作用。
    • 在 x 毫秒后过期。这是 px 参数的作用。
  2. 如果设置了值,则此命令返回 1,表示以前不存在任何值。否则,它将返回 01 表示当前客户端是自 Redis 列表被清空以来运行进程的第一个客户端。因此,
  3. 此客户端将在 x 毫秒内将作业放入分布式队列中以便调度运行。
  4. 在 x 毫秒后,接收作业的工人开始排空列表的过程。

这在原理上可行,但感觉有些复杂。是否有其他方法以分布式容错的方式解决这个问题?

顺便说一句:Redis和分布式队列已经就位,所以我认为在解决这个问题时使用它不会增加额外的负担。

2
我很惊讶分布式去抖动机制还没有被发明出来 - 这似乎是一种常见的数据流(限制消息速率)。 - Mark Essel
1个回答

9
抱歉,正常的回答需要大量的文字/理论。因为您已经提出了一个好问题,您已经写了一个好答案 :)
首先,我们应该定义这些术语。在underscore/lodash方面,“debounce”应该在David Corbacho的文章解释下学习:
防抖:将多个事件视为“一组”。想象一下,你回家了,进入电梯,门正在关闭……突然你的邻居出现在大厅里,试图跳上电梯。要有礼貌!给他开门:您正在去除电梯的抖动。考虑到同样的情况可能会再次发生,并且如此等等......可能会延迟几分钟的出发时间。
节流:将其视为阀门,它调节执行的流量。我们可以确定函数在某个时间内可以被调用的最大次数。因此,在电梯类比中,您足够礼貌地让人们进入10秒钟,但是一旦过了这段时间,您必须走!
您正在询问 debounce ,因为第一个元素将被推送到列表中:
因此,类比于电梯。电梯应该在第一个人上电梯后10分钟后开始上升。无论有多少人挤进电梯都没有关系。
在分布式容错系统的情况下,这应该被视为一组要求:
1. 在插入第一个元素(即创建列表)后,必须在X时间内开始处理新列表。 2. 工作进程崩溃不应该破坏任何内容。 3. 无死锁。 4. 第一个要求必须得到满足,无论工作人员数量是1个还是N个。
也就是说,您应该知道(以分布式方式)-工作人员组必须等待,或者您可以开始处理列表。只要我们提到“分布式”和“容错”,这些概念总是伴随着它们的朋友:
1. 原子性(例如通过阻塞) 2. 预留
实际上,在实践中,我担心您的系统需要变得更加复杂(也许您只是没有编写它,而已经拥有它)。
您的方法:
  1. 使用SET NX PX进行悲观锁定。 NX保证只有一个进程在做工作(原子性)。PX确保如果此过程发生任何事件,Redis会释放锁(关于死锁的容错的一部分)。
  2. 所有工作进程都尝试获取一个互斥锁(每个列表键一个),因此只有一个进程会在X时间后处理列表。此进程可以更新互斥锁的TTL(如果需要更多时间)。如果进程崩溃,则互斥锁将在TTL之后被解锁,并被其他工作进程获取。

我的建议

Redis中容错可靠队列处理是围绕RPOPLPUSH构建的:

  • 从处理到特殊列表(每个工作进程每个列表)中RPOPLPUSH项。
  • 处理项目
  • 从特殊列表中删除项目

需求 因此,如果工作进程崩溃,我们总是可以从特殊列表中返回损坏的消息到主列表。Redis保证RPOPLPUSH / RPOP的原子性。也就是说,只有一组问题工作人员需要等待一段时间。

然后有两个选择。第一种-如果客户端很多,但工作人员较少,请在工作人员侧使用锁定。因此,尝试在工作人员中锁定互斥体,如果成功-开始处理。

反之亦然。每次执行LPUSH / RPUSH时都使用SET NX PX(如果您有许多工作人员和一些推送客户端,则具有“在弹出我之前等待N时间”的解决方案)。因此,推送是:

SET myListLock 1 PX 10000 NX 
LPUSH myList value

每个工作线程只需检查myListLock是否存在,如果存在,则应在至少键TTL之前等待设置处理互斥锁并开始排空。

感谢您提供了详细的答案。也许您能明确解释一下为什么我的方法不够好吗?我可能错过了某些内容。 - Geert-Jan
在实际系统中,要求4(每个队列使用多个工作人员)是任何负载系统中的重要要求之一。因此,您应该至少使用2个互斥锁-一个用于去抖逻辑,另一个用于“已处于去抖模式的队列”。此外,从特殊列表返回需要更多一个特殊的“恢复”工作者或者在从列表代码中弹出时需要更复杂的代码。 - Nick Bondarenko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接