Apache-Beam + Python:将JSON(或字典)字符串写入输出文件

5
我想使用Beam管道将SequenceMatcher函数应用于大量单词。我已经(希望)理解了所有内容,除了WriteToText部分。
我定义了一个自定义的ParDo(此处称为ProcessDataDoFn),它接受main_input和side_input,处理它们并输出类似于这个的字典。
{u'key': (u'string', float)}

我的管道非常简单

class ProcessDataDoFn(beam.DoFn):
    def process(self, element, side_input):

    ... Series of operations ...

    return output_dictionary

with beam.Pipeline(options=options) as p:

    # Main input
    main_input = p | 'ReadMainInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            query=CUSTOM_SQL,
            use_standard_sql=True
        ))

    # Side input
    side_input = p | 'ReadSideInput' >> beam.io.Read(
        beam.io.BigQuerySource(
            project=PROJECT_ID,
            dataset=DATASET,
            table=TABLE
        ))

    output = (
        main_input
        | 'ProcessData' >> beam.ParDo(
            ProcessDataDoFn(),
            side_input=beam.pvalue.AsList(side_input))
        | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
    )

现在的问题是,如果我像这样保留管道,它只会输出output_dictionary的键。 如果我将ProcessDataDoFn的返回值更改为json.dumps(output_dictionary),则Json将被正确编写,但格式如下:
{
'
k
e
y
'

:

[
'
s
t
r
i
n
g
'

,

f
l
o
a
t
]

如何正确地输出结果?


在你的代码中,类被声明为 ProcessData,然后当你在管道中使用它时,它变成了 ProcessDataDoFn。我相信这只是问题中的一个笔误,但纠正一下会很有帮助。 - Davos
1
谢谢,现在应该已经修复了。 - Michele 'Ubik' De Simoni
2个回答

7

你的输出结果看起来很不寻常。 json.dumps 应该在一行中打印 json,并且应该逐行写入文件。

也许,为了使代码更简洁,您可以添加一个额外的 map 操作,以任何所需的方式进行格式化。例如:

output = (
  main_input
  | 'ProcessData' >> beam.ParDo(
        ProcessDataDoFn(),
        side_input=beam.pvalue.AsList(side_input))
  | 'FormatOutput' >> beam.Map(json.dumps)
  | 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)

有没有理由建议将json.dumps函数映射到pcoll中的元素,而不是使用ParDo来应用DoFn?似乎DoFn具有指标和其他优点,或者您认为这对于像这样的简单转换案例来说是太多的开销了? - Davos
1
一个 Map 会被翻译成一个 ParDo,所以无论哪种适合您的用例都可以。如果您想要额外的功能,可以选择 ParDo。:) - Pablo

5
我其实已经部分解决了这个问题。
我编写的ParDoFn可以返回字典或JSON格式的字符串。在两种情况下,当Beam尝试对该输入执行操作时,问题就会出现。如果给定的PCollection是一个字典,Beam似乎会迭代该字典并只获取其键;如果该PCollection是一个字符串,则会迭代所有字符(这就是为什么JSON输出如此奇怪)。我认为解决方案相当简单:将字典或字符串封装在列表中。JSON格式化部分可以在ParDoFn级别上完成,也可以通过像您展示的Transform那样实现。

3
根据Python API 此处 的说明: 请注意,DoFn必须为每个输入PCollection元素返回一个可迭代对象。在process方法中使用yield关键字是实现此操作的一种简单方法。 - Michele 'Ubik' De Simoni
如果可以的话,我很想看看你的 ProcessDataDoFn()。你最终是不是对列表中的每个字典使用了 yield - Davos
很遗憾,这是专有代码,但是是的,yield 对我解决了问题。 - Michele 'Ubik' De Simoni
Python 中的一个奇怪特性是字符串可迭代,因此切片和列表函数变得可能。我想 PCollection 只是期望一个可迭代对象,也可能有一种将字符串转换为 PCollection 以逐字符处理的用例。 - Davos

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接