如何从 Apache Beam Python 的 PCollection 中创建 N 个元素的组?

7

我想要实现类似这样的功能:Beam/Dataflow中的批处理PCollection

以上链接中的答案是用Java编写的,而我使用的语言是Python。因此,我需要一些帮助来获得类似的构造。

具体来说,我有这个:

 p = beam.Pipeline (options = pipeline_options)
 lines = p | 'File reading' >> ReadFromText (known_args.input)

在此之后,我需要创建另一个 PCollection,但是使用包含 N 行“行”的 List,因为我的用例需要一组行。我不能逐行操作。
我尝试使用变量进行计数的 ParDo 函数,将计数器 N 行与 Map 相关联进行 groupBy。但是这些在每 1000 条记录后被重置,所以这不是我要寻找的解决方案。我阅读了链接中的示例,但我不知道如何在 Python 中执行类似的操作。
我尝试将计数器保存在 Datastore 中,但是,Dataflow 读取和写入 Datastore 的速度差异相当显著。
正确的方法是什么?我不知道还有什么其他方法可以尝试。谢谢。

你能否重新表达一下你想要实现什么?你想要将整个输入文件(所有行)作为单个列表吗? - Marcin Zablocki
嗨@MarcinZablocki,不,我想要一个PCollection的List,其中包含来自输入文件的N行,例如:如果N为2且输入为“1,2,3,4,5,6,7,8”,其中逗号是换行符,我想要一个类似于这样的PCollection:PCollection [List(1,2),List(3,4),List(5,6),List(7,8)]。 - Luis Felipe Muñoz
1
如果输入是“1,2,3,4,5,6,7”,并且N=2,那么输出的PCollection应该是什么样子? - Arjun Kay
PCollection是无序的。除非您的输入包含顺序信息(例如ReadFromText返回元组(序列号,元素)),否则使用beam进行此类确定性分组会很棘手(需要State或数据驱动触发器)。如果您的管道不需要确定性分组,则可以在DoFn中维护大小为N的缓冲区,并在缓冲区满时(或在finish_bundle中)刷新缓冲区。 - Jiayuan Ma
1
这个问题似乎与你的类似 - 答案是使用 Top transform - Lefteris S
顺序并不重要,重要的是在输出元组中有N个寄存器,问题是将一个PCollection转换为N行的PCollections列表。 - Luis Felipe Muñoz
1个回答

10
假设分组顺序不重要,您可以在一个DoFn内进行分组。
class Group(beam.DoFn):
  def __init__(self, n):
     self._n = n
     self._buffer = []

  def process(self, element):
     self._buffer.append(element)
     if len(self._buffer) == self._n:
        yield list(self._buffer)
        self._buffer = []

  def finish_bundle(self):
     if len(self._buffer) != 0:
        yield list(self._buffer)
        self._buffer = []

lines = p | 'File reading' >> ReadFromText(known_args.input)
          | 'Group' >> beam.ParDo(Group(known_args.N)
          ...

当我运行它时,出现了这个错误:RuntimeError: Finish Bundle应该只输出WindowedValue类型,但得到了<class 'list'> - undefined
这个如何考虑到最后可能会剩下一些元素呢?例如,如果我们有奇数个元素,那么就会有一个元素不会被处理。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接