任务队列与作业亲和性

19

我目前面临一个问题,我相信这个问题有一个官方名称,但我不知道该在网上搜索什么。如果我描述了这个问题和我所想的解决方案,希望有人能告诉我设计模式的名称(如果有与我所要描述的相匹配的)。

基本上,我想要的是一个作业队列:我有多个客户端创建作业(发布者),以及处理这些作业的工作者(消费者)。现在我想将发布者创建的作业分配给各个消费者,这基本上可以使用几乎任何具有负载均衡性的消息队列来实现,例如使用RabbitMQ或甚至MQTT 5。

然而,现在事情变得复杂了...每个作业都涉及到外部实体,比如用户。我想让单个用户的作业按顺序处理,但对于多个用户则并行处理。我没有要求用户X的作业始终要发送到工作者Y,因为它们无论如何都应该被顺序处理。

现在我可以使用RabbitMQ及其一致性哈希交换来解决这个问题,但当新的工作者进入集群时,我会遇到数据竞争,因为RabbitMQ不支持重新定位已经在队列中的作业。

MQTT 5也不支持这一点:这个想法在这里被称为“粘性共享订阅”,但这并不是官方名称。它可能是MQTT 6的一部分,也可能不是。谁知道呢。

我还看了NSQ、NATS和其他一些代理程序。大多数甚至不支持这种非常特定的情况,而那些支持的则使用了前面提到的存在数据竞争问题的一致性哈希算法。
如果代理程序在作业到达时不将作业排序,而是跟踪特定用户的作业是否已经在处理,则可以消除这个问题:如果是,则应延迟该用户的所有其他作业处理,但应继续处理其他用户的所有作业。就我所知,使用RabbitMQ等等工具不可能实现这一点。
我相信我不是唯一一个有这种用例的人。例如,我可以想象用户上传视频到视频平台,虽然上传的视频会并行处理,但单个用户上传的所有视频将按顺序进行处理。
简而言之:我描述的东西是否以通用名称为人所知?像“分布式作业队列”这样的东西? “具有任务关联性的任务分派器”?或者其他任何东西?我尝试了很多术语,但没有成功。这可能意味着没有解决方案,但正如我所说,很难想象我是地球上唯一面临此问题的人。
有什么建议吗?还有:是否有任何实现此功能的工具?任何协议?

PS:仅使用预定义的路由键不是一个选项,因为用户ID(我这里只是举了一个虚构的例子)基本上是UUID,所以可能有数十亿个,所以我需要更动态的解决方案。因此,一致性哈希算法基本上是正确的方法,但是如前所述,分布必须逐块进行,而不是预先进行,以避免数据竞争。


我可能会使用数据库而不是消息队列来解决这个问题。由于处理消息需要先决条件,因此您已经有了一个争议点,因此最好在那个时候使用正确的工具。 - theMayer
使用一些现代消息代理,是否已经找到了解决这个问题的方法?我在Kafka或RabbitMQ中找不到方法来做到这一点。似乎需要实现额外的协调才能实现。我只在旧的、经典的代理(ActiveMQ、Weblogic JMS、Apache Qpid)中找到了这个功能,称为“消息组”或“顺序单元”。这意味着给定“单元”或“组”的消息以动态方式推送到当前空闲工作器中的一个(每次可能选择不同的工作器)。 - tporeba
9个回答

24

Temporal Workflow可以在最小的努力下支持您的用例。

这里是一个满足您要求的草案设计:

  • 使用userID作为workflow ID向用户工作流发送signalWithStart请求。它会将信号传递到工作流或者首先启动工作流再将信号传递到它。
  • 该工作流中的所有请求都会被缓冲。Temporal提供了一个硬性保证,即只能存在一个具有给定ID并处于打开状态的工作流。因此,所有信号(事件)都有保证会被缓冲到属于用户的工作流中。在任何进程或基础架构故障的情况下,Temporal保留工作流中的所有数据(包括堆栈跟踪和局部变量)。因此,不需要显式地持久化taskQueue变量。
  • 内部工作流事件循环逐个调度这些请求。
  • 当缓冲区为空时,工作流可以完成。

这里是用Java实现它的工作流代码(Go、Typescript和PHP SDK也受支持,Python处于alpha阶段):

@WorkflowInterface
public interface SerializedExecutionWorkflow {

    @WorkflowMethod
    void execute();

    @SignalMethod
    void addTask(Task t);
}

@ActivityInterface
public interface TaskProcessorActivity {
    void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

    private final Queue<Task> taskQueue = new ArrayDeque<>();
    private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

    @Override
    public void execute() {
        while(!taskQueue.isEmpty()) {
            processor.process(taskQueue.poll());
        }
    }

    @Override
    public void addTask(Task t) {
        taskQueue.add(t);
    }
}

然后是将该任务通过signal方法排队到工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
    // Set workflowId to userId
    WorkflowOptions options = WorkflowOptions.newBuilder()
       .setTaskQueue(TASK_QUEUE)
       .setWorkflowId(task.getUserId())
       .build();
    // Use workflow interface stub to start/signal workflow instance
    SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
    BatchRequest request = temporalClient.newSignalWithStartRequest();
    request.add(workflow::execute);
    request.add(workflow::addTask, task);
    temporalClient.signalWithStart(request);
}

相比于使用队列进行任务处理,Temporal提供了许多其他优势。

  • 内置指数级重试和无限期限的过期处理
  • 故障处理。例如,它允许执行一个任务,在配置的时间间隔内如果两个更新都无法成功,则通知另一个服务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖关系。例如,实现调用链或在不可恢复故障的情况下实现补偿逻辑(SAGA
  • 完全可见当前更新状态。例如,当使用队列时,您只知道队列中是否有一些消息,并且需要额外的数据库来跟踪整体进度。使用Temporal,每个事件都被记录。
  • 能够取消正在进行的更新。
  • 分布式CRON支持

请参阅这个演示,了解Temporal编程模型。


如果工作进程崩溃,Queue<Task> taskQueue 不会丢失吗?如果我理解正确,信号将不会重新发送到新的工作进程,因为当前工作进程已经接收到它们并将其放置在内部状态中。据我所知,Cadence 不存储工作流状态,而是从历史记录中重新创建它。信号是否是历史记录的一部分? - Ruslan Stelmachenko
我认为你混淆了Cadence通过事件溯源提供容错机制的实现细节和编程模型。在工作线程崩溃的情况下,Queue<Task> taskQueue不会丢失。这正是Cadence极大简化分布式系统开发的原因。我将这种模型称为“无视故障代码”,因为工作流代码甚至不知道工作线程失败的情况。编辑:我仔细阅读了你的评论。是的,信号绝对是工作流历史的一部分。如果没有这个,恢复状态将是不可能的。 - Maxim Fateev
谢谢!我建议您更新文档以明确指定,信号是历史的一部分(现在不太明显)。还可以添加一个包含一些内部信息的部分,例如“工作原理”。例如,我想知道:信号何时进入工作流程,在哪个线程中调用信号处理程序方法(在主工作流线程中,仅在特殊位置如WorkflowThread.sleep()?)。此外,当调用活动或sleep()时,主工作流线程会发生什么(我的意思是,毕竟Java不支持协程)。等等。谢谢! - Ruslan Stelmachenko
同意,我计划编写一个单独的章节,介绍Java和Go客户端如何进行多线程处理。其要点是采用协作式方法,并使用真实线程。因此,当调用sleep时,真实线程会被阻塞。但如果工作流程被推出缓存,该线程将释放回进程。这严重限制了缓存Java工作流程的数量,因为限制始终在于线程而不是内存。但从用户的角度来看,工作流程在整个睡眠期间都被阻止,这并不影响用户体验。 - Maxim Fateev

3
我希望拥有一个工作队列:我有多个创建作业(发布者)的客户端和多个处理这些作业(消费者)的工作者。现在,我想将由发布者创建的作业分配给各个消费者,这基本上可以使用几乎任何负载均衡的消息队列来完成,例如使用RabbitMQ甚至MQTT 5。

然而,现在情况变得复杂了...每个作业都指向一个外部实体,比如一个用户。我想要的是单个用户的作业按顺序处理,但对于多个用户则可以并行处理。我没有要求用户X的作业总是被分配到工作者Y,因为它们无论如何都应该按顺序处理。

即使不是这种特定用例,我几个月前做了一项(动态)任务调度调查[0][1],也没有类似的东西出现。

每个调度算法都有一些通用的属性,如优先级、年龄、入队时间、任务名称(通过平均处理时间推导)。如果您的任务都与用户相关,则可以构建一个调度程序,考虑到用户ID来从队列中选择任务。
但我猜想,您不想构建自己的调度程序,无论如何这将是浪费,因为根据这种需求的经验,现有的消息队列允许实现您的要求。
总结您的要求如下:
只运行一个用户任务的调度程序。
解决方案是使用分布式锁,类似于REDIS distlock,在任务开始前获取锁,并在任务执行期间定期刷新它。如果同一用户的新任务进来并尝试执行,则无法获取锁并将被重新排队。
以下是伪代码:
def my_task(user_id, *args, **kwargs):
    if app.distlock(user_id, blocking=False):
        exec_my_task(user_id, *args, **kwargs)
    else:
        raise RetryTask()

不要忘记刷新释放锁定。
类似的方法被用来强制执行爬虫中每个请求之间的robots.txt延迟。

使用某种分布式锁系统似乎是一个不错的建议,但我同意playsted的观点,在从队列获取任务后在消费者内部检查这一点已经太晚了。如果它被用来选择从哪个队列轮询任务,那就完全是另一回事了。 - tporeba

2
Amirouche所描述的方案是一个简单的解决方案,只要锁冲突不经常发生。如果发生了,你的工人将浪费很多时间去获取他们必须拒绝并重新排队的消息。
另一种很好地解决这类问题的替代方案是Actor模型/Actor框架。一些例子包括Akka、Orleans、Protoactor和Cadence(上面提到过,尽管Candence不仅仅是一个Actor框架)。这些框架可能会变得非常复杂,但在它们的核心可以确保单个Actor的消息被逐个处理,但允许许多Actor同时处理(在您的场景中每个用户ID将有一个Actor)。这些框架抽象了所有的消息路由和并发,大大简化了实现,并应该更加健壮/可扩展。

1

将每个实体的处理顺序作为硬性要求是具有挑战性的。

每个发布任务的持续时间有多长?如果它们总是非常短,您可以通过哈希分发任务,并在每次更改工作池的形状时简单地清除正在运行的作业,而不会损失太多生产力。

如果它们运行时间较长,则可能会太慢。在这种情况下,您还可以潜在地让工作人员从快速中央服务(如Redis或其他服务)中获取每个任务的user_id的原子咨询锁,以便在其执行期间使用。此服务也可以按用户ID范围或其他方式单独扩展分区。如果在接收任务和执行其第一个副作用之间有足够的间隔,工作人员甚至不需要在成功获取锁后阻塞,直到即将提交,因此可能不会看到明显的延迟。争用*可能很少:如果您已经在使用一些一致的哈希方案来分发工作,那么它们确实很少见,并且仍然只会在工作池拓扑结构变化时发生。您应该至少使用散列分布来保证只有两个工作人员竞争锁:旧的和新的。**

如果授予锁是按照先到先服务的顺序提供服务,且锁的请求速度比工作池拓扑变化更快(也就是说,工作者一旦从发布者那里收到作业,就会排队等待锁),即使拓扑变化非常迅速,这甚至可以给您提供有关排序的相当好的保证。
编辑:
*我最初写的是“故障”;这并不完全是我的意思。换句话说,除非拓扑发生变化,否则该锁服务几乎永远不会遇到任何锁定争用,因为给定用户的任务通常会发送到同一个工作者。

另一个可能性是:您可以只进行部分工作池排空,并提供良好的保证。如果您正在使用一种一致的哈希方案来分配任务,并且可以维护完成已调度任务的低水位线,那么可以推迟启动目标工作者与最老的当前执行任务开始时不同的任务(即仅为其分配了新工作者的用户的运行任务)。这涉及到相当多的额外复杂性;如果您可以高效地跟踪低水位线并且没有长时间运行的任务,那么它可能是一个允许您省略锁定服务的好选择。然而,在撰写本文时,我不清楚这是否比使用锁定更便宜;低水位通常不易可靠地实现,并且在错误的时间死亡的工作者可能会延迟整个更改工作者的1/N队伍的处理,而不仅仅是在它死亡时正在运行的用户的任务。


1

Kafka支持您所需的内容。您需要配置一个键,Kafka将确保所有具有相同键的消息按顺序处理。


1
不是按顺序的,它会确保所有具有相同键的数据都进入同一个节点。 - prashant

1

Apache Qpid代理支持称为消息组的功能,其中路由键和工作进程之间的关系是动态的,并基于当前流量。

消费顺序意味着代理将不允许未确认消息在给定组的多个使用者之间。

这意味着在给定时间内只有一个使用者可以处理来自特定组的消息。当使用者确认其获取的所有消息时,代理可能会将该组的下一个挂起消息传递给另一个使用者。

这可能会提供更好的工作者利用率:

请注意,不同的消息组不会阻止彼此传递。例如,假设队列包含来自两个不同消息组(称为"A"组和"B"组)的消息,并且它们被排队,使得"A"组的消息在"B"组之前。如果"A"组的第一条消息正在被客户端消费,则剩余的"A"组消息将被阻塞,但是"B"组的消息可供其他消费者消费 - 即使它在队列中处于"A"组之后。
然而,与其他代理相比,这个功能可能会带来显著的性能损失。而且现在对Qpid没有太多兴趣。(与其他代理相比) 4 5

编辑:还有其他经纪人也提供此功能:ActiveMQActiveMQ Artemis 编辑2:事实证明,ActiveMQ和Artemis中的“消息组”工作方式不同 - 组分配给工作程序是静态(粘性),而不是动态的。


0

我能够通过搜索"分类排序的作业队列"找到this描述的行为类型的讨论。

不幸的是,看起来他们没有解决你的问题。

有一个先前问题的答案建议不使用任何消息代理服务处理有序或具有业务逻辑敏感性的任务,原因可能适用于你正在做的事情也可能不适用。它还指出了一种似乎可以完成你正在尝试做的事情的技术,但可能对手头的任务不具备良好的可扩展性。

如果你有粘性的选项,它将以简洁且最小的额外低效方式解决你的问题。当然,粘性也有其自身的故障模式;没有理由认为你会找到一个完全符合你所需权衡的实现。

我猜测,因为你在这里提出了问题,每个用户的顺序性是重要的。在你提供的视频平台处理上传的示例中,违反顺序性不会有太大影响。更广泛地说,大多数需要大规模吞吐量负载均衡作业队列的人并不需要对处理顺序有保证。

如果你最终需要自己构建这个东西,你将有很多选择。我得到的印象是你期望有一个巨大的吞吐量、高度并行化的架构和低用户ID冲突率。在这种情况下,您可以考虑维护一个先决条件列表:
当一个新任务进来时,负载均衡器会搜索所有正在处理、已分配和未分配的作业,以查找与作业关键字(user_id)匹配的任何作业。
如果存在匹配项,则将新作业添加到未分配列表中,并将其共享其键的最旧作业作为先决条件。
每次作业完成时,工作者都需要检查未分配列表,以查看它是否刚刚完成了任何人的先决条件。如果是,工作者可以将该子作业标记为分配,或者直接处理该子作业。
当然,这也有自己的故障模式;您必须做出权衡。


-2

Kafka 可以帮助你,因为它会暂时存储消息,这样你就可以再次轮询它们。


-2

如果我正确理解您的情况,我认为您描述的功能与 Message SessionsAzure Service Bus 中的工作方式非常相似。

您只需在将消息推入队列之前将消息的 SessionId 属性设置为 UserId

每个消费者将锁定会话以处理一个接一个的消息,并且这些消息将属于同一用户。完成后,消费者可以继续进行下一个可用会话。

此外,Azure Functions 最近发布了服务总线会话支持,目前处于预览状态,但可以轻松实现所有这些。

不幸的是,我不熟悉是否存在于开源替代品中,但我希望这可以帮助到您。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接