名称错误:名称'pvalue'未定义。

3
在此文档(https://beam.apache.org/documentation/programming-guide/#additional-outputs)的4.5.2节中,有一个pvalue.TaggedOutput()被提及。
看起来很难导入pvalue,我已经从Apache文档中复制了导入行,并在def run()中使用了--save_main_session选项以及save_main_session=True,并在开始管道之前使用pipeline_options.view_as(SetupOptions).save_main_session = save_main_session。所有导入都对所有函数有效,所有类都可以在所有函数中使用,但是pvalue无法识别。我也尝试了所有可能的组合以及省略它们,pvalue始终未知。
我从这里的示例代码获取了所有代码:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py

尽管如此,没有p值。
NameError:名称'pvalue'未定义[在运行“generatedPtransform-1725”时]

仅当我使用Dataflowrunner时才会生成此错误,而不是使用Directrunner时。

我的DoFn示例

class Splitter(beam.DoFn):

    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

我的run()方法的示例
def run(argv=None, save_main_session=True):
    sources = [
        json.loads('{"id":72234,"value":1'),
        json.loads('{"id":23,"value":2}')
        ]

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
           | beam.Create(sources)
           | beam.ParDo(Splitter()).with_outputs(Splitter.TAG1,Splitter.TAG2,main=Splitter.TAG1)

"

**我的导入**

"
from __future__ import absolute_import

import argparse
import logging
import re
import json
import datetime
from jsonschema import validate

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

1
你的代码中有from apache_beam import pvalue这一行吗?你也在使用这个吗? - undefined
是的,我已经添加了我的引入,也许我以某种方式做错了。 - undefined
你试过像这样使用吗?导入apache beam - apache_beam.pvalue.TaggedOutput。我在想pvalue是否会发生变化。或者你可以使用beam.pvalue.TaggedOutput。 - undefined
2个回答

3

由于在使用Apache Beam时,依赖应该在类和函数中声明,因此您应该尝试在Splitter类中导入pvalue

您的代码应该像这样:

class Splitter(beam.DoFn):
    from apache_beam import pvalue
    TAG1 = 'kleintje'
    TAG2 = 'grootje'

    def process(self, element):
        splittertid = element.get('id')

        if splittertid < 100:
            yield pvalue.TaggedOutput(self.TAG1, element)
        else:
            yield pvalue.TaggedOutput(self.TAG2, element)

由于代码在本地运行,因此您可以正常使用 from apache_beam import pvalue;但是,在使用Dataflowrunner时,代码应遵循一定的结构以正确处理依赖关系


1
不,这个问题应该通过使用--save_main_session来解决。 - undefined
直接在类中导入没有起作用。我不得不在使用它的类方法中导入pvalue(apache_beam v2.38)。 - undefined

2
一些依赖于Dataflowrunner的东西出了问题。通过加载错误的依赖项,然后再将其删除,事情突然开始正常工作。 像from apache_beam import pvalue这样导入似乎是正确的。 也许在这里学到的教训是,可能存在损坏的依赖关系,您可以通过强制重新安装来修复这些依赖关系,触发旧的或错误的apache_beam软件包的安装和卸载。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接