使用Pandas的.to_sql将JSON列写入Postgres

32

ETL过程中,我需要从一个Postgres数据库中提取和加载一个JSON列到另一个数据库。我们使用Pandas来完成这个任务,因为它有很多读写不同源/目的地数据的方法,所有的转换都可以使用Python和Pandas编写。说实话,我们对这种方法非常满意...但是我们遇到了一个问题。

通常情况下,读写数据非常容易。你只需使用pandas.read_sql_table从源中读取数据,然后使用pandas.to_sql将其写入目标。但是,由于源表之一具有Postgres中的JSON类型列,to_sql函数崩溃,并显示以下错误消息。

    df.to_sql(table_name, analytics_db)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/core/generic.py", line 1201, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 470, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 1147, in to_sql
    table.insert(chunksize)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 663, in insert
    self._execute_insert(conn, keys, chunk_iter)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 638, in _execute_insert
    conn.execute(self.insert_statement(), data)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 459, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
3个回答

54
我在网上寻找了解决方案,但没有找到任何内容,下面是我们想出的方法(可能存在更好的方法,但至少这是一个开始,如果其他人遇到相同的问题,可以尝试使用这个方法):
to_sql 中指定 dtype 参数。
我们从:df.to_sql(table_name, analytics_db) 修改为 df.to_sql(table_name, analytics_db, dtype={'name_of_json_column_in_source_table': sqlalchemy.types.JSON}) ,它完美的解决了问题。

dtype = {'name_of_json_column_in_source_table': sqlalchemy.types.JSON} 如果源数据类型为jsonp,则此方法同样适用。 - Shriganesh Kolhe
1
即使将 Pandas 数据帧中的字典列写入 MySQL 8.0.19 中的 JSON 列,设置 dtype={'name_of_json_column_in_source_table': sqlalchemy.types.JSON} 仍然有效。谢谢! - nonbeing
我在我的 JSON 列“scans”中遇到一个错误:ValueError: scans (<class 'sqlalchemy.sql.sqltypes.JSON'>) 不是一个字符串。 - Raksha

11

如果你使用json.dumps()重新创建JSON列,那么你就设置好了。通过这种方式,可以使用pandas的.to_sql()方法来写入数据,也可以使用PostgreSQL的更快的COPY方法(通过psycopg2的copy_expert()或sqlalchemy的raw_connection())。

为了简单起见,我们假设有一个字典列应该写入到一个JSON(B)列中:

import json
import pandas as pd

df = pd.DataFrame([['row1',{'a':1, 'b':2}],
                   ['row2',{'a':3,'b':4,'c':'some text'}]],
                  columns=['r','kv'])

# conversion function:
def dict2json(dictionary):
    return json.dumps(dictionary, ensure_ascii=False)

# overwrite the dict column with json-strings
df['kv'] = df.kv.map(dict2json)

6

我无法评论 peralmq的答案,但在postgresql JSONB的情况下,您可以使用以下方法:

from sqlalchemy import dialects
dataframe.to_sql(..., dtype={"json_column":dialects.postgresql.JSONB})

我的JSON列"scans"出现了一个错误: ValueError: scans (<class 'sqlalchemy.dialects.postgresql.json.JSONB'>) 不是一个字符串。 - Raksha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接