使用PANDAS和Apache Beam

4

我是Apache Beam的新手,刚开始使用Python SDK进行开发。

关于Apache Beam,我知道Pipeline、Pcollection、Ptransform、ParDo和DoFn等高级概念。

在我的当前项目中,我们使用PANDAS实现了Pipeline,通过下面的语法读取、转换和写入文件:

我想了解一下,这种方法是否是正确的Apache Beam实现方式,因为我们仅使用PANDAS直接读取和写入文件,而不是逐个元素处理文件。

步骤:

  1. 创建Pipeline
  2. 创建输入文件路径的Pcollection
  3. 调用DoFn并传递文件路径
  4. 在DoFn内部执行所有操作(使用PANDAS读取、转换和写入)。

样例高级代码:

import **required libraries

class ActionClass(beam.DoFn):

    def process(self, file_path):
        #reading file using PANDAS into dataframe 
        df = pandas.read_csv('file_path')
        # do some transformation using pandas
        #write dataframe to output file from inside DoFn only.
        return

def run():

    p = beam.Pipeline(options=options)

    input = p | beam.io.ReadFromText('input_file_path') --reading only file path

    output = input | 'PTransform' | beam.ParDo(ActionClass)
2个回答

1
我的看法是你没有充分利用Beam的能力。
因为你的解决方案没有利用到Beam真正有用的并行处理能力。
我建议你使用ReadFromText来读取CSV,并使用Map或ParDo对数据进行转换。这样,Beam将读取CSV,并可以将数据分发到不同的工作节点上进行转换。
现在,根据你的需求,你可以直接在Beam上使用数据框架https://beam.apache.org/documentation/dsls/dataframes/overview/
  from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
  agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
  agg.to_csv('output')

0
在我看来,如果你有许多小的 CSV 文件想用 pandas 处理,那么这很可能是使用 Apache Beam 的一个有效案例。
谢谢。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接