I'm a beginner learning big data. I need to convert a .csv/.txt file to Parquet format. I've searched around a lot but haven't found a direct way to do it. Is there any way to achieve this?
Using pip:

pip install pandas pyarrow

or using conda:

conda install pandas pyarrow -c conda-forge
# csv_to_parquet.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)

for i, chunk in enumerate(csv_stream):
    print("Chunk", i)
    if i == 0:
        # Guess the schema of the CSV file from the first chunk
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        # Open a Parquet file for writing
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    # Write CSV chunk to the parquet file
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

parquet_writer.close()
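# Note: the Parquet schema is guessed from the first chunk only. If a later chunk
# is inferred with different dtypes (e.g. an integer column that gains NaNs later
# in the file), pa.Table.from_pandas(chunk, schema=parquet_schema) may raise a
# conversion error; for messy files, consider passing explicit dtypes to read_csv.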
pyarrow.csv.read_csv reads a CSV file directly into a PyArrow Table. This may be faster than the pandas CSV reader, though it may be less flexible. [For Python]
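A minimal sketch of that approach (the paths are placeholders):

import pyarrow.csv as pv
import pyarrow.parquet as pq

# Read the CSV straight into an Arrow Table, then write it out as Parquet
table = pv.read_csv('/path/to/my.csv')
pq.write_table(table, '/path/to/my.parquet')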
Pandas now supports this directly.
Just read the csv file into a dataframe with pandas' read_csv method, then write that dataframe to a parquet file with the to_parquet method.
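For example (paths are placeholders; to_parquet requires pyarrow or fastparquet to be installed):

import pandas as pd

df = pd.read_csv('/path/to/my.csv')
df.to_parquet('/path/to/my.parquet')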
Using Apache Drill, start sqlline:

$ cd /opt/drill/bin
$ sqlline -u jdbc:drill:zk=local

Create the Parquet file:

-- Set the default table format to parquet
ALTER SESSION SET `store.format`='parquet';

-- Create a parquet table containing all the data from the CSV table
CREATE TABLE dfs.tmp.`/stats/airport_data/` AS
SELECT
  CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`,
  CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`,
  columns[1] as `AIRLINE`,
  columns[2] as `IATA_CODE`,
  columns[3] as `AIRLINE_2`,
  columns[4] as `IATA_CODE_2`,
  columns[5] as `GEO_SUMMARY`,
  columns[6] as `GEO_REGION`,
  columns[7] as `ACTIVITY_CODE`,
  columns[8] as `PRICE_CODE`,
  columns[9] as `TERMINAL`,
  columns[10] as `BOARDING_AREA`,
  CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT`
FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;

Try selecting data from the new Parquet file:

-- Select data from the parquet table
SELECT * FROM dfs.tmp.`/stats/airport_data/*`

You can change the dfs.tmp location by going to http://localhost:8047/storage/dfs (reference: CSV and Parquet).

import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("timestamp", StringType, true),
  StructField("site", StringType, true),
  StructField("requests", LongType, true)
))

val df = spark.read
  .schema(schema)
  .option("header", "true")
  .option("delimiter", "\t")
  .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")

df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
1) You can create an external Hive table over the CSV file:

create external table emp(name string, job_title string, department string, salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '

2) Create another Hive table that will store the Parquet file (row format delimited is not needed here, since Parquet defines its own storage layout):

create external table emp_par(name string, job_title string, department string, salary_per_year int)
stored as PARQUET
location 'hdfs location where you want to save the parquet file'

3) Insert the data from the first table into the Parquet table:

insert overwrite table emp_par select * from emp
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")
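// Note: saveAsParquetFile and the external com.databricks.spark.csv package are
// Spark 1.x APIs; in Spark 2.x+ use spark.read.csv(...) and df.write.parquet(...).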
import pyspark
sc = pyspark.SparkContext('local[*]')
sqlContext = pyspark.sql.SQLContext(sc)
df = sqlContext.read.csv('file:///xxx/xxx.csv')
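# By default .csv() reads without a header and treats every column as a string;
# pass header=True and inferSchema=True to .csv() if your file needs them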
df.write.parquet('file:///xxx/output.parquet')
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')
(I have no experience with pyarrow, I was just curious after seeing your comment) - sphoenix

One difference is the number of parameters that pd.read_csv and pyarrow.csv.read_csv accept. As a concrete example, in pd.read_csv, sep="..." can be a regular expression, while in pyarrow.csv.read_csv, delimiter="..." must be a single character. - ostrokach
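A small sketch illustrating that difference (the file names are hypothetical):

import pandas as pd
import pyarrow.csv as pv

# pandas: sep may be a regular expression (here: any run of whitespace)
df = pd.read_csv('data.txt', sep=r'\s+', engine='python')

# pyarrow: the delimiter must be a single character, e.g. a tab
table = pv.read_csv('data.tsv', parse_options=pv.ParseOptions(delimiter='\t'))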