我是Apache Beam的新手,刚开始使用Python SDK进行开发。
关于Apache Beam,我知道Pipeline、Pcollection、Ptransform、ParDo和DoFn等高级概念。
在我的当前项目中,我们使用PANDAS实现了Pipeline,通过下面的语法读取、转换和写入文件:
我想了解一下,这种方法是否是正确的Apache Beam实现方式,因为我们仅使用PANDAS直接读取和写入文件,而不是逐个元素处理文件。
步骤:
- 创建Pipeline
- 创建输入文件路径的Pcollection
- 调用DoFn并传递文件路径
- 在DoFn内部执行所有操作(使用PANDAS读取、转换和写入)。
样例高级代码:
import **required libraries
class ActionClass(beam.DoFn):
def process(self, file_path):
#reading file using PANDAS into dataframe
df = pandas.read_csv('file_path')
# do some transformation using pandas
#write dataframe to output file from inside DoFn only.
return
def run():
p = beam.Pipeline(options=options)
input = p | beam.io.ReadFromText('input_file_path') --reading only file path
output = input | 'PTransform' | beam.ParDo(ActionClass)