并行化GZip文件处理Spark

3
我有一个巨大的GZip文件列表,需要将它们转换为Parquet。由于GZip的压缩特性,这无法并行处理一个文件。
然而,由于我有很多个文件,是否有一种相对简单的方法让每个节点处理部分文件?这些文件在HDFS上。我认为我不能使用RDD架构来编写Parquet文件,因为这都是在驱动程序上完成的,而不是在节点本身上完成的。
我可以并行处理文件名列表,编写一个函数来处理本地的Parquets文件并将其保存回HDFS。但是我不知道怎么做。感觉像我错过了一些显而易见的东西,谢谢!
这被标记为重复问题,但实际上并不是如此。我完全清楚Spark读取它们作为RDD而无需担心压缩的能力,我的问题更多的是如何并行地将这些文件转换为结构化的Parquet文件。
如果我知道如何与Parquet文件交互而不使用Spark本身,我可以做这样的事情:
def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    write_csv_to_parquet_on_hdfs(file_to)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

这将允许我并行处理,但是我不知道如何在本地环境中与HDFS和Parquet进行交互。 我想知道以下内容:

1)如何做到这一点

或者..

2)如何使用PySpark以不同的方式并行化此过程


你的问题很好。这可能不是适合它的论坛,因为它似乎并不是一个编程问题。 - John Hascall
好的,这是关于在PySpark中实现并行化的内容。 - Jan van der Vegt
好的,这个问题并没有很清楚地表达出来。我的建议是:在pyspark中尝试一些方法,如果不行的话,在新的问题中把你尝试过的带上来。 - John Hascall
好的,我又扩展了问题,我会给zero323发一条消息。我不知道如何处理它,这就是为什么我寻求帮助的原因 :) 谢谢 - Jan van der Vegt
1个回答

0

我建议采用以下两种方法之一(实践中,我发现第一种方法在性能方面效果更好)。

将每个Zip文件写入单独的Parquet文件

在这里,您可以使用pyarrow将Parquet文件写入HDFS:

def convert_gzip_to_parquet(file_from, file_to):
    gzipped_csv = read_gzip_file(file_from)
    pyarrow_table = to_pyarrow_table(gzipped_csv)
    hdfs_client = pyarrow.HdfsClient()
    with hdfs_client.open(file_to, "wb") as f:
        pyarrow.parquet.write_table(pyarrow_table, f)

# Filename RDD contains tuples with file_from and file_to
filenameRDD.map(lambda x: convert_gzip_to_parquet(x[0], x[1]))

获取pyarrow.Table对象有两种方法:

  • 从pandas DataFrame中获取(在这种情况下,您还可以使用pandas的read_csv()函数):pyarrow_table = pyarrow.Table.from_pandas(pandas_df)

  • 或者手动构建它使用pyarrow.Table.from_arrays

为了使pyarrow与HDFS配合工作,需要正确设置几个环境变量,请参见此处

将所有Zip文件的行连接成一个Parquet文件

def get_rows_from_gzip(file_from):
    rows = read_gzip_file(file_from)
    return rows

# read the rows of each zip file into a Row object
rows_rdd = filenameRDD.map(lambda x: get_rows_from_gzip(x[0]))

# flatten list of lists
rows_rdd = rows_rdd.flatMap(lambda x: x)

# convert to DataFrame and write to Parquet
df = spark_session.create_DataFrame(rows_rdd)
df.write.parquet(file_to)

如果您事先知道数据的模式,将模式对象传递给create_DataFrame将加快DataFrame的创建速度。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接