我正在开发一个Python应用程序,它可以将CSV文件转换为Hive / Athena兼容的Parquet格式,并使用fastparquet和pandas库执行此操作。 CSV文件中有时间戳值,例如2018-12-21 23:45:00
,需要在Parquet文件中写入timestamp
类型。以下是我正在运行的代码,
columnNames = ["contentid","processed_time","access_time"]
dtypes = {'contentid': 'str'}
dateCols = ['access_time', 'processed_time']
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucketname, Key=keyname)
df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)
s3filesys = s3fs.S3FileSystem()
myopen = s3filesys.open
write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)
代码运行成功,下面是Pandas创建的数据帧
contentid object
processed_time datetime64[ns]
access_time datetime64[ns]
最后,当我在Hive和Athena中查询parquet文件时,时间戳的值为+50942-11-30 14:00:00.000
而不是2018-12-21 23:45:00
非常感谢任何帮助。
pd.to_datetime(df['access_time', 'processed_time'], unit='ms', errors='coerce')
- theMerakist