在Scala中,我们将RDD写入Redis的代码如下:
datardd.foreachPartition(iter => {
val r = new RedisClient("hosturl", 6379)
iter.foreach(i => {
val (str, it) = i
val map = it.toMap
r.hmset(str, map)
})
})
我尝试在PySpark中这样做:datardd.foreachPartition(storeToRedis)
,其中函数storeToRedis
定义为:
def storeToRedis(x):
r = redis.StrictRedis(host = 'hosturl', port = 6379)
for i in x:
r.set(i[0], dict(i[1]))
它给我这个:
导入错误: ('没有名为 redis 的模块', 函数 subimport 位于 0x47879b0, ('redis',))
当然,我已经导入了redis模块。
redis
吗? - zero323addJars
方法一样。 - kamalbangapip
或easy_install
,但除非你能将工作节点限制为每台机器上只有一个,否则它可能表现不佳。 - Paul