如何在Dataflow中对两个PCollections执行笛卡尔积操作?

4

我希望对两个PCollection进行笛卡尔积,但是任何一个PCollection都无法存入内存,因此使用side input不可行。

我的目标是:我有两个数据集。一个包含许多小型元素,另一个包含少量(约10个)非常大的元素。我想要将这两个元素取积,然后生成键值对象。

2个回答

4
我认为CoGroupByKey可能适用于您的情况:

https://cloud.google.com/dataflow/model/group-by-key#join

这就是我为类似用例做的事情。尽管我的可能没有受到内存的限制(您尝试过使用更大的机器扩展集群吗?):

PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified
        .apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow()));

PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p
[...]
.apply(GroupByKey.create());

所以这些集合都由相同的键进行索引。
然后我声明了标签:
final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>();
final TupleTag<TableRow> actualsTag = new TupleTag<>();

合并它们:

PCollection<KV<String, CoGbkResult>> actualCategoriesCombined =
        KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed)
                .and(categoryTag, groupedCategories)
                .apply(CoGroupByKey.create());

在我的情况下,最后一步是重新格式化结果(从连续流中的标记组)。
actualCategoriesCombined.apply(ParDo.named("Actuals : Formatting").of(
    new DoFn<KV<String, CoGbkResult>, TableRow>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KV<String, CoGbkResult> e = c.element();

            Iterable<TableRow> actualTableRows =
                    e.getValue().getAll(actualsTag);
            Iterable<Iterable<Map<String, String>>> categoriesAll =
                    e.getValue().getAll(categoryTag);

            for (TableRow row : actualTableRows) {
                // Some of the actuals do not have categories
                if (categoriesAll.iterator().hasNext()) {
                    row.put("advertiser", categoriesAll.iterator().next());
                }
                c.output(row);
            }
        }
    }))

希望这可以帮到您。再次强调 - 不确定内存限制。如果您尝试了这种方法,请告诉我们结果。

由于我在Java和数据流方面缺乏经验,我解析这个需要很长时间。如果可能的话,你能否也用Python发布一个简单的示例?我觉得扩展这个问题会比我提出一个新问题更好。 - KobeJohn
参考一下,我来自Spark,这个很简单:collection_a.cartesian(collection_b) - KobeJohn
我很乐意这样做,您介意私下联系吗? - Evgeny Minkevich

1

要创建笛卡尔积,请使用Apache Beam扩展Join

import org.apache.beam.sdk.extensions.joinlibrary.Join;

...

// Use function Join.fullOuterJoin(final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection, final V1 leftNullValue, final V2 rightNullValue)
// and the same key for all rows to create cartesian product as it is shown below:

    public static void process(Pipeline pipeline, DataInputOptions options) {
        PCollection<KV<Integer, CpuItem>> cpuList = pipeline
                .apply("ReadCPUs", TextIO.read().from(options.getInputCpuFile()))
                .apply("Creating Cpu Objects", new CpuItem()).apply("Preprocess Cpu",
                        MapElements
                                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(CpuItem.class)))
                                .via((CpuItem e) -> KV.of(0, e)));

        PCollection<KV<Integer, GpuItem>> gpuList = pipeline
                .apply("ReadGPUs", TextIO.read().from(options.getInputGpuFile()))
                .apply("Creating Gpu Objects", new GpuItem()).apply("Preprocess Gpu",
                        MapElements
                                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptor.of(GpuItem.class)))
                                .via((GpuItem e) -> KV.of(0, e)));

        PCollection<KV<Integer,KV<CpuItem,GpuItem>>>  cartesianProduct = Join.fullOuterJoin(cpuList, gpuList, new CpuItem(), new GpuItem());
        PCollection<String> finalResultCollection = cartesianProduct.apply("Format results", MapElements.into(TypeDescriptors.strings())
                .via((KV<Integer, KV<CpuItem,GpuItem>> e) -> e.getValue().toString()));
        finalResultCollection.apply("Output the results",
                TextIO.write().to("fps.batchproc\\parsed_cpus").withSuffix(".log"));
        pipeline.run();
    }


在上面的代码中,这一行是:
...
        .via((CpuItem e) -> KV.of(0, e)));
...

我创建了一个带有键为0的Map,用于输入数据中的所有可用行。结果是所有行都匹配。这相当于没有WHERE子句的SQL表达式JOIN。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接