多个Spark作业将Parquet数据附加到相同的基本路径并进行分区。

30

我有多个工作要并行执行,每天会使用分区将数据追加到相同的路径中。

例如:

dataFrame.write().
         partitionBy("eventDate", "category")
            .mode(Append)
            .parquet("s3://bucket/save/path");

工作1 - 类别 =“billing_events” 工作2 - 类别 =“click_events”

这两个作业在执行之前将截断S3存储桶中存在的任何现有分区,然后将生成的parquet文件保存到它们各自的分区中。

工作1-> s3://bucket/save/path/eventDate=20160101/channel=billing_events

工作2-> s3://bucket/save/path/eventDate=20160101/channel=click_events

我面临的问题是Spark作业执行期间创建的临时文件。它将工作输出文件保存到基本路径

s3://bucket/save/path/_temporary/...

因此,两个作业最终共享相同的临时文件夹并导致冲突。我已经注意到这可能会导致一个作业删除临时文件,而另一个作业从S3接收到404错误,表示缺少一个预期的临时文件。

是否有人遇到过这个问题,并想出了一种策略来在同一基本路径下并行执行作业?

我目前使用的是Spark 1.6.0


1
你可以使用直接输出提交者,因为它不使用临时文件夹,所以不会发生冲突。 - Tal Joffe
直接输出提交者将数据写入桶,如果作业失败并留下部分数据在S3中而永远不会被清理,是否存在任何风险? - vcetinick
1
是的,肯定可以。由于文件系统的提交是在任务级别上完成的(即对于每个输出文件),因此可能会出现部分数据的情况。我们解决这个问题的方法是直接写入临时文件夹,并在 Spark 作业完成后将其复制到最终目标位置(使用 S3DistCp 步骤)。我寻找了更好的方法来解决这个问题,但没有找到。 - Tal Joffe
多好的问题 - 我正准备着手做类似的事情,但没有考虑到共享的 _temporary 文件夹(顺便说一句,这是个坏主意..) - WestCoastProjects
1
有没有解决这个问题的干净方法?我检查了vcetinick的解决方案,它肯定有效,但是在最新版本的Spark中是否有解决方案? - Devavrata
1
@TalJoffe 我也遇到了类似的问题,我在使用Hadoop 2.8.5和Spark 2.4(Aws EMR)。我们有任何干净的解决方案吗? - Vikram Ranabhatt
4个回答

27

阅读了有关如何解决这个问题的大量文章后,我认为将一些智慧带回这里来告一段落。其中大部分要感谢Tal的评论。

此外,我发现直接写入s3://bucket/save/path似乎很危险,因为如果一个作业被终止且临时文件夹的清理在作业结束时没有发生,那么它似乎会留给下一个作业,并且我注意到有时前一个终止的作业的临时文件会出现在s3://bucket/save/path中导致重复...完全不可靠...

此外,将_temporary文件夹文件重命名为其适当的s3文件需要极长的时间(每个文件约1秒),因为S3仅支持复制/删除而不是重命名。此外,仅驱动程序实例使用单个线程重命名这些文件,因此在具有大量文件/分区的某些作业中,多达1/5的时间都花在等待重命名操作上。

我已经排除了使用DirectOutputCommitter的选项,由于以下原因:

  1. 与推测模式结合使用时会导致重复(https://issues.apache.org/jira/browse/SPARK-9899
  2. 任务失败将留下无法找到和清理/删除的混乱。
  3. Spark 2.0已完全删除了对其的支持,没有升级路径。(https://issues.apache.org/jira/browse/SPARK-10063

唯一安全、高效和一致执行这些任务的方法是首先将它们保存到具有唯一标识(应用程序ID或时间戳)的HDFS临时文件夹中。然后在作业完成时复制到S3。

这使并发作业能够执行,因为它们将保存到唯一的临时文件夹中,无需使用DirectOutputCommitter,因为在HDFS上重命名操作比S3更快,而且保存的数据更加一致。


2
感谢这些宝贵的见解。这是一个比看起来更复杂的努力。 - WestCoastProjects
只是对这个答案的补充。我做了类似的事情。您可以使用 s3distcp(https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/)来实现。但是,我还没有解决当我使用 saveAsTable 时的情况。手动创建表是可以的。 - gorros
2
@vcetinick 我也遇到了这个问题。感谢提供的解决方案。在 Spark 2.4.4 和 Hadoop 2.8.5 中,我们有解决方案吗? - Vikram Ranabhatt
这仍然适用于Spark 2.4.0.cludera2,HDFS(parquet,但这不重要)。从多个应用程序运行向同一目录追加数据是不安全的,即使是顺序执行也不行!如果一个运行崩溃了,你就无法知道它留下了什么。比如,如果它无法删除<TABLE>/_temporary/0,另一个应用程序的下一次提交也会提交留在那里的行。我们使用像/data/foo/batch=1234这样的表,而不是只有/data/foo,每个运行都有自己的批次。你仍然可以将/data/foo读取为单个表(其中有一个额外的“batch”列),只是不要在该级别上追加数据。 - ddekany

4

不要使用partitionBy

,而是
dataFrame.write().
         partitionBy("eventDate", "category")
            .mode(Append)
            .parquet("s3://bucket/save/path");

或者您可以将文件编写为

在 job-1 中,将 Parquet 文件路径指定为:

dataFrame.write().mode(Append)            
.parquet("s3://bucket/save/path/eventDate=20160101/channel=billing_events")

在作业2中,将Parquet文件路径指定为:

dataFrame.write().mode(Append)            
.parquet("s3://bucket/save/path/eventDate=20160101/channel=click_events")
  1. 这两个作业将在各自的文件夹下创建单独的_temporary目录,从而解决并发问题。
  2. 同时,还将发生基于事件日期为20160101和频道列的分区发现。
  3. 缺点是即使数据中不存在channel=click_events,也会创建channel=click_events的parquet文件。

2
感觉这不是很理想。您需要针对每个频道进行迭代,然后确保过滤掉不相关的频道数据,然后保存到正确的分区文件夹中。这可以通过partitionBy方法自动处理。 - vcetinick

2
我怀疑这是由于Spark 1.6中引入的分区发现更改所致。这些更改意味着,只有在指定了“basepath”选项(请参见Spark发布说明here)时,Spark才会将路径视为分区,例如.../xxx=yyy/

因此,如果您添加basepath选项,问题应该就会解决,像这样:

dataFrame
  .write()
  .partitionBy("eventDate", "category")
  .option("basepath", "s3://bucket/save/path")
  .mode(Append)
  .parquet("s3://bucket/save/path");

我还没有验证过,但希望它能解决问题 :)


没有关于分区的问题,更多的是并发问题。当作业正在执行时,某些内部机制会在基本路径s3://bucket/save/path/_temporary创建临时文件,这意味着不能有效地在同一基本路径上运行2个作业。此外,我不确定设置基本路径是否对分区发现是强制性的,只有在您指向子路径而不是在创建数据框时指向基本路径时才需要。 - vcetinick
@vcetinick,看起来我们使用HDFS时遇到了类似的问题,我会进一步调查并可能在Spark问题跟踪器上提交一个错误报告。 - Base_v

-1

使用“partitionBy”对同一路径进行多个写入任务,当在FileOutputCommittercleanupJob中删除_temporary时,将失败,就像没有这样的文件或目录

测试代码:

def batchTask[A](TASK_tag: String, taskData: TraversableOnce[A], batchSize: Int, fTask: A => Unit, fTaskId: A => String): Unit = {
  var list = new scala.collection.mutable.ArrayBuffer[(String, java.util.concurrent.Future[Int])]()
  val executors = java.util.concurrent.Executors.newFixedThreadPool(batchSize)
  try {
    taskData.foreach(d => {
      val task = executors.submit(new java.util.concurrent.Callable[Int] {
        override def call(): Int = {
          fTask(d)
          1
        }
      })
      list += ((fTaskId(d), task))
    })
    var count = 0
    list.foreach(r => if (!r._2.isCancelled) count += r._2.get())
  } finally {
    executors.shutdown()
  }
}
def testWriteFail(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import spark.sqlContext.implicits._
  batchTask[Int]("test", 1 to 20, 6, t => {
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(t))
    df1.repartition(1).write
      .mode("overwrite")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      .partitionBy("t0").csv(outPath)
  }, t => f"task.${t}%4d") // some Exception
  println(s"fail: count=${spark.read.csv(outPath).count()}")
}
try {
  testWriteFail(outPath + "/fail")
} catch {
  case e: Throwable =>
}

失败

使用 OutputCommitter

package org.jar.spark.util
import java.io.IOException
/*
  * 用于 DataFrame 多任务写入同一个目录。
  * <pre>
  * 1. 基于临时目录写入
  * 2. 如果【任务的输出】可能会有重叠,不要使用 overwrite 方式,以免误删除
  * </pre>
  * <p/>
  * Created by liao on 2018-12-02.
  */
object JMultiWrite {
  val JAR_Write_Cache_Flag = "jar.write.cache.flag"
  val JAR_Write_Cache_TaskId = "jar.write.cache.taskId"
  /** 自动删除目标目录下同名子目录 */
  val JAR_Write_Cache_Overwrite = "jar.write.cache.overwrite"
  implicit class ImplicitWrite[T](dw: org.apache.spark.sql.DataFrameWriter[T]) {
    /**
      * 输出到文件,需要在外面配置 option format mode 等
      *
      * @param outDir    输出目标目录
      * @param taskId    此次任务ID,用于隔离各任务的输出,必须具有唯一性
      * @param cacheDir  缓存目录,最好是 '_' 开头的目录,如 "_jarTaskCache"
      * @param overwrite 是否删除已经存在的目录,默认 false 表示 Append模式
      *                  <font color=red>(如果 并行任务可能有相同 子目录输出时,会冲掉,此时不要使用 overwrite)</font>
      */
    def multiWrite(outDir: String, taskId: String, cacheDir: String = "_jarTaskCache", overwrite: Boolean = false): Boolean = {
      val p = path(outDir, cacheDir, taskId)
      dw.options(options(cacheDir, taskId))
        .option(JAR_Write_Cache_Overwrite, overwrite)
        .mode(org.apache.spark.sql.SaveMode.Overwrite)
        .save(p)
      true
    }
  }
  def options(cacheDir: String, taskId: String): Map[String, String] = {
    Map(JAR_Write_Cache_Flag -> cacheDir,
      JAR_Write_Cache_TaskId -> taskId,
      "mapreduce.fileoutputcommitter.marksuccessfuljobs" -> "false",
      "mapreduce.job.outputformat.class" -> classOf[JarOutputFormat].getName
    )
  }
  def path(outDir: String, cacheDir: String, taskId: String): String = {
    assert(outDir != "", "need OutDir")
    assert(cacheDir != "", "need CacheDir")
    assert(taskId != "", "needTaskId")
    outDir + "/" + cacheDir + "/" + taskId
  }
  /*-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-*/
  class JarOutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat {
    var committer: org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter = _

    override def getOutputCommitter(context: org.apache.hadoop.mapreduce.TaskAttemptContext): org.apache.hadoop.mapreduce.OutputCommitter = {
      if (this.committer == null) {
        val output = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(context)
        this.committer = new JarOutputCommitter(output, context)
      }
      this.committer
    }
  }
  class JarOutputCommitter(output: org.apache.hadoop.fs.Path, context: org.apache.hadoop.mapreduce.TaskAttemptContext)
    extends org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(output, context) {
    override def commitJob(context: org.apache.hadoop.mapreduce.JobContext): Unit = {
      val finalOutput = this.output
      val cacheFlag = context.getConfiguration.get(JAR_Write_Cache_Flag, "")
      val myTaskId = context.getConfiguration.get(JAR_Write_Cache_TaskId, "")
      val overwrite = context.getConfiguration.getBoolean(JAR_Write_Cache_Overwrite, false)
      val hasCacheFlag = finalOutput.getName == myTaskId && finalOutput.getParent.getName == cacheFlag
      val finalReal = if (hasCacheFlag) finalOutput.getParent.getParent else finalOutput // 确定最终目录
      // 遍历输出目录
      val fs = finalOutput.getFileSystem(context.getConfiguration)
      val jobAttemptPath = getJobAttemptPath(context)
      val arr$ = fs.listStatus(jobAttemptPath, new org.apache.hadoop.fs.PathFilter {
        override def accept(path: org.apache.hadoop.fs.Path): Boolean = !"_temporary".equals(path.getName())
      })
      if (hasCacheFlag && overwrite) // 移除同名子目录
      {
        if (fs.isDirectory(finalReal)) arr$.foreach(stat =>
          if (fs.isDirectory(stat.getPath)) fs.listStatus(stat.getPath).foreach(stat2 => {
            val p1 = stat2.getPath
            val p2 = new org.apache.hadoop.fs.Path(finalReal, p1.getName)
            if (fs.isDirectory(p1) && fs.isDirectory(p2) && !fs.delete(p2, true)) throw new IOException("Failed to delete " + p2)
          })
        )
      }
      arr$.foreach(stat => {
        mergePaths(fs, stat, finalReal)
      })
      cleanupJob(context)
      if (hasCacheFlag) { // 移除缓存目录
        try {
          fs.delete(finalOutput, false)
          val pp = finalOutput.getParent
          if (fs.listStatus(pp).isEmpty)
            fs.delete(pp, false)
        } catch {
          case e: Exception =>
        }
      }
      // 不用输出 _SUCCESS 了
      //if (context.getConfiguration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      //  val markerPath = new org.apache.hadoop.fs.Path(this.outputPath, "_SUCCESS")
      //  fs.create(markerPath).close()
      //}
    }
  }
  @throws[IOException]
  def mergePaths(fs: org.apache.hadoop.fs.FileSystem, from: org.apache.hadoop.fs.FileStatus, to: org.apache.hadoop.fs.Path): Unit = {
    if (from.isFile) {
      if (fs.exists(to) && !fs.delete(to, true)) throw new IOException("Failed to delete " + to)
      if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
    }
    else if (from.isDirectory) if (fs.exists(to)) {
      val toStat = fs.getFileStatus(to)
      if (!toStat.isDirectory) {
        if (!fs.delete(to, true)) throw new IOException("Failed to delete " + to)
        if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
      }
      else {
        val arr$ = fs.listStatus(from.getPath)
        for (subFrom <- arr$) {
          mergePaths(fs, subFrom, new org.apache.hadoop.fs.Path(to, subFrom.getPath.getName))
        }
      }
    }
    else if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
  }
}

然后:

def testWriteOk(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import org.jar.spark.util.JMultiWrite.ImplicitWrite // 导入工具
  import spark.sqlContext.implicits._
  batchTask[Int]("test.ok", 1 to 20, 6, t => {
    val taskId = t.toString
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(taskId))
    df1.repartition(1).write
      .partitionBy("t0")
      .format("csv")
      .multiWrite(outPath, taskId, overwrite = true) // 这里使用了 overwrite ,如果分区有重叠,请不要使用 overwrite
  }, t => f"task.${t}%4d")
  println(s"ok: count=${spark.read.csv(outPath).count()}") // 40
}
try {
  testWriteOk(outPath + "/ok")
} catch {
  case e: Throwable =>
}

成功:
$  ls ok/
t0=1  t0=10 t0=11 t0=12 t0=13 t0=14 t0=15 t0=16 t0=17 t0=18 t0=19 t0=2  t0=20 t0=3  t0=4  t0=5  t0=6  t0=7  t0=8  t0=9

对于其他输出格式同样适用,请注意使用 overwrite

在 Spark 2.11.8 上进行测试。

感谢 @Tal Joffe。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接