如何高效地将数十亿条数据插入Redis?

13

我有大约20亿个键值对,想要高效地将它们加载到Redis中。我目前正在使用Python,并按照redis-py文档中所述使用管道。如何加速以下方法?

import redis

def load(pdt_dict):
    """
    Load data into redis.

    Parameters
    ----------
    pdt_dict : Dict[str, str]
        To be stored in Redis
    """
    redIs = redis.Redis()
    pipe = redIs.pipeline()
    for key in pdt_dict.keys():
        pipe.hmset(self.seller + ":" + str(key), pdt_dict[key])
    pipe.execute()

你实际上没有使用管道。调用 pipe.hmset 而不是 redIs.hmset - Jared
抱歉,那只是打字错误...对我的错误表示歉意...你能指导我如何实现加速吗? - John Deep
@JohnDeep 请提供实际的代码。 - Itamar Haber
你好 @ItamarHaber,我刚刚更新了完整的函数。 - John Deep
5个回答

21

关于这个问题和示例代码有几点需要注意。

  1. 流水线技术并非万能药 - 在使用之前,您需要了解它的作用。流水线技术可以批量发送多个操作,并一次性接收它们的响应。你所获得的好处是将每个操作的网络往返时间替换为批处理的时间。但如果批次大小无限增大,则会严重消耗资源 - 你需要将批次大小保持在足够小的范围内以发挥其效力。按照经验法则,我通常尝试将每个流水线的大小控制在60KB左右,由于每个数据都不同,因此实际上流水线中的操作数量也不相同。假设您的键和值约为1KB,则需要每60个操作左右调用 pipeline.execute()

  2. 除非我极其误解,否则此代码不应运行。您正在使用 HMSET 作为 SET 来使用,因此您基本上忽略了哈希的字段->值映射。哈希(HMSET)和字符串(SET)是不同的数据类型,因此应相应地使用。

  3. 似乎这个小循环负责整个“十亿数据” - 如果是这样,除非您的运行代码的服务器有足够的内存来保存字典,否则会像疯狂交换一样,并且效率非常低(无论 Python 的速度如何)。您需要通过运行多个此进程的实例来并行插入数据。

  4. 您是否在远程连接 Redis?如果是,则网络可能会限制您的性能。

  5. 考虑您的 Redis 设置 - 或许可以针对这个任务进行调整以获得更好的性能,前提是它确实是瓶颈。


Redis 是单线程的,如何进行并行数据插入会有帮助呢? - Andrei Nikolaenko

6

您可以使用redis-cli的管道模式。

  1. 首先,您需要准备一个文件(注意,行应以CR / LF结尾或通过-d <delimiter>选项设置):
    SET Key0 Value0
    SET Key1 Value1
    ...
    SET KeyN ValueN
  1. 将其序列化为 Redis RESP 格式(例如,作为引号字符串,请参见文档)。

  2. 最后将其导入到redis-cli中(使用 --pipe 参数):

cat data_in_resp_format.txt | redis-cli --pipe

3
重要提示!data.txt不是包含redis命令的文件,而是包含redis协议的文件。就在这个示例前面,文档中提到“创建一个文件,其中包含Redis协议格式的以下命令”。 - Zev Isert
这里的cat命令不起作用,它产生了错误:ERR unknown command '*3 $3 SET $3 key $5 value ',参数以开头。 - Kishor Patidar

2
另一个需要考虑的问题是,在构建管道时设置transaction=False可以提高性能,如果满足以下条件(来自Redis Labs):
对于我们想要向Redis发送多个命令的情况,一个命令的结果不会影响另一个命令的输入,并且我们不需要它们全部以事务方式执行,将False传递给pipeline()方法可以进一步提高Redis的整体性能。

2
我希望您已经安装了 hiredis python 包,同时也安装了 redis python 包。请参阅 https://github.com/andymccurdy/redis-py#parsers,这应该可以提高性能。 self.seller 做了什么?也许这是一个瓶颈?
正如 @Itamar 所说,尝试定期执行 pipeline。
def load(pdtDict):
    redIs = redis.Redis()
    pipe = redIs.pipeline()
    n = 1
    for key in pdtDict.keys():
        pipe.hmset(self.seller+":"+str(key),pdtDict[key])
        n = n + 1
        if (n % 64) == 0:
            pipe.execute()
            pipe = redIs.pipeline()

2
最好在if块内将n=0。由于问题提到了“十亿”数据,我认为在这种情况下保持如此大的计数没有任何用处。 - im_bhatman
3
你需要在 for 循环后添加一个尾随的 pipe.execute() 命令,以确保仍然缓存小于 64 的任何余数。 - Brendan
1
为什么每次执行后都要创建一个新的管道?只是好奇想知道。 - Aleksandar Jovanovic

1
为了向Redis提供大量数据,请考虑使用redis大规模插入功能,在这里有描述。
要使其工作,您需要访问redis-cli。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接