使用MQ异步处理、聚合和发布数据

7

一些背景知识,然后才是真正的问题:

我正在开发一个后端应用程序,由几个不同模块组成。每个模块目前都是一个命令行Java应用程序,可以根据需要“按需”运行(稍后会有更多详细信息)。

每个模块是一个“步骤”,是可以视为数据流的更大过程的一部分;第一步从外部源收集数据文件并将其推送/加载到某些SQL数据库表中;然后,基于不同的条件和事件(时间、DB中的数据存在、通过Web服务/Web界面完成的消息和计算),后续步骤从(1个或多个)DB表中获取数据,处理它们,并将它们写入不同的表格。步骤在三个不同的服务器上运行,从三个不同的DB中读取数据,但只写入单个DB。目的是汇总数据,计算指标和统计信息。

目前,每个模块定期执行(从第一个模块的几分钟/几小时到链中最后几个模块的数天,需要聚合更多数据,因此需要“等待”它们可用性更长时间),使用cronjob。运行一个模块(目前是Java控制台应用程序),它检查给定时间窗口内的新的未处理信息,并执行其工作。

问题:它有效,但是…我需要扩展和维护它,而这种方法开始显示出其限制。

  1. 我不喜欢依赖“轮询”;考虑到以前模块的信息可能足以“告诉”链中其他模块何时可用它们所需信息,它们可以继续进行。
  2. 它很“慢”:链条下面的几天延迟是因为我们必须确保数据已到达并由上一个模块处理。因此,我们暂停这些模块,直到确信我们拥有所有数据。新添加的内容需要实时(不难办到,但尽快)计算某些指标。这里在 SO 上发生的事情很好地说明了这一点! :) 我需要获得非常相似的东西。

为了解决第二个问题,我将引入“部分”或“增量”计算:只要我拥有一组相关信息,我就会处理它。然后,当其他链接的信息到达时,我计算差异并相应更新数据,但然后我还需要通知其他(相关的)模块。

问题(S)

- 1)哪种方法最好? - 2)相关:通知其他模块(在我的情况下为Java可执行文件)可用的最佳方法是什么?

我可以看到三种方法:

  • 在数据库中添加其他“非数据”表,每个模块都会写入“嘿,我已经完成了这个任务并且可用”。当cronjob启动另一个模块时,它读取表格,决定它可以计算子集xxx,并进行计算。以此类推。
  • 使用消息队列,如ZeroMQ(或者像@mjn建议的Apache Camel)而不是数据库表
  • 使用键值存储,如Redis,而不是数据库表

编辑:我相信基于队列的方法是正确的方法,我添加了“表+轮询”选项以保证完整性,但现在我明白它只是一个干扰(显然,每个人都会回答“是的,请使用队列,轮询很糟糕” - 这是正确的!)。所以让我重述问题: 使用MQ会比使用pub / sub类Redis的键值存储有哪些优缺点?

  • 3) 有没有解决方案可以帮助我完全摆脱cronjobs?

编辑:特别是在我的情况下,意味着:是否有一种机制在某些MQ和/或键值存储中,可以使我发布带有“时间”的消息?例如“在1天内交付”?当然具有持久性和“几乎一次”的交付保证

  • 4) 我应该将这个基于消息(事件?)的解决方案构建为一个集中式服务,在其中一个服务器上作为守护程序/服务运行吗?
  • 5) 我是否应该放弃启动订阅者的想法,并使每个模块作为守护程序/服务持续运行?
  • 6) 这些都是利弊分析(可靠性,单点故障与资源使用和复杂性之间的关系...)?

编辑:这是我最关心的部分:我想要 “队列” 本身激活基于队列中的消息的“模块”,类似于MSMQ激活。 这是一个好主意吗?在Java世界中有什么可以做到这一点的东西,是我应该自己实现它(在MQ或Redis上),还是应该将每个模块作为守护线程运行? (即使某些计算通常会发生突发情况,在长达两小时的处理后跟随两天的闲置?)

注意:我不能使用重量级容器/EJB(没有Glassfish或类似产品)

编辑:像Camel这样的东西对我来说也有点重,我在这里寻找的是真正轻巧的东西,无论是资源还是开发复杂度方面。


除了发布者/订阅者模型之外,您还可以在评估中包含Actor模型:http://akka.io/ - TizianoPiccardi
看起来是一个相当不错的应用程序,我知道你的痛苦。也许对你来说不是解决方案,但我曾经通过使用事件处理引擎Esper,在飞行中计算统计数据和指标,成功避免了那些典型的日终/周终批处理作业。您可以注册一个查询,其结果在每次事件到达时重新计算。域语言非常强大,特别是对于时间基础数据,通常在SQL甚至过程性语言中都很难实现。 - MarianP
我把它放在了最开始的位置,从0开始。但是你可能会发现迁移很容易,因为我说过领域语言很强大(虽然一开始需要学习和实验一下)。Esper是用Java编写的(也有一个等效的.Net版本),具有适当的API和单元测试支持。我唯一遇到的问题是,我必须将所有事件存储在数据库中,并在JVM重新启动时重放它们。然而,引擎还有一个付费版本,应该支持这种持久性的东西。 - MarianP
顺便说一下,它非常快,我认为我读到过它能够在1秒内处理100k事件和500个注册查询!您可能不能使用Esper,但是在线处理事件是一个不错的范例。还有其他类似(付费)引擎可供选择。 - MarianP
@dema80 我没有完全阅读你的问题,但通过教育猜测,我建议你查看此链接:https://dev59.com/SXE85IYBdhLWcg3wvGKm - mostruash
显示剩余2条评论
3个回答

1

1>我建议使用消息队列,根据您的要求选择队列,但在大多数情况下任何一种都可以。我建议您选择基于JMS协议(Active MQ)或AMQP协议(Rabbit MQ)并编写简单的包装器或使用Spring提供的spring-jms或spring-amqp。

2>您可以编写队列消费者,以便它们通知您的系统有新消息到达,例如在Rabbit中,您可以实现MessageListener接口。

 public class MyListener implements MessageListener {
     @Override
public void onMessage(Message message) {
     /* Handle the message */        

    }
}

3> 如果您使用异步消费者,如<2>中所述,您可以摆脱所有轮询和cron作业

4> 取决于您的要求->如果数百万个事件/消息通过您的队列,则在集中服务器上运行队列中间件是有意义的。

5> 如果资源消耗不是问题,则让您的消费者/订阅者一直运行是最简单的方法。如果这些消费者是分布式的,则可以使用像zookeeper这样的服务进行编排

6> 可扩展性 ->大多数排队系统提供了易于分发消息的功能,因此只要您的消费者是无状态的,就可以通过添加新的消费者和一些配置来实现扩展。


谢谢你的回答;我喜欢基于JMS的MQ所允许的监听器方法。不过,我需要更多细节:你说“让你的消费者/订阅者一直运行是最简单的方法”,但我担心的是可用性(如果一个订阅者崩溃了而我没有发现呢?),而不是资源消耗。 - Lorenzo Dematté
此外,我仍在寻找完成“在2小时内处理/发送此消息”的工作的最佳方法。 - Lorenzo Dematté
可用性 - 您可以使用 JMS 或 AMQP 事务,如果事务失败,您可以收到通知,您还可以使用失败的确认来执行此操作。延迟消息 - JMS 没有这个功能,但很容易自己添加,Rabbit MQ 最近以间接的方式添加了该功能。http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html - winash

1
队列任务描述部分听起来像基于“企业集成模式”的系统,如Apache Camel
延迟消息可以用常量表示。
from("seda:b").delay(1000).to("mock:result");

或者变量,例如消息头值。
from("seda:a").delay().header("MyDelay").to("mock:result");

Camel似乎确实是“普通”MQ和/或使用redis的自制MQ的有效替代方案。然而,我的其他问题(如何放置/优缺点/集中式vs分布式)仍然存在。(例如,Camel是否支持分布式模型?如果是这样,如何实现?) - Lorenzo Dematté

0

实现之后,我觉得回答自己的问题对于未来访问StackOverflow的人们是有好处的。

最终,我选择了Redis。它非常快速和可扩展。我很喜欢它的灵活性:它比消息队列更加灵活。我是否断言Redis在MQ方面比其他MQ更好?嗯,在我的特定情况下,我相信是这样的。关键是:如果某些功能不是开箱即用的,你可以构建它(通常使用MULTI - 但你甚至可以使用LUA进行更高级的定制!)。

例如,我遵循this good answer来实现“持久性”,可恢复的发布/订阅(即允许客户端死亡并重新连接而不会丢失消息的发布/订阅)。

这帮助我满足了我的可扩展性和“可靠性”要求:我决定将管道中的每个部分保持独立(目前是一个守护进程),但添加一个监视器来检查Redis上的列表/队列;如果某些内容没有被消耗(或者消耗得太慢),监视器就会生成一个新的消费者。我还考虑到真正的“弹性”,并增加了消费者在没有工作可做时自我终止的能力。

另一个例子:执行定时活动。我正在遵循这种方法,它现在似乎相当流行。但我渴望尝试keyspace通知,看看过期键和通知的组合是否可以成为一种更优越的方法。
最后,作为访问Redis的库,我的选择是Jedis:它很受欢迎,得到支持,并提供了一个不错的接口来实现pub/sub作为监听器。虽然这不是Scala的最佳方法(惯用法),但它运行良好。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接