Apache Spark - foreach Vs foreachPartition 何时使用哪一个?

51

我想知道使用foreachPartition是否会带来更好的性能,因为它具有更高级别的并行性,与使用foreach方法相比,特别是在我遍历一个RDD以执行一些加法到累加器变量中的情况下。

5个回答

41

foreachforeachPartitions是操作行为。

foreach(function): Unit

用于调用带有副作用的操作的通用函数。对于 RDD 中的每个元素,它都会调用传递的函数。这通常用于操作累加器或写入外部存储器。

注意:在foreach()之外修改除累加器之外的变量可能导致未定义的行为。详情请参见了解闭包

示例

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(function): Unit

foreach()类似,但不是为每个元素调用函数,而是为每个分区调用函数。该函数应该能够接受一个迭代器。这比foreach()更有效率,因为它减少了函数调用的次数(就像mapPartitions()一样)。

foreachPartition的使用示例:


  • 示例1:对于每个分区,如果你想要使用一个数据库连接(在每个分区块内),则以下是如何使用scala的示例。
/**
    * 使用foreach partition插入数据库。
    *
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
//numPartitions = 同时适用的DB连接数
datframe.repartition(numofpartitionsyouwant)
val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // 注意:每个分区一个连接(更好的方法是使用连接池) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //由于某些数据库不能使用大于1000的批量大小,因此使用1000的批量大小 partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") }
sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) }
sqlExecutorConnection.close() //关闭连接以避免连接耗尽。 } }
  • 示例2:

使用foreachPartition与sparkstreaming(dstreams)和kafka生产者的例子

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

注:如果您想避免每个分区创建生产者的方式,更好的方法是使用sparkContext.broadcast广播生产者,因为Kafka生产者是异步的,会在发送数据之前大量缓冲。


累加器示例片段...通过这可以测试性能。

     test("Foreach - Spark") {
        import spark.implicits._
        var accum = sc.longAccumulator
        sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
        assert(accum.value == 6L)
      }
test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }

结论:

foreachPartition 操作在分区上进行,因此显然比 foreach 更优秀。

经验法则:

当您访问昂贵的资源(例如数据库连接或Kafka生产者等)时,应该使用 foreachPartition,而不是像 foreach 一样每一个元素都初始化一个。对于累加器,您可以通过上述测试方法来衡量性能,这对于累加器也应该更快。

此外,请参见 map vs mappartitions,它们具有类似的概念,但它们是转换操作。


3
可以。在某些情况下,例如使用累加器,foreach partition会比foreach慢,并且foreachpartition会在内部调用foreach。请提供一些场景以解释这一点。 - Vikram Singh Chandel
1
@RamGhadiyaram,JAVA中是否有类似的功能可用?当我尝试在每个分区上使用grouped()时,它并没有显示任何可用的方法。我正在使用Spark 2.1.0。 - wandermonk
据我所知,Scala是可用的。因此,在Java中不可用,您可以执行普通的批处理操作。我的意思是,您可以执行类似的操作。 - Ram Ghadiyaram
@RamGhadiyaram,我有30个分区和30个核心,需要将15GB的数据复制到Cassandra。在运行SparkJob时,只有一个处理器负载很高,其他执行程序无法参与处理。顺便说一下,我正在使用HDFS中的Parquet文件格式进行保存。你能帮我吗? - BdEngineer
打印分区长度。如果长度为1(因为一个分区正在承载负载),则尝试重新分区,然后对每个分区进行操作。 - Ram Ghadiyaram
@Ram Ghadiyaram val company_model_vals_df = enriched_company_model_vals_df.repartition(col("model_id"), col("fiscal_quarter"),col("fiscal_year"))company_model_vals_df.foreachPartition(partition => { writeAsParquet(partition) })如何使用foreachPartition编写此代码? - BdEngineer

25

foreach自动在多个节点上运行循环。

然而,有时候你希望对每个节点执行某些操作。例如,连接到数据库。你不能只是建立一个连接并将其传递给foreach函数:连接仅在一个节点上建立。

因此,使用foreachPartition可以在运行循环之前在每个节点上连接到数据库。


10
这仍然不是按节点计算,而是按分区计算。分区数量可能比节点数量多得多。如果你需要每个节点(更可能在YARN术语中是每个JVM或容器)建立一个连接,你需要其他解决方案。 - user2456600
@user2456600,你有没有想过如何在每个执行器中只有一个JVM类? - donald
如果使用Scala,一种选择是在对象或类中使用lazy val,在第一次引用时将其初始化到JVM中。但这也有缺点,如果您每个执行器使用多个线程,则必须小心指向的对象是否线程安全。此外,很难将运行时初始化参数(如配置)传递给初始化。 - user2456600

19

foreachforeachPartition之间实际上没有太大的区别。在内部,foreach所做的一切只是使用提供的函数调用迭代器的foreachforeachPartition只是让您有机会在迭代器的循环外执行一些操作,通常是一些昂贵的操作,比如启动数据库连接或类似的操作。因此,如果您没有任何可以针对每个节点的迭代器执行一次并在整个过程中重复使用的操作,则建议使用foreach以提高清晰度并减少复杂性。


4

foreachPartition并不意味着它是每个节点的活动,而是针对每个分区执行的,如果您的分区数量较多,而节点数量较少,则可能会降低性能。如果您打算在节点级别进行活动,则这里解决方案可能会有所帮助,尽管我没有测试过。


我曾使用类似的代码通过foreachPartition向Oracle插入数据,但性能非常慢。 - Sandeep Shetty

4

foreachPartition 只在你按分区聚合数据时迭代数据时有用。

一个很好的例子是处理每个用户的点击流。当您完成一个用户的事件流时,您需要清除计算缓存,但要在同一用户的记录之间保留它,以便计算一些用户行为洞察力。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接