我使用 pyathena 运行了一个查询,并创建了 pandas 数据框。是否有一种方法可以直接将 pandas 数据框写入 AWS athena 数据库,就像对 MYSQL 数据库使用 data.to_sql 一样。
下面是一个数据框代码示例,需要写入 AWS athena 数据库:
data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})
我使用 pyathena 运行了一个查询,并创建了 pandas 数据框。是否有一种方法可以直接将 pandas 数据框写入 AWS athena 数据库,就像对 MYSQL 数据库使用 data.to_sql 一样。
下面是一个数据框代码示例,需要写入 AWS athena 数据库:
data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})
另一种现代(截至2020年2月)实现此目标的方法是使用aws-data-wrangler库。它自动化了许多数据处理中的例行(有时很烦人的)任务。
结合问题的情况,代码如下所示:
import pandas as pd
import awswrangler as wr
data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})
# Typical Pandas, Numpy or Pyarrow transformation HERE!
wr.pandas.to_parquet( # Storing the data and metadata to Data Lake
dataframe=data,
database="database",
path="s3://your-s3-bucket/path/to/new/table",
partition_cols=["name"],
)
这非常有帮助,因为aws-data-wrangler知道如何从路径中解析表名(但您也可以在参数中提供表名),并根据数据框在Glue目录中定义适当的类型。
它还有助于直接将数据查询到Pandas数据框中:
df = wr.pandas.read_table(database="dataase", table="table")
整个过程将会快速且方便。
写作时排名最高的答案使用的是旧版本的API,现在已经不再适用。
文档现在介绍了这个往返过程。
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
# Storing data on Data Lake
wr.s3.to_parquet(
df=df,
path="s3://bucket/dataset/",
dataset=True,
database="my_db",
table="my_table"
)
# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)
# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")
AWS Athena 的存储是 S3
,只能读取 S3 文件中的数据。之前无法像其他数据库一样将数据直接写入 Athena
数据库。
缺少支持
以进行 insert into ...
。
作为解决方法
,用户可以执行以下操作来使其正常工作。
1. You need to write the pandas output to a file,
2. Save the file to S3 location, from where the AWS Athena is reading.
希望这能给你一些指导。
2020年5月1日更新。
2019年9月19日,AWS
宣布支持向Athena
插入数据,这使得上述答案中的某个陈述是错误的
。尽管我提供的解决方案仍然可用,但随着AWS
的宣布,另一个可能的解决方案也出现了。
正如AWS文档
所建议的那样,此功能将允许您发送insert
语句,Athena
将把数据写回到源表S3位置
的新文件中。因此,AWS
已经解决了你写入数据到S3
文件的问题。
只需要注意,Athena
会将插入的数据写入单独的文件。
这里是文档。
一个选项是使用:
pandas_df.to_parquet(file, engine="pyarrow)
首先需要将其保存到parquet格式的临时文件中。为此,您需要安装pyarrow依赖项。一旦该文件被本地保存,您可以使用Python的aws sdk将其推送到S3。
现在可以通过执行以下查询在Athena中创建一个新表:
CREATE EXTERNAL TABLE IF NOT EXISTS 'your_new_table'
(col1 type1, col2 type2)
PARTITIONED BY (col_partitions_if_neccesary)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
LOCATION 's3 location of your parquet file'
tblproperties ("parquet.compression"="snappy");
另一个选择是使用pyathena。从官方文档中获取示例:
import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine
conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/"\
"{schema_name}?s3_staging_dir={s3_staging_dir}&s3_dir={s3_dir}&compression=snappy"
engine = create_engine(conn_str.format(
region_name="us-west-2",
schema_name="YOUR_SCHEMA",
s3_staging_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/"),
s3_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/")))
df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
df.to_sql("YOUR_TABLE", engine, schema="YOUR_SCHEMA", index=False, if_exists="replace", method="multi")