从PySpark向Redis写入数据

5
在Scala中,我们将RDD写入Redis的代码如下:
datardd.foreachPartition(iter => {
      val r = new RedisClient("hosturl", 6379)
      iter.foreach(i => {
        val (str, it) = i
        val map = it.toMap
        r.hmset(str, map)
      })
    })

我尝试在PySpark中这样做:datardd.foreachPartition(storeToRedis),其中函数storeToRedis定义为:

def storeToRedis(x):
    r = redis.StrictRedis(host = 'hosturl', port = 6379)
    for i in x:
        r.set(i[0], dict(i[1]))

它给我这个:

导入错误: ('没有名为 redis 的模块', 函数 subimport 位于 0x47879b0, ('redis',))

当然,我已经导入了redis模块。


2
每个 worker 上都安装了 redis 吗? - zero323
1
工人使用的Python模块必须在所有工人上都存在...所以他指的是Python Redis模块,而不是Redis数据库安装。 - Paul
@Paul:我理解他的意思,这也是我要问的。我是否需要在所有工作节点上手动安装Python Redis模块?应该有一种更简单和快捷的方法,就像Scala API的addJars方法一样。 - kamalbanga
@kamalbanga 我不知道有什么好方法。当然,你可以尝试使用Spark让工作节点运行pipeasy_install,但除非你能将工作节点限制为每台机器上只有一个,否则它可能表现不佳。 - Paul
@kamalbanga 是的,有点像。我认为addPyFile最适合短期项目导向的模块,而不是像scipy这样的大型分发。在搜索“spark集群上没有scipy”时,我找到了databricks的这篇文章,他们建议在本地安装scipy并使用spark中包含的脚本将目录复制到所有工作节点。 - Paul
显示剩余4条评论
1个回答

7

PySpark的SparkContext有一个特别为此事准备的addPyFile方法。将redis模块制作成一个zip文件(像这样),然后调用这个方法:

sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接