将Cassandra数据读入pandas的Python方法

30

如何将Cassandra数据快速地读入pandas?我目前使用以下代码,但速度非常慢...

import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(pd.np.nan)

读取1000行需要1分钟时间,我还有“更多”要读取…如果我在DBeaver中运行相同的查询,我可以在一分钟内获得全部结果(约40k行)。

谢谢!!!


您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - ptrj
session.execute(sql_query) 的结果是一个特殊的 <cassandra.cluster.ResultSet at 0x1b4b61d0> 可迭代对象。它的行可以是元组、命名元组或字典。 - ragesz
我明白了。不过,最好先将其转换为列表,例如 lst=[]; for row in session...: lst.append(row),如果没有其他方法的话。然后再连接结果:df = pd.concat(lst)。这样你就可以避免昂贵的 40k 次调用 pd.DataFrame.append - ptrj
6个回答

56

我在官方邮件列表中找到了答案(它能完美地工作):

您好,

尝试定义自己的pandas行工厂:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

这是我做事的方式 - 它应该会更快...

如果你发现了更快的方法 - 我很感兴趣 :)

迈克尔


3
这应该被标记为答案,它简洁、清晰且通用。 - EndsOfInvention
即使对于陌生的Cassandra类型,它也能完美运行。 - Zachary Oldham
2
在读取数据时效果良好,但写回数据时可能会出现问题,因为pandas必须猜测数据类型。例如:我读取了一个有许多空行的int列,pandas猜测为float类型,然后在插入到类似表中时,CQL会因为列类型错误而报错。 - JosiahJohnston

16

我在Python 3中所做的是:

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))

1

我使用了row_factory解决方案几周,但在尝试将数据框写入具有相同结构的另一个表时遇到了数据类型问题。Pandas猜测一个带有许多空字段的int列的float数据类型。在写入期间,Cassandra驱动程序抱怨类型不匹配。

TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)

Pandas的整数列不支持NaN或None,因此最好的选择可能是将该列作为Python对象。

一个快速的解决方法是调整pandas_factory以避免pandas推断。这并不是一个理想的通用策略:

def pandas_factory(colnames, rows):
    df = pd.DataFrame(rows, columns=colnames, dtype=object)
    return df

我也发现,如果我不想使用行工厂,可以这样做:df = pandas.DataFrame(result.all())

作为一个临时解决方案,我希望有一个强大的 result_to_df() 函数,它使用result.column_types(例如:cassandra.cqltypes.Int32Type)并对将其转换为python对象或numpy类型做出良好的猜测。如果我有时间编写这个函数,我会编辑这个答案。Pandas 的 read_cqlto_cql 是理想的选择,但可能超出我的带宽范围。


0

你可以在 pandas DataFrame 中运行一个循环,就能完成任务了!

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
        auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
data = session.execute("SELECT * FROM <table_name>;")

df = pd.DataFrame([d for d in data])

0

将Cassandra数据快速读入pandas的最快方法是自动迭代页面。通过自动迭代所有页面,创建字典并将每个字典添加到其中。然后,使用此字典创建数据框。

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary ={"column1":[],"column2":[]}

for row in session.execute(sql_query):
    dictionary["column1"].append(row.column1)
    dictionary["column1"].append(row.column1)

df = pd.DataFrame(dictionary)

0

我一直在努力将数据从Cassandra移动到MSSQL,并使用这里提供的答案作为参考,我能够移动数据,但是我的Cassandra源表非常大,我的查询从Cassandra获取超时错误,问题是我们无法增加超时时间,我只剩下批量选择行的选项,在我的查询中,我的代码还将Cassandra集合数据类型转换为str,因为我想将其插入MSSQL,然后解析它,请让我知道是否有人遇到类似的问题,我构建的代码如下:

import sys
import pandas as pd
import petl as etl
import pyodbc
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import *
from cassandra.query import SimpleStatement


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)
    engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'], 
    auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace',wait_for_all_pools=True)

session.row_factory = pandas_factory
request_timeout = 60000
query = "SELECT * FROM cassandratable"
statement = SimpleStatement(query, fetch_size=5000) 
rows = session.execute(statement)

df = rows._current_rows
df['attributes'] = df.attributes.astype(str)
df['attributesgenerated'] = df.attributesgenerated.astype(str)
df['components'] = df.components.astype(str)
df['distributioncenterinfo'] = df.distributioncenterinfo.astype(str)
df['images'] = df.images.astype(str)
df['itemcustomerzonezoneproductids'] = 
df.itemcustomerzonezoneproductids.astype(str)
df['itempodconfigids'] = df.itempodconfigids.astype(str)
df['keywords'] = df.keywords.astype(str)
df['validationmessages'] = df.validationmessages.astype(str)
df['zones'] = df.zones.astype(str)
#error_bad_lines=False
#print(df)
df.to_sql(
           name='mssql_table_name',
           con=engine,
           index=False,
           if_exists='append',
           chunksize=1
         )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接