如何在Spark中对每个执行者执行一次操作

32

我有一个大小约为400MB的weka模型存储在S3中。现在,我有一些记录需要运行该模型并进行预测。

为了进行预测,我尝试了以下操作:

  1. 下载和加载模型到driver作为静态对象,并将其广播到所有执行器上。对预测RDD执行map操作。 ----> 不起作用,因为在Weka中,为了进行预测,需要修改模型对象,而广播需要一个只读副本。

  2. 下载和加载模型到driver作为静态对象,并在每个map操作中将其发送到执行器。 -----> 可行(不高效,因为在每个map操作中都要传递400MB的对象)

  3. 在driver上下载模型并在每个执行器上加载并缓存它。(不知道怎么做)

有没有人知道如何加载模型到每个执行器,并缓存它,以便对于其他记录,我无需再次加载它?


2
从Spark 2.4开始,有一个名为ExecutorPlugin的Java接口,允许自定义init()shutdown()函数。详情请见:https://issues.apache.org/jira/browse/SPARK-24918 - Samson Scharfrichter
5个回答

40

你有两个选择:

1. 使用惰性val创建单例对象来表示数据:

    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       

接下来,您可以在map函数中使用懒加载变量lazy vallazy val确保每个工作JVM初始化其自己的数据实例。不会对data执行序列化或广播。

    elementsRDD.map { element =>
        // use WekaModel.data here
    }

优点

  • 更有效率,因为它让您可以针对每个JVM实例初始化您的数据一次。例如,当需要初始化数据库连接池时,此方法是一个不错的选择。

缺点

  • 初始化控制较少。例如,如果需要运行时参数,则初始化对象会更加棘手。
  • 如果需要,您无法真正释放或释放对象。通常情况下,这是可以接受的,因为当进程退出时,操作系统将释放资源。

2. 在RDD上使用mapPartition(或foreachPartition)方法而不仅仅是map

这样可以为整个分区初始化所需的任何内容。

    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }

优点:

  • 在对象的初始化和反初始化方面提供更大的灵活性。

缺点

  • 每个分区都会创建并初始化您的对象的新实例。这取决于每个JVM实例有多少个分区,它可能是一个问题或不是一个问题。

2
你确定 #1 是正确的吗?我在使用它时遇到了序列化错误。另外,如果数据初始化取决于运行时参数,你会怎么做? - Frank
使用第一种方法时不应该发生任何序列化。如果有,很可能是在RDD方法中引用了中间对象。关于您的初始化问题,确实更难控制。您的运行时参数也需要静态可用(例如通过系统属性或配置文件)。单例初始化并不是Spark特定的,而是Scala主题。 - Dia Kharrat
4
你能在Java中做同样的事情吗? - Ross Brigoli
3
关于这个缺点,“初始化控制较少,例如,如果您需要运行时参数来初始化对象,则更加棘手。”。 这正是我想要实现的。你有任何例子或者见过这样做吗? 我正在调用外部系统以获取数据库连接配置。因此,理想情况下,我不希望在每个执行器上都调用外部系统。我刚刚问了一个非常类似的问题。https://dev59.com/6qbja4cB1Zd3GeqPkK9M - Alex Naspo
初始化数据库连接的一种可能的方式是通过系统属性(例如 System.getProperty("db.host"))来进行。 - Dia Kharrat
@DiaKharrat,您能否提供一个Python解决方案?如果我想在每个执行器上加载一个巨大的文件(8GB)预训练嵌入文件,我们该如何做到这一点? - bib

2
这是比惰性初始化器更好的方法:我创建了一个对象级指针并将其初始化为null,让每个执行器都初始化它。在初始化块中,您可以有一次性的代码。请注意,每个处理批次都会重置局部变量但不会重置对象级变量。
object Thing1 {
  var bigObject : BigObject = null

  def main(args: Array[String]) : Unit = {
    val sc = <spark/scala magic here>
    sc.textFile(infile).map(line => {
      if (bigObject == null) {
         // this takes a minute but runs just once
         bigObject = new BigObject(parameters)  
      }
      bigObject.transform(line)
    })
  }
}

这种方法每个执行器只创建一个大对象,而不是其他方法中每个分区都创建一个大对象。

如果您将var bigObject:BigObject = null放在主函数命名空间中,则会有不同的行为。在这种情况下,它会在每个分区(即批处理)开始时运行bigObject构造函数。如果存在内存泄漏,则最终会导致执行器崩溃。垃圾收集也需要做更多的工作。


2
如果你的 spark.executor.cores 大于1,那么这将多次调用 new BigObject。懒惰的方法可以防止并发初始化。 - Dan
1
@Dan 你是指 lazy var bigObject ... 吗? - Dale Johnson
1
@Dale,你的代码在技术上不是线程安全的,因为如果多个执行器同时运行,它们可以初始化你的全局对象。 - Dia Kharrat

1

这是我们通常做的事情

  1. 定义一个单例客户端,执行这些操作以确保每个执行程序中只有一个客户端存在

  2. 拥有一个getorcreate方法来创建或获取客户端信息,通常让您拥有一个共同的服务平台,您希望为多个不同的模型提供服务,然后我们可以使用像concurrentmap这样的东西来确保线程安全和computeifabsent

  3. getorcreate方法将在RDD级别内被调用,例如transform或foreachpartition,因此请确保init发生在执行程序级别


1
你可以通过广播一个带有lazy val的case对象来实现这一点,如下所示:
case object localSlowTwo {lazy val value: Int = {Thread.sleep(1000); 2}}
val broadcastSlowTwo = sc.broadcast(localSlowTwo)
(1 to 1000).toDS.repartition(100).map(_ * broadcastSlowTwo.value.value).collect

这个在三个执行器中每个有三个线程的事件时间轴如下:

Event Timeline for Stage 1

在同一次 spark-shell 会话中再次运行最后一行代码不会再进行初始化:

Event Timeline for Stage 3


0

这对我很有效,如果您像下面所示使用单例和同步,则是线程安全的。

object singletonObj {
  var data: dataObj =null
  def getDataObj(): dataObj = this.synchronized {
     if (this.data==null){
        this.data = new dataObj()
     }
     this.data
  }
}

object app {
   def main(args: Array[String]): Unit = {
      lazy val mydata: dataObj = singletonObj.getDataObj()
      df.map(x=>{ functionA(mydata) })
   }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接