我想使用Beam管道将SequenceMatcher函数应用于大量单词。我已经(希望)理解了所有内容,除了WriteToText部分。
我定义了一个自定义的ParDo(此处称为ProcessDataDoFn),它接受main_input和side_input,处理它们并输出类似于这个的字典。
现在的问题是,如果我像这样保留管道,它只会输出output_dictionary的键。 如果我将ProcessDataDoFn的返回值更改为json.dumps(output_dictionary),则Json将被正确编写,但格式如下:
我定义了一个自定义的ParDo(此处称为ProcessDataDoFn),它接受main_input和side_input,处理它们并输出类似于这个的字典。
{u'key': (u'string', float)}
我的管道非常简单
class ProcessDataDoFn(beam.DoFn):
def process(self, element, side_input):
... Series of operations ...
return output_dictionary
with beam.Pipeline(options=options) as p:
# Main input
main_input = p | 'ReadMainInput' >> beam.io.Read(
beam.io.BigQuerySource(
query=CUSTOM_SQL,
use_standard_sql=True
))
# Side input
side_input = p | 'ReadSideInput' >> beam.io.Read(
beam.io.BigQuerySource(
project=PROJECT_ID,
dataset=DATASET,
table=TABLE
))
output = (
main_input
| 'ProcessData' >> beam.ParDo(
ProcessDataDoFn(),
side_input=beam.pvalue.AsList(side_input))
| 'WriteOutput' >> beam.io.WriteToText(GCS_BUCKET)
)
现在的问题是,如果我像这样保留管道,它只会输出output_dictionary的键。 如果我将ProcessDataDoFn的返回值更改为json.dumps(output_dictionary),则Json将被正确编写,但格式如下:
{
'
k
e
y
'
:
[
'
s
t
r
i
n
g
'
,
f
l
o
a
t
]
如何正确地输出结果?
ProcessData
,然后当你在管道中使用它时,它变成了ProcessDataDoFn
。我相信这只是问题中的一个笔误,但纠正一下会很有帮助。 - Davos