Spark连接运行变慢的问题呈指数级增长

3

我正在尝试在两个Spark RDD之间进行连接。我有一个与类别相关联的交易日志。我已经格式化了我的交易RDD,将类别ID作为键。

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

事务日志的大小约为20GB(3.5亿行)。 类别列表小于1KB。
当我运行
transactions_cat.join(categories).count()

Spark开始变得非常缓慢。我有一个包含643个任务的阶段。前10个任务需要大约1分钟。然后每个任务都变得越来越慢(大约在第60个任务时需要15分钟左右)。我不确定出了什么问题。

请查看这些截图以获得更好的理解。 enter image description here enter image description here enter image description here

我正在使用Python shell运行Spark 1.1.0,使用4个工作节点,总内存为50 GB。 仅计算交易RDD非常快(30分钟)。

1个回答

7
可能的问题是Spark没有注意到您遇到了一个简单的连接问题。当您要连接的两个RDD中有一个非常小,您最好不要将其作为RDD处理。然后,您可以自己实现哈希连接hash join,这实际上比听起来简单得多。基本上,您需要:
  • 使用collect()从RDD中提取类别列表--结果通信很容易为自己付出代价(或者如果可能,首先不要使其成为RDD)
  • 将其转换为哈希表,其中一个条目包含一个键的所有值(假设您的键不唯一)
  • 对于您大型RDD中的每对,查找哈希表中的键,并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)
我有一个用Scala实现的实现,如果您对翻译有问题,请随意提问,但应该很容易理解。
另一个有趣的可能性是尝试使用Spark SQL。我相信该项目的长期目标将包括自动为您完成此操作,但我不知道他们是否已经实现了这一点。

嗨,我遇到了完全相同的问题,尽管是在数据框架中。将七个10x2的数据框简单连接,然后在结果上运行count(),Spark就会失控,出现1000多个任务和14个阶段。有没有办法在Spark中修复/优化这个问题,而不需要手动连接? - Vaibhav

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接