我想要实现类似这样的功能:Beam/Dataflow中的批处理PCollection
以上链接中的答案是用Java编写的,而我使用的语言是Python。因此,我需要一些帮助来获得类似的构造。
具体来说,我有这个:
p = beam.Pipeline (options = pipeline_options)
lines = p | 'File reading' >> ReadFromText (known_args.input)
在此之后,我需要创建另一个
PCollection
,但是使用包含 N 行“行”的 List
,因为我的用例需要一组行。我不能逐行操作。我尝试使用变量进行计数的
ParDo
函数,将计数器 N 行与 Map
相关联进行 groupBy
。但是这些在每 1000 条记录后被重置,所以这不是我要寻找的解决方案。我阅读了链接中的示例,但我不知道如何在 Python 中执行类似的操作。我尝试将计数器保存在 Datastore 中,但是,Dataflow 读取和写入 Datastore 的速度差异相当显著。
正确的方法是什么?我不知道还有什么其他方法可以尝试。谢谢。
ReadFromText
返回元组(序列号,元素)
),否则使用beam进行此类确定性分组会很棘手(需要State
或数据驱动触发器)。如果您的管道不需要确定性分组,则可以在DoFn中维护大小为N的缓冲区,并在缓冲区满时(或在finish_bundle
中)刷新缓冲区。 - Jiayuan Ma