Apache Beam: difference between ReadFromText and ReadAllFromText


I'm running an Apache Beam pipeline that reads text files from Google Cloud Storage, does some parsing on those files, and then writes the parsed data to BigQuery.

Ignoring the parsing and the google_cloud_options for brevity, my code is as follows (using apache-beam 2.5.0 with the GCP extras and Dataflow as the runner):

p = Pipeline(options=options)

(p
 | 'read from file' >> beam.io.ReadFromText('some_gcs_bucket_path*')
 | 'parse xml to dict' >> beam.ParDo(XmlToDictFn())  # parsing DoFn, elided for brevity
 | 'write to bigquery' >> beam.io.WriteToBigQuery(
        'my_table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

p.run()

This code runs fine with a small number of input files and successfully appends the relevant data to my BigQuery table. But when I increase the number of input files to roughly 800,000, I get the following error:
"Total size of the BoundedSource objects returned by BoundedSource.split() operation is larger than the allowable limit."
I found a post here that suggests using ReadAllFromText instead of ReadFromText for a similar problem: Troubleshooting apache beam pipeline import errors [BoundedSource objects is larger than the allowable limit]
However, after making that replacement I ran into the following error:
Traceback (most recent call last):
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
    xmltobigquery.run_dataflow()
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
    'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
    return pvalue | 'ReadAllFiles' >> self._read_all_files
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
    | 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
    | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
    windowing_saved = pcoll.windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'. 

Any suggestions?

Please share the final version of your code so we can see what the error actually is. - A.Queue
Also, I found this answer, which mentions the same error; adding a PCollection fixed it. - A.Queue
It turns out that when using ReadAllFromText you also need to add a Create step (unlike the ReadFromText example, it isn't included automatically). This solved my problem, thanks. - Richardt Benade REZCO
@richardt-benade-rezco, cool! Feel free to share what the final code looks like. - A.Queue
@Richardt Benade REZCO, could you post your solution and working code as an answer for the benefit of the community? - Philipp Sh
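
Putting the comments above together, a minimal sketch of the fix (not the poster's actual final code) might look like the following, using XmlToDictFn from the traceback as the parsing DoFn: the file pattern becomes a PCollection element via beam.Create, and ReadAllFromText then expands and reads it inside the pipeline.

p = Pipeline(options=options)

(p
 | 'file patterns' >> beam.Create(['some_gcs_bucket_path*'])  # patterns become pipeline data
 | 'read all files' >> beam.io.ReadAllFromText()
 | 'parse xml to dict' >> beam.ParDo(XmlToDictFn())
 | 'write to bigquery' >> beam.io.WriteToBigQuery(
        'my_table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

p.run()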
2 Answers

I ran into the same issue. As Richardt mentioned, beam.Create has to be called explicitly. A further challenge was how to use this pattern together with template parameters, because beam.Create only supports in-memory data, as described in the documentation (link).
Google Cloud Support helped me with this, and I'd like to share the solution. The trick is to create the pipeline with a dummy string and then use a mapping lambda to read the input at runtime:
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class AggregateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the files to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output files to write results to')


def run():
    logging.info('Starting main function')

    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AggregateOptions)

    steps = (
            pipeline
            | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
            | 'Read Input Parameter' >> beam.Map(lambda x: options.input.get())  # get the real input param at runtime
            | 'Read Data' >> beam.io.ReadAllFromText()
            # ... other steps
            )

Hope this answer helps.

Where do you specify the input/output files? As parser arguments when you run the pipeline script? - Luiscri
@Luiscri For testing purposes you can add the arguments when running the code locally. However, if you want to run it on GCloud Dataflow, generate a custom template by creating a metadata file and running the Python command described there. Upload the metadata file, with the suffix _metadata, to the same Cloud Storage location where the template is stored. The template can then be selected in Dataflow. - philsch
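
For illustration only, one way to build such a Dataflow template is to run the pipeline above once with a template_location set; the project, bucket and paths below are placeholders and not part of the original answer:

# Hypothetical template build: running the pipeline with these options stages
# a template at template_location instead of executing the job immediately.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                                   # placeholder project id
    '--temp_location=gs://my-bucket/temp',                    # placeholder bucket
    '--template_location=gs://my-bucket/templates/read_all',  # where the template is written
])
pipeline = beam.Pipeline(options=pipeline_options)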

To answer the original question: ReadFromText takes the file pattern as a constructor argument and expands it into individual sources up front (which is what hits the BoundedSource size limit when there are very many files), whereas ReadAllFromText takes its file patterns as pipeline input and expands and reads them during execution:
# ReadFromText
(p
 | beam.io.ReadFromText("myfile.csv"))

# ReadAllFromText
(p
 | beam.Create(["myfile1.csv", "myfile2.csv", "myfile3.csv"])
 | beam.io.ReadAllFromText())
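
As a side note (using the bucket path from the question), the elements passed to ReadAllFromText can also be glob patterns rather than individual file names, so the original pipeline's input would simply become:

(p
 | beam.Create(['some_gcs_bucket_path*'])  # one or more file patterns
 | beam.io.ReadAllFromText())              # expands each pattern and reads the matching files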
