将整个关系加载到UDF中的Apache Pig

5

我有一个与2个Pig关系有关的pig脚本,称为A和B。 A是一个小关系,而B是一个大关系。我的UDF应该在每台机器上将所有A加载到内存中,然后在处理B时使用它。目前我是这样做的。

A = foreach smallRelation Generate ...
B = foreach largeRelation Generate propertyOfB;
store A into 'templocation';
C = foreach B Generate CustomUdf(propertyOfB);

我将每台机器从“templocation”加载以获取A。这样做是有效的,但我有两个问题:
  1. 我的理解是应该以某种方式使用HDFS缓存,但我不确定如何直接将关系加载到HDFS缓存中。
  2. 当我重新加载UDF中的文件时,我必须编写逻辑来解析从A输出到文件的输出,而我宁愿直接使用bags和tuples(是否有内置的Pig Java函数可将Strings解析回Bag/Tuple形式?)。
有人知道该如何处理吗?

A和B是否有可以进行JOIN的列? - alexeipab
在这种情况下,是的,它们具有相同的数据并且可以连接。但我需要将A的每一行与B的每一行进行比较。我猜我可以做一个交叉连接,但那不是更低效吗?我将比必要多处理B A-1次,并且我将失去一次运行所有B行对单个A行的能力,而这是必需的。 - Manny
你能提供一个输入和输出数据的例子吗? - alexeipab
A: {id: chararray,attributes: {tuple_of_tokens: (token: chararray)}},C是一个包含两个字段元组的集合,其中第一个字段是A的ID,第二个字段是包含该A的前N个B的元组。我希望有一个通用的解决方案来存储UDF中之前pig结果的地方,而这并不是我需要存储的唯一位置。 - Manny
如果您可以在id列上执行连接操作,则可以使用复制连接(replicated join),这是一种映射端连接(map side join)。J = JOIN B by (id), A by (id) as 'replicated';Apache Pig将在每个数据节点上将A加载到缓存中。然后将J传递给UDF,我认为这将在Map阶段进行,因此将非常高效。 - alexeipab
我正在比较每个A和每个B,所以它实际上不是一个JOIN,而是一个CROSS。无论如何,我想知道如何最好地将整个PIG关系传递给UDF,然后重新加载它们,这比解决这个特定问题更重要。 - Manny
1个回答

1
这里有一个对你有用的技巧。
首先,在A上进行GROUP ALL操作,将所有数据“收集”到一个字段中。然后在A和B上人为地添加一个共同的字段并将它们连接起来。这样,对于增强后的B中的每个元组,您都可以使用A的完整数据来运行UDF。
就像这样:
(假设在A中原来有字段fa1,fa2,fa3,在B中有字段fb1,fb2)
-- add an artificial join key with value 'xx'
B_aux = FOREACH B GENERATE 'xx' AS join_key, fb1, fb2;
A_all = GROUP A ALL;
A_aux = FOREACH A GENERATE 'xx' AS join_key, $1;
A_B_JOINED = JOIN B_aux BY join_key, A_aux BY join_key USING 'replicated';

C = FOREACH A_B_JOINED GENERATE CustomUdf(fb1, fb2, A_all);

由于这是复制连接,因此它也只是映射端连接。

我认为我的当前解决方案只是将A保存在HDFS中,然后在所有的reducer中加载它比进行笛卡尔积更有效率。希望能找到一种非hacky的方法来完成所有这些操作而不需要任何技巧。 - Manny
抱歉,我把A和B搞混了。我以为A是更大的关系,而B是更小的。我已经修改了我的代码。我现在的代码根本不是笛卡尔积,只是映射端连接,非常高效,你不必在你的UDF中处理读取/解析A(想象一下你想在本地模式下测试它,那么你的UDF将无法工作)。 - Dexin Wang
GROUP ALL 不是在映射端进行的,只有连接操作是。 - bridiver
@DexinWang 不确定这是否有效。你测试过吗?有参考资料吗? - John Jiang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接