Redis on Spark:任务不可序列化

6
我们在Spark上使用Redis来缓存我们的键值对。以下是代码:

import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
    val arr = x.split(" ")
    val readId = arr(0).toInt
    val refId = arr(1).toInt
    val start = arr(2).toInt
    val end = arr(3).toInt
    val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
    val readStr = r.hmget("readStr", readId).get(readId)
    val realend = if(end > refStr.length - 1) refStr.length - 1 else end
    val refOneStr = refStr.substring(start, realend)
      (readStr, refOneStr, refId, start, realend, readId)
 })

但是编译器给了我这样的反馈:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.ynu.App$.main(App.scala:511)
    at com.ynu.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

请问有人可以告诉我如何序列化从Redis获取的数据吗?非常感谢。

2个回答

22
在Spark中,对于RDD上的函数(例如这里的map),它们会被序列化并发送到执行器进行处理。这意味着这些操作中包含的所有元素都应该是可序列化的。
这里的Redis连接不可序列化,因为它打开TCP连接以连接到目标数据库,并绑定到创建它的机器。
解决方法是在本地执行上下文中在执行器上创建这些连接。有几种方法可以做到这一点。两种我想到的是:
1. rdd.mapPartitions:让您一次处理整个分区,从而分摊创建连接的成本。 2. 单例连接管理器:每个执行器创建一次连接。
使用mapPartitions更容易,因为它只需要对程序结构进行小的更改。
val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

一个单例连接管理器可以被建模为一个对象,该对象持有对连接的延迟引用(注意:可变引用也可以工作)。

object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

然后可以使用此对象在每个工作进程JVM中实例化1个连接,并将其用作操作闭包中的Serializable对象。

val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

使用单例对象的优点是,由JVM仅创建一次连接(而不是每个RDD分区1次),从而减少了开销。

但也存在一些缺点:

  • 连接的清理比较棘手(需要关机钩子/计时器)
  • 必须确保共享资源的线程安全性

(*) 此为示例代码,未编译或测试。


谢谢您的回答!我正在使用这个库https://github.com/debasishg/scala-redis。它没有一个名为“close”的方法,而是“disconnect”。我不知道它是否有效。您能告诉我您现在使用哪个库来处理Redis数据吗? - fanhk
为单例解决方案加1。您能举个例子来说明如何管理连接的关闭吗? - Sohaib
@Sohaib,考虑到这是一个VM绑定的对象,您需要注册一个关闭挂钩来清理关闭连接。 - maasg

2
您正在尝试序列化客户端。您有一个RedisClient(r),您正在尝试在将运行于不同群集节点上的map中使用它。在执行群集任务之前,要么从Redis中单独获取所需的数据,要么在您的map块内为每个群集任务单独创建客户端(可能是通过使用mapPartitions而不是map,因为为每个单独的行创建新的redis客户端可能是一个坏主意)。

谢谢您的回答,但您能告诉我在这种情况下如何使用mapPartitions吗? - fanhk
调用 mapPartitions,传递一个接受可迭代对象的代码块(正如您从 mapPartitions 的签名中可以看到的那样),在此代码块内创建 RedisClient,然后使用它来 map 可迭代对象,就像您一直在做的那样。重点是,在单个分区的处理内部创建了 RedisClient。您尝试了什么,遇到了什么问题? - lmm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接