大型数据集上的连续聚合

8
我正在思考一种算法来解决我遇到的问题。这不是一个作业问题,而是我正在处理的一个副项目。
有一个名为表A的表格,大约有10^5行,并且每天按顺序添加10^2行。 表B大约有10^6行,每天新增10^3行。从A到B有一对多的关系(A中的某些行对应多个B行)。
我想知道如何对这种数据进行连续聚合。我希望有一个任务每隔约10分钟运行一次,并执行以下操作:对于A中的每一行,在与之相关的B行中找到最近一天、一周和一个月内创建的所有行(然后按计数排序),并将它们保存在不同的DB中或缓存它们。
如果这很令人困惑,这里有一个实际的例子:假设表A有亚马逊产品,表B有产品评论。我们想显示最近4小时、1天、1周等评分最高的产品的排序列表。新产品和评论以快速的速度添加,我们希望所述列表尽可能地更新。
我目前的实现只是一个for循环(伪代码):
result = []

for product in db_products:
    reviews = db_reviews(product_id=product.id, create>=some_time)
    reviews_count = len(reviews)
    result[product]['reviews'] = reviews
    result[product]['reviews_count'] = reviews_count

sort(result, by=reviews_count)

return result

我每小时都会执行这个操作,并将结果保存在JSON文件中进行提供。问题在于这种方法并不能很好地扩展,并且计算时间很长。

那么,我应该从哪里着手解决这个问题呢?

更新:

谢谢大家的回答。但最终我学习并使用了Apache Storm。


你有多确定你需要一个包含那些数据的实际表格?如果你按需生成它,并将每小时的数据缓存一小时,每天的数据缓存一天,每月的数据缓存一个月,这样怎么样? - vlad-ardelean
3个回答

1

需求摘要

在一个数据库中有两个较大的表,您需要定期为过去的时间段(小时、天、周等)创建一些聚合并将结果存储在另一个数据库中。

我假设,一旦时间段过去,相关记录就不会再更改,换句话说,过去时期的聚合始终具有相同的结果。

建议解决方案:Luigi

Luigi是一个用于处理依赖任务的框架,其中一个典型用途是计算过去时期的聚合。

其概念如下:

  • 编写简单的Task实例,定义所需的输入数据、输出数据(称为Target)和创建目标输出的过程。
  • 任务可以被参数化,典型的参数是时间段(特定的日、小时、周等)
  • Luigi可以在中途停止任务并稍后重新开始。它将认为任何已存在目标的任务都已完成,并且不会重新运行它(您必须删除目标内容才能让它重新运行)。

简而言之:如果目标存在,则任务已完成。

这适用于多种类型的目标,例如本地文件系统中的文件、Hadoop、AWS S3以及数据库。
为了防止未完成的结果,目标实现要注意原子性,例如,文件首先在临时位置创建,只有在完成后才移动到最终位置。
在数据库中,有结构来表示某个数据库导入已完成。
您可以自由创建自己的目标实现(它必须创建某些内容并提供exists方法来检查结果是否存在)。
使用Luigi处理您的任务
对于您描述的任务,您可能已经发现了所需的一切。只需几个提示:
luigi.postgres.CopyToTable允许将记录存储到Postgres数据库中。目标将自动创建所谓的“标记表”,其中将标记所有已完成的任务。

对于其他类型的数据库也有类似的类,其中一个使用SqlAlchemy,可能涵盖您使用的数据库,请参见类luigi.contrib.sqla.CopyToTable

在Luigi文档中有一个工作示例将数据导入sqlite数据库

完整的实现超出了StackOverflow答案的可扩展范围,但我相信,您会有以下经验:

  • 完成任务所需的代码非常清晰 - 没有样板式编码,只需编写必须完成的部分。
  • 支持处理时间段 - 即使从命令行开始,例如 Efficiently triggering recurring tasks。它甚至会注意不要太早进入过去,以防止生成太多任务可能会超载您的服务器(默认值已经非常合理且可以更改)。
  • 可以选择在多个服务器上运行任务(使用提供了Luigi实现的中央调度器)。

我用Luigi处理了大量的XML文件,还制作了一些任务,将聚合数据导入数据库,并且可以推荐它(我不是Luigi的作者,只是一个快乐的用户)。

加速数据库操作(查询)

如果您的任务执行数据库查询的时间太长,则有几个选项:

如果你想用Python计算每个产品的评论数量,可以尝试使用SQL查询,它通常会更快。可以创建一个使用count在适当记录上的SQL查询,并直接返回所需的数字。使用group by,甚至可以一次获取所有产品的摘要信息。
  • 设置适当的索引,可能在“评论”表上的“产品”和“时间段”列上。这将加速查询,但要确保它不会使插入新记录变慢(太多索引可能会导致这种情况)。
  • 优化SQL查询后,即使不使用Luigi,也可能得到可行的解决方案。


    我不确定这是否是正确的答案,但是Luigi让我朝着Storm的方向前进。谢谢! - KGo
    KaranGoel,感谢您接受我的答案并指出Apache Storm,它似乎是用于实时和大规模处理的框架(以前从未听说过)。 - Jan Vlcinsky

    1

    数据仓库?汇总表是正确的方法。

    数据是否发生变化(一旦写入)?如果是,则增量更新汇总表将成为一个挑战。大多数DW应用程序没有这个问题

    在插入原始数据表时更新汇总表(日期+维度+计数+总和)。由于您每分钟只获得一个插入,INSERT INTO SummaryTable ... ON DUPLICATE KEY UPDATE ... 将非常足够,并且比每10分钟运行脚本更简单。

    从汇总表而不是原始数据(事实表)中进行任何报告。速度会更快。

    我的汇总表博客 讨论了详情。(它针对更大的DW应用程序,但应该是有用的阅读材料。)


    0
    我同意Rick的观点,汇总表对你来说是最合理的选择。每10分钟更新一次汇总表,并且只需从中提取数据,按照用户的请求进行汇总。
    此外,确保你的数据库索引正确设置以提高性能。我相信db_products.id已经设置为唯一索引了。但是,还要确保db_products.create被定义为DATE或DATETIME类型,并且也建立了索引,因为你在WHERE语句中使用了它。

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接