如何从Spark UI中检索指标,例如输出大小和写入记录数?

11

在任务或作业完成后,我如何在控制台(Spark Shell或Spark Submit Job)上收集这些指标。

我们正在使用Spark从Mysql加载数据到Cassandra,数据量非常大(例如:约200 GB和600M行)。当任务完成后,我们想要验证Spark究竟处理了多少行?我们可以从Spark UI中获取该数字,但是如何从Spark shell或spark-submit job中检索该数字(“输出记录已写入”)。

从Mysql加载到Cassandra的示例命令。

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
我想检索上述任务的所有Spark UI指标,主要是输出大小和写入记录数。

请帮忙。

感谢您的时间!


你的意思是在Spark UI上可以找到指标,但我用类似的代码(读取JDBC源)却没有在UI上找到指标? - Tom
它会显示在Spark的应用程序界面上,通常位于作业和阶段下。您可以查看统计信息、执行器信息以及单个任务信息,例如每个任务读取多少数据,每个任务写入了多少洗牌等。 - Ajay Guyyala
感谢@ajay-guyyala。我没有在用户界面上看到它的运行结果,我将调查发生了什么。 - Tom
这里是我找到的一些示例图片。它可能不会显示所有工作/阶段的指标。此外,这取决于我们使用的Spark版本。在我发布这篇文章时,我们正在使用Spark 1.5.x或1.6.x。https://www.google.com/url?sa=i&rct=j&q=&esrc=s&source=images&cd=&cad=rja&uact=8&ved=2ahUKEwjyiczO26fdAhVHCKwKHTgACQEQjRx6BAgBEAU&url=https%3A%2F%2Fcommunity.hortonworks.com%2Fquestions%2F67659%2Fwhat-are-the-important-metrics-to-notice-for-each.html&psig=AOvVaw1cfVaeYdVnmWpa7t2uOym8&ust=1536369025933899 - Ajay Guyyala
@AjayGuyyala,你能从Spark UI中获取这些数据吗?我也有同样的需求,需要从Spark UI中提取一些有用的数据到我的Java代码中。 - Akash Patel
1个回答

10

找到答案了。你可以通过使用SparkListener获取统计信息。

如果您的作业没有输入或输出指标,您可能会收到None.get异常,您可以通过提供if语句来安全地忽略它们。

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})
请看以下示例。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val conf = new SparkConf()
.set("spark.cassandra.connection.host", "...")
.set("spark.driver.allowMultipleContexts","true")
.set("spark.master","spark://....:7077")
.set("spark.driver.memory","1g")
.set("spark.executor.memory","10g")
.set("spark.shuffle.spill","true")
.set("spark.shuffle.memoryFraction","0.2")
.setAppName("CassandraTest")
sc.stop
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

var outputWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val metrics = taskEnd.taskMetrics
    if(metrics.inputMetrics != None){
      inputRecords += metrics.inputMetrics.get.recordsRead}
    if(metrics.outputMetrics != None){
      outputWritten += metrics.outputMetrics.get.recordsWritten }
  }
})

val bp = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "bucks_payments").option("partitionColumn","id").option("lowerBound","1").option("upperBound","14596").option("numPartitions","10").option("fetchSize","100000").option("user", "hadoop").option("password", "...").load()
bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))

println("outputWritten",outputWritten)

结果:

scala> println("outputWritten",outputWritten)
(outputWritten,16383)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接