分布式分析系统数据一致性的架构设计

5
我正在重构一个分析系统,该系统将进行大量计算,并且我需要一些关于可能的架构设计的想法,以解决我面临的数据一致性问题。
当前的架构是基于队列的系统,在该系统中,不同的请求应用程序创建消息,最终由工作程序消耗。每个"请求应用程序"将大型计算分解为较小的部分,这些部分将被发送到队列并由工人处理。当所有部分完成时,源"请求应用程序"将汇总结果。此外,工人从集中式数据库(SQL Server)中消耗信息以处理请求(重要提示:工人不会更改数据库上的任何数据,只会消耗它)。
问题在于我们包含了一个 Web 服务来更新数据库中的信息。这可以随时发生,但是每个源自相同"请求应用程序"的"大型计算"都必须看到数据库上相同的数据。例如:
1. 应用程序 A 生成消息 A1 和 A2,并将其发送到队列。 2. 工人 W1 挑选出消息 A1 进行处理。 3. Web 服务器更新数据库,从状态 S0 更改为 S1。 4. 工人 W2 挑选出消息 A2 进行处理。
我无法允许工人 W2 使用数据库的 S1 状态。为了使整个计算保持一致,它应该使用先前的 S0 状态。
我考虑了两种解决方案:
1. 使用锁定模式来防止 Web 服务器在工人从中消耗信息时更改数据库。缺点是锁可能会持续很长时间,因为不同的"请求应用程序"计算可能会重叠。 2. 在数据库和工人之间创建一个新层(控制按需应用程序的数据库缓存的服务器)。缺点是添加另一层可能会带来显着开销,并且需要大量工作,因为我将不得不重新编写工人的持久性(大量代码)。
我正在考虑第二个解决方案,但对此并不十分自信。是否有更好的想法?我是否设计有误或者遗漏了什么?
  • 这是一个庞大的2层遗留系统(使用C#),我们正在尝试将其演变成一种更可扩展的解决方案,尽可能地减少努力。
  • 每个工作进程都有可能在不同的服务器上运行。

1
这听起来非常像是map/reduce。你为什么要自己编写这样的东西呢?我会直接使用Hadoop。 - duffymo
我忘了提到这是一个巨大的2层遗留系统(使用C#编写),我们正在尝试将其发展为更具可扩展性的解决方案,同时尽可能少地付出努力。我认为将所有内容更改为Hadoop将是一项巨大的任务。 - Fabio Marreco
比起编写、调试和维护Hadoop已经实现的功能,还要更加庞大吗?在承诺之前我会确认的。 - duffymo
1
我同意@duffymo的观点。你们将会花费余生来尝试让这个工作,发现最好的结果是最终一致性,并且不得不编写手动对账程序来处理节点故障。建议使用专门设计用于此类问题的语言进行全面重写,例如ERLang和Hadoop。 - Software Engineer
我对Hadoop不是很熟悉,所以请帮帮我。该系统有超过100万行代码,并已执行了分离/计算/聚合(Map/Reduce),但它采用In-Memory(单个服务器)。它还在SQL Server上具有许多智能功能(存储过程、函数等)。现在我需要进行横向扩展,我已经轻松地通过几周的开发迁移到了这个队列架构,除了数据库并发问题。您认为Hadoop能够在我的情况下提供帮助吗?如果可以,请指点我正确的方向。(我无法重写整个应用程序) - Fabio Marreco
我不清楚Hadoop如何解决这个问题。 Hadoop可能是实现方案#2的许多工具之一,但它肯定不能开箱即用地解决问题。 - Phil Sandler
3个回答

1

你能否为你的数据库版本控制?

假设请求应用程序使用ct1时间戳标记计算的开始。现在,此计算生成的每个消息都带有相同的时间戳。

并且每个数据库更新将DB状态与更新时间戳一起标记。因此,状态S0在时间t0上,状态S1在t1等。

现在,当工作程序获取消息时,它需要获取更新时间最大且小于或等于消息时间的DB状态。在您的示例中,如果A1和A2带有ct1时间戳,并且t1> ct1,则两个工作程序都将检索S0而不是S1。

这当然意味着您需要在数据库中保存多个版本。如果您知道您的计算必须在某个时间窗口之后完成,您可以在一定时间后清除这些版本。


你说得很对。我可以准备每个表以允许版本控制,但我需要重写整个模式。你知道SQL Server中是否有任何功能可以自动完成或至少使其更容易吗? - Fabio Marreco
不一定。但是在数据库模式中添加时间戳行通常不应该太困难。 - Rotem Hermon

1
我喜欢选项2,特别是如果完整计算所需的数据量不是不合理地大。我假设有一种方法可以通过id将属于同一整体作业的计算相关联吗?
当一组计算的第一条消息到达时,接收它的工作程序会查询数据库并获取执行所有计算所需的所有数据并创建一个临时数据存储。这个数据存储的外观取决于许多因素(大小、结构等),但它可以是一个blob/文档,一个关系模式中的一组数据(由correlationId隔离),企业缓存中的一个条目等。
您需要小心Worker 1和Worker 2都在处理同一组计算的情况,因为只有其中一个应该创建数据存储,并且两者都需要等待存储完全填充后才能继续进行。

我可以将这些计算与同一整体工作相关联。我可以生成一个ID并将其设置为所有计算。但是,发现作业需要的所有数据对我来说是一个非常复杂的任务。我考虑在架构中添加一个新服务器,所有工作人员将访问该服务器而不是数据库。该服务器将通过作业ID维护从数据库的结果缓存。你认为怎么样? - Fabio Marreco
那么你不是回到了最初的问题吗?或者说,序列中的每个计算只依赖于前一个计算的结果? - Phil Sandler
每个计算都独立于其他计算的结果,它们仅依赖于数据库中的数据。这个“服务器”将是唯一的,作为所有连接到数据库的门面,以便它可以在内存中管理每个人的缓存。不过我对此并不太有信心。 - Fabio Marreco
嗯,我还不清楚这样做如何解决问题。在消息A1和A2之间,数据库不能仍然发生变化吗?我认为你需要类似于在第一条消息到达时对数据库进行快照的操作。 - Phil Sandler

0

感谢大家的帮助。

既然我相信这个问题在其他场景中可能也很常见,因此我想分享我们选择的解决方案。

更深入地思考问题后,我理解了它的本质。

  • 我需要为每个任务进行某种类型的会话控制
  • 有一个缓存进程作为每个任务的会话控制

现在计算已经演变为分布式,我只需要使我的缓存也成为分布式。

为了实现这一点,我们选择使用内存数据库(哈希值),部署为单独的服务器。(在这种情况下 Redis)。

现在每次启动作业时,我都会为该作业创建一个 ID,并将其传递给它们的消息。

当每个工作者想要从数据库中获取某些信息时,它会:

  1. 在 Redis 中查找数据(使用作业 ID)
  2. 如果数据在 Redis 中,则使用数据
  3. 如果没有,则从 SQL 中加载数据,并将其保存在 Redis 中(使用作业 ID)。

在任务结束时,我清除与作业 ID 相关联的所有哈希。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接