在Cascading中构建自定义连接逻辑,确保仅使用MAP_SIDE。

10

我有三个级联管道(一个用于连接其他两个),描述如下:

  • LHSPipe - (更大的尺寸)

enter image description here

  • RHSPipes - (较小的尺寸,可能适合内存)

enter image description here

伪代码如下,此示例涉及两次连接:

IF F1DecidingFactor = YES then Join LHSPipe with RHS Lookup#1 BY (LHSPipe.F1Input = RHS Lookup#1.Join#F1) and set the lookup result (SET LHSPipe.F1Output = Result#F1) Otherwise SET LHSPipe.F1Output = N/A

F2计算采用相同的逻辑。

期望输出结果如下:

enter image description here

这种情况迫使我选择自定义连接操作,因为IF-ELSE决定是否连接。

考虑到以上情况,我想使用MAP-SIDE连接(将RHSPipe保留在MAP任务节点的内存中),我考虑了以下可能的解决方案,每个方案都有其利弊。请就此给出您的建议。

选项#1:

CoGroup - 我们可以使用CoGroup和BufferJoiner构建自定义连接逻辑,然后进行自定义连接操作,但这不能确保MAP-SIDE连接。

选项#2:

HashJoin - 它确保MAP-SIDE连接,但据我所见无法使用此方法来构建自定义连接。

请纠正我的理解并提出您的意见以满足这个要求。
谢谢。

你能提供你的示例代码吗?另外,你想在自定义连接中做什么? - Ambrish
样例输入数据和期望输出将会很有帮助。 - Ambrish
你考虑过将数据分成子集吗? - kpie
你尝试过下面答案中提供的解决方案了吗? - Ambrish
1个回答

1

我能想到的最好的解决方法是修改你的小型数据集。你可以在小型数据集中添加一个新字段(F1DecidingFactor)。F1Result的值应该如下:

伪代码

if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"

结果表格
|F1#Join|F1#Result|F1#DecidingFactor|
|    Yes|        0|             True|
|    Yes|        1|            False|
|     No|        0|              N/A|
|     No|        1|              N/A|

你也可以通过级联来完成上述操作。
完成后,你可以进行地图端的连接。
如果无法修改较小的数据集,则我有两个解决方案。 选项1 向小管道添加新字段,相当于您的决定因素(即F1DecidingFactor_RHS = Yes)。然后将其包含在连接条件中。一旦连接完成,您将仅在满足此条件的行中拥有值。否则它将为null / blank。示例代码: 主类
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTestOption2 {
    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);

        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field to small pipe. Expected Fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field to small pipe. Expected Fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Joining first small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Joining second small pipe. Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // result Pipe should have expected result
    }
}

选项2

如果您想要默认值而不是null / blank,则建议您首先使用默认Joiners进行HashJoin,然后使用适当的值更新元组的函数。像这样:

主类

import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;

public class StackHashJoinTest {
    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large Pipe fields : 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small Pipe 1 Fields : 
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small Pipe 2 Fields : 
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Joining second small pipe. 
        // Expected fields after join: 
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // result Pipe should have expected result
    }
}

更新功能。
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();

        TupleEntry result = new TupleEntry(arguments);

        if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F1Output", DEFAULT_VALUE);
        }

        if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) {
            result.setString("F2Output", DEFAULT_VALUE);
        }

        call.getOutputCollector().add(result);
    }

}

参考资料

这应该可以解决你的问题。如果有帮助,请告诉我。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接