如何将CSV文件转换为Parquet

57

我是新手,正在学习大数据。我需要将一个 .csv/.txt 文件转换为 Parquet 格式。我搜索了很多资料,但没有找到直接的方法。有没有什么办法可以实现这个目标呢?

10个回答

51
我已经在如何使用Apache Drill完成此操作上发布了答案。然而,如果您熟悉Python,现在可以使用PandasPyArrow来执行此操作!

安装依赖项

使用pip

pip install pandas pyarrow

或者使用conda:
conda install pandas pyarrow -c conda-forge

分块将CSV转换为Parquet

# csv_to_parquet.py

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()

我没有将此代码与Apache Drill版本进行基准测试,但根据我的经验,它非常快,每秒可以转换数万行(当然这取决于CSV文件!)。
编辑: 现在我们可以使用pyarrow.csv.read_csv直接将CSV文件读入PyArrow表中。这可能比使用Pandas CSV阅读器更快,尽管可能不太灵活。

为什么它不够灵活?(抱歉,我没有使用pyarrow的经验,只是看到了你的评论很好奇) - sphoenix
@sphoenix,我主要是指pd.read_csvpyarrow.csv.read_csv方法接受的参数数量。举个具体例子,在pd.read_csv中,sep="..."可以是正则表达式,而在pyarrow.csv.read_csv中,delimiter="..."必须是单个字符。 - ostrokach
此代码存在模式错误的风险,因为整个 CSV 的模式是从第一块数据中推断出来的。第一块可能会错误地将某列识别为 int,但最后一块可能包含小数或空值,在写入 parquet 文件时会导致错误。解决方案是在创建 parquet 文件模式之前,基于分阶段读取整个 CSV 推断数据类型。建议的代码在 此答案 中。 - the_RR

35

[对于 Python]

Pandas 现在直接支持这个功能。

只需使用 Pandas 的 read_csv 方法将 csv 文件读入 dataframe,然后使用 to_parquet 方法将该 dataframe 写入 parquet 文件。


2
为什么你会为一个Java问题提供Python解决方案? - Yehor Androsov
9
因为已经有一个版本没有提及 to_parquet(因为它是在0.21.0版本中发布的),所以我认为这对需要基于Python解决方案的人可能会有用。 - Pranav Gupta

18
您可以使用Apache Drill,如使用Drill将CSV文件转换为Apache Parquet中所述。
简要步骤: 启动Apache Drill:
$ cd /opt/drill/bin
$ sqlline -u jdbc:drill:zk=local
创建Parquet文件:
--将默认表格式设置为parquet
ALTER SESSION SET `store.format`='parquet';
--创建包含来自CSV表的所有数据的parquet表 CREATE TABLE dfs.tmp.`/stats/airport_data/` AS SELECT CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`, CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`, columns[1] as `AIRLINE`, columns[2] as `IATA_CODE`, columns[3] as `AIRLINE_2`, columns[4] as `IATA_CODE_2`, columns[5] as `GEO_SUMMARY`, columns[6] as `GEO_REGION`, columns[7] as `ACTIVITY_CODE`, columns[8] as `PRICE_CODE`, columns[9] as `TERMINAL`, columns[10] as `BOARDING_AREA`, CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT` FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;
尝试从新的Parquet文件中选择数据:
--从parquet表中选择数据
SELECT *
FROM dfs.tmp.`/stats/airport_data/*`
您可以通过转到http://localhost:8047/storage/dfs更改dfs.tmp位置(参考:CSV和Parquet)。

3
我确认这是实现这个目标的最佳且最简单的方法。Apache Hive 也可以作为一个替代选择。 - Thomas Decaux

7
以下代码是使用spark2.0的示例。与inferSchema选项相比,读取速度更快。Spark 2.0将转换为parquet文件比Spark1.6更有效率。
import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
          .schema(df)
          .option("header", "true")
          .option("delimiter", "\t")
          .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")

6
我做了一个小型命令行工具,可以将CSV转换为Parquet:csv2parquet

3

1) 您可以创建外部Hive表

create  external table emp(name string,job_title string,department string,salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '

2) 另一个存储Parquet文件的Hive表

create  external table emp_par(name string,job_title string,department string,salary_per_year int)
row format delimited
stored as PARQUET
location 'hdfs location were you want the save parquet file'

将表格一的数据插入到表格二中:
insert overwrite table emp_par select * from emp 

2
表emp_par已经被创建为外部表。这应该被创建为普通表,否则您将无法向其中插入数据。 - Jai Prakash

2
将csv文件作为Apache Spark中的Dataframe读取,使用spark-csv包。在将数据加载到Dataframe后,将Dataframe保存为parquet文件。
val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("mode", "DROPMALFORMED")
      .load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")

2

1
import pyspark

sc = pyspark.SparkContext('local[*]')
sqlContext = pyspark.sql.SQLContext(sc)

df = sqlContext.read.csv('file:///xxx/xxx.csv')
df.write.parquet('file:///xxx/output.parquet')

虽然这段代码可能回答了问题,但提供有关它如何以及/或为什么解决问题的附加上下文将改善答案的长期价值。您可以在帮助中心找到有关编写良好答案的更多信息:stackoverflow.com/help/how-to-answer。 - abhiieor

0
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys

sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接