How to store a pandas DataFrame in Redis using pyarrow, and get it back

28

Using

dd = {'ID': ['H576','H577','H578','H600', 'H700'],
      'CD': ['AAAAAAA', 'BBBBB', 'CCCCCC','DDDDDD', 'EEEEEEE']}
df = pd.DataFrame(dd)

Before pandas 0.25, the following worked:

set:  redisConn.set("key", df.to_msgpack(compress='zlib'))
get:  pd.read_msgpack(redisConn.get("key"))

Now there are deprecation warnings:

FutureWarning: to_msgpack is deprecated and will be removed in a future version.
It is recommended to use pyarrow for on-the-wire transmission of pandas objects.

The read_msgpack is deprecated and will be removed in a future version.
It is recommended to use pyarrow for on-the-wire transmission of pandas objects.

How does pyarrow work, and how do I store pyarrow objects in Redis and get them back?

Related: How to set/get a pandas.DataFrame to/from Redis?

7 Answers

46

Here is a full example of using pyarrow to serialize a pandas DataFrame and store it in Redis:

apt-get install python3 python3-pip redis-server
pip3 install pandas pyarrow redis

then in Python:

import pandas as pd
import pyarrow as pa
import redis

df=pd.DataFrame({'A':[1,2,3]})
r = redis.Redis(host='localhost', port=6379, db=0)

context = pa.default_serialization_context()
r.set("key", context.serialize(df).to_buffer().to_pybytes())
context.deserialize(r.get("key"))
   A
0  1
1  2
2  3

I just submitted PR 28494 to pandas to include this pyarrow example in the docs.

Reference docs:


4
Very nice. I think a defensive programmer should check the size of the DataFrame before pushing it to Redis, since as far as I know the 512 MB limit still applies: https://github.com/antirez/redis/issues/757 - Brian Wylie
2
@BrifordWylie: I use the bz2 package to compress data before pushing it to Redis. - Javiar Sandra
1
Does the answer above apply any compression in to_pybytes()? - sray
@sumonc I assume you've figured it out by now, but for completeness: do not pass decode_responses=True to the redis.Redis constructor. - oerpli
In recent versions of pyarrow the context is no longer needed; just use pa.serialize(..) and pa.deserialize(..). - Aziz Alto

12

Since default_serialization_context has been deprecated and things got simpler, here is how I handle it:

import pyarrow as pa
import redis

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)

def storeInRedis(alias, df):
    df_compressed = pa.serialize(df).to_buffer().to_pybytes()
    res = r.set(alias, df_compressed)
    if res:
        print(f'{alias} cached')

def loadFromRedis(alias):
    data = r.get(alias)
    try:
        return pa.deserialize(data)
    except:
        print("No data")


storeInRedis('locations', locdf)

loadFromRedis('locations')

3
It looks like pyarrow will no longer support this as of version 2.0.0. See https://arrow.apache.org/blog/2020/10/22/2.0.0-release/ for details. - binarymason

8

If you want compression in Redis, you can use the built-in parquet and gzip support:

import io

import pandas as pd
import redis

def openRedisCon():
    pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=0)
    r = redis.Redis(connection_pool=pool)
    return r

def storeDFInRedis(alias, df):
    """Store the dataframe object in Redis
    """
    buffer = io.BytesIO()
    df.to_parquet(buffer, compression='gzip')
    buffer.seek(0)  # rewind to the beginning after writing
    r = openRedisCon()
    res = r.set(alias, buffer.read())

def loadDFFromRedis(alias, useStale: bool = False):
    """Load the named key from Redis into a DataFrame and return the DF object
    """

    r = openRedisCon()

    try:
        buffer = io.BytesIO(r.get(alias))
        buffer.seek(0)
        df = pd.read_parquet(buffer)
        return df
    except:
        return None



0

Pickle and zlib can serve as an alternative to pyarrow:

import pandas as pd
import redis
import zlib
import pickle

df=pd.DataFrame({'A':[1,2,3]})
r = redis.Redis(host='localhost', port=6379, db=0)
r.set("key", zlib.compress(pickle.dumps(df)))
df=pickle.loads(zlib.decompress(r.get("key")))

0
import pandas as pd
import redis
import pickle

r = redis.Redis(host='localhost', port=6379, db=0)

data = {
    "calories": ["v1", 'v2', 'v3'],
    "duration": [50, 40, 45]
}
df = pd.DataFrame(data, index=["day1", "day2", "day3"])

r.set("key", pickle.dumps(df))
print(pickle.loads(r.get("key")))

Alternatively, you can use direct-redis:
import pandas as pd
from direct_redis import DirectRedis

r = DirectRedis(host='localhost', port=6379)
>>> df =  pd.DataFrame([[1,2,3,'235', '@$$#@'], 
                   ['a', 'b', 'c', 'd', 'e']])
>>> print(df)
   0  1  2    3      4
0  1  2  3  235  @$$#@
1  a  b  c    d      e   

>>> r.set('df', df)   

>>> r.get('df')
   0  1  2    3      4
0  1  2  3  235  @$$#@
1  a  b  c    d      e   

>>> type(r.get('df'))
<class 'pandas.core.frame.DataFrame'>

0
import pandas as pd
import pyarrow as pa
import redis


r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)


def save_df_to_redis(r, redis_key, df):
    buffer = pa.serialize_pandas(df)
    r.set(redis_key, buffer.to_pybytes())


def load_df_from_redis(r, redis_key):
    buffer = r.get(redis_key)
    df = pa.deserialize_pandas(buffer)
    return df



data = {
    "Name": ["John", "Anna", "Peter"],
    "Age": [28, 24, 33],
}
df = pd.DataFrame(data)

save_df_to_redis(r, "key", df)
df_redis = load_df_from_redis(r, "key")
print(df_redis)

0
You can also try this if you set decode_responses=True in the Redis Python client (use to_csv rather than to_string so the text can be parsed back reliably):
import io

to_save = value.to_csv()
r.set(redis_key, to_save)

value = r.get(redis_key)
df = pd.read_csv(io.StringIO(value), index_col=0)


Welcome to Stack Overflow. Please validate your answer with input, as was done in the accepted answer. - undefined
