Spark UDF线程安全性

5
我正在使用Spark,将包含日期列的数据框转换为包含与今天日期之间的天数、周数和月数的3个新列。
我的担忧在于SimpleDateFormat的使用,它不是线程安全的。通常情况下,这不会有问题,因为它是一个本地变量,但是由于Spark的惰性评估,共享单个SimpleDateFormat实例是否可能导致问题?
def calcTimeDifference(...){
    val sdf = new SimpleDateFormat(dateFormat)

    val dayDifference = udf{(x: String) => math.abs(Days.daysBetween(new DateTime(sdf.parse(x)), presentDate).getDays)}
    output = output.withColumn("days", dayDifference(myCol))

    val weekDifference = udf{(x: String) => math.abs(Weeks.weeksBetween(new DateTime(sdf.parse(x)), presentDate).getWeeks)}
    output = output.withColumn("weeks", weekDifference(myCol))

    val monthDifference = udf{(x: String) => math.abs(Months.monthsBetween(new DateTime(sdf.parse(x)), presentDate).getMonths)}
    output = output.withColumn("months", monthDifference(myCol))
}

我认为你是安全的,因为sdf将按任务进行序列化和反序列化,这意味着在每个任务中你将有单独的实例。我只是不确定是否每个任务实例都存在,或者可能是每个执行器实例 - 在后一种情况下,你会遇到问题,因为每个执行器可能会并行运行多个任务。 - Tzach Zohar
1个回答

0

我认为这样做不安全,因为我们知道,SimpleDateFormat 不是线程安全的。

所以如果需要在 Spark 中使用 SimpleDateFormat,我更喜欢使用这种方法:

import java.text.SimpleDateFormat
import java.util.SimpleTimeZone

/**
  * Thread Safe SimpleDateFormat for Spark.
  */
object ThreadSafeFormat extends ThreadLocal[SimpleDateFormat] {

  override def initialValue(): SimpleDateFormat = {
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd:H")
    // if you need get UTC time, you can set UTC timezone
    val utcTimeZone = new SimpleTimeZone(SimpleTimeZone.UTC_TIME, "UTC")
    dateFormat.setTimeZone(utcTimeZone)
    dateFormat
  }

}

然后使用ThreadSafeFormat.get()获取线程安全的SimpleDateFormat来执行任何操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接