Pandas 数据框的 datetime64[ns] 类型在 Hive/Athena 中无法使用。

13

我正在开发一个Python应用程序,它可以将CSV文件转换为Hive / Athena兼容的Parquet格式,并使用fastparquet和pandas库执行此操作。 CSV文件中有时间戳值,例如2018-12-21 23:45:00,需要在Parquet文件中写入timestamp类型。以下是我正在运行的代码,

columnNames = ["contentid","processed_time","access_time"]

dtypes = {'contentid': 'str'}

dateCols = ['access_time', 'processed_time']

s3 = boto3.client('s3')

obj = s3.get_object(Bucket=bucketname, Key=keyname)

df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)

s3filesys = s3fs.S3FileSystem()

myopen = s3filesys.open

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

代码运行成功,下面是Pandas创建的数据帧

contentid                 object
processed_time            datetime64[ns]
access_time               datetime64[ns]

最后,当我在Hive和Athena中查询parquet文件时,时间戳的值为+50942-11-30 14:00:00.000而不是2018-12-21 23:45:00

非常感谢任何帮助。


尝试在插入Hive时将列转换为日期时间格式pd.to_datetime(df['access_time', 'processed_time'], unit='ms', errors='coerce') - theMerakist
尝试过了,但仍然是一样的。 - prasannads
在创建DF时不要解析列,而是将其转换为datetime对象,如datetime.datetime.strptime('2018-12-21 23:45:00','%y-%m-%d %H:%m'),并应用于df的日期列。 - theMerakist
如果Athena/Hive不直接支持Python脚本生成的格式,您可能需要使用此处https://prestodb.io/docs/current/functions/datetime.html中的函数。使用“parquet-tools cat”检查模式和数据的架构。如果找不到正确的转换函数,请在此处发布时间戳格式。 - Prabhakar Reddy
3
这些回答中有没有帮助到您?我遇到了完全相同的问题。 - Severun
6个回答

8

我知道这个问题很老,但它仍然相关。

如前所述,Athena仅支持int96作为时间戳。使用fastparquet可以生成一个带有适合Athena格式的Parquet文件。重要部分是times='int96',这告诉fastparquet将pandas日期时间转换为int96时间戳。

from fastparquet import write
import pandas as pd

def write_parquet():
  df = pd.read_csv('some.csv')
  write('/tmp/outfile.parquet', df, compression='GZIP', times='int96')

非常有帮助!非常感谢! - Joshua

4

你可以尝试以下方法:

dataframe.to_parquet(file_path, compression=None, engine='pyarrow', allow_truncated_timestamps=True, use_deprecated_int96_timestamps=True)

2
我通过这种方式解决了问题。
使用to_datetime方法转换df系列
接下来,使用.dt访问器选择datetime64 [ns]的日期部分
例子:
df.field = pd.to_datetime(df.field)
df.field = df.field.dt.date

之后,雅典娜将会识别这些数据。

0

我也遇到了这个问题多次。 我的错误代码是我将索引设置为日期时间格式:

df.set_index(pd.DatetimeIndex(df.index), inplace=True)

当我使用fastparquet读取parquet文件时,它可能会提示我

OutOfBoundsDatetime: Out of bounds nanosecond timestamp: 219968-03-28 05:07:11

然而,它可以很容易地通过使用pd.read_parquet(path_file)而不是fastparquet.ParquetFile(path_file).to_pandas()来解决。

请使用pd.read_parquet(path_file)来解决此问题

这是我的解决方案,它运行良好,希望能帮助您,这样您就不需要担心如何以哪种方式编写Parquet文件了。


0

问题似乎出在 Athena 上,它只支持 int96,而当您在 pandas 中创建时间戳时,它是 int64。

我的数据框列包含一个字符串日期,名为“sdate”,我首先将其转换为时间戳。

# add a new column w/ timestamp
df["ndate"] = pandas.to_datetime["sdate"]
# convert the timestamp to microseconds
df["ndate"] = pandas.to_datetime(["ndate"], unit='us')

# Then I convert my dataframe to pyarrow
table = pyarrow.Table.from_pandas(df, preserve_index=False)

# After that when writing to parquet add the coerce_timestamps and 
# use_deprecated_int96_timstamps. (Also writing to S3 directly)
OUTBUCKET="my_s3_bucket"

pyarrow.parquet.write_to_dataset(table, root_path='s3://{0}/logs'.format(OUTBUCKET), partition_cols=['date'], filesystem=s3, coerce_timestamps='us', use_deprecated_int96_timestamps=True)


-1

我曾经遇到同样的问题,经过大量的研究,现在已经解决了。

当你进行

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

它在幕后使用fastparquet,而fastparquet使用与Athena不兼容的DateTime编码。

解决方案是:卸载fastparquet并安装pyarrow

  • pip uninstall fastparquet
  • pip install pyarrow

再次运行您的代码。这次应该可以工作了。:)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接