在Spark ML管道中缓存中间结果

18
最近我正在计划将独立的Python ML代码迁移到Spark上。 spark.ml中的ML管道非常方便,具有流畅的API,可以链接算法阶段和超参数网格搜索。
然而,我发现其对于一个重要功能的支持在现有文档中比较模糊:缓存中间结果。当管道涉及计算密集型阶段时,这个功能的重要性就体现出来了。
例如,在我的情况下,我使用一个巨大的稀疏矩阵对时间序列数据执行多个移动平均以形成输入特征。矩阵的结构是由一些超参数确定的。这一步骤成为整个管道的瓶颈,因为我必须在运行时构造矩阵。
在参数搜索期间,我通常除了这个“结构参数”之外还有其他参数要考虑。所以如果在“结构参数”不变的情况下可以重用巨大的矩阵,我就可以节省大量时间。出于这个原因,我特意编写了代码来缓存和重用这些中间结果。
所以我的问题是:Spark的ML管道能否自动处理中间缓存?还是我必须手动编写代码来做到这一点?如果是这样,是否有任何最佳实践可以借鉴?
附言:我已经查看了官方文档和一些其他材料,但似乎没有讨论这个主题。
1个回答

12

我遇到了同样的问题,解决的方法是我实现了自己的PipelineStage,它缓存了输入的DataSet并将其原封不动地返回。

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable {
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache()

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType = schema

  def this() = this(Identifiable.randomUID("CacherTransformer"))
}

要使用它,您需要像这样操作:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2))

3
我猜唯一的问题是这个解决方案(我实际上已经点赞了!)没有取消持久化之前缓存的数据框(如果链接了多个 Cacher)。你可能会认为这不是问题,因为Spark会在垃圾回收时自动取消持久化,但是从你的UI中看到这么多缓存数据可能会让人感到困惑。 - Vince.Bdn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接