如何测试Spark RDD

6

我不确定我们是否可以在Spark中测试RDD。

我看到一篇文章说模拟RDD并不是一个好主意。那么,有没有其他方法或最佳实践来测试RDD?


2
你看过Holden的spark-test-base了吗? - Pushkr
2个回答

7
感谢您提出这个优秀的问题。不知何故,当涉及到Spark时,每个人都会陷入分析中,忘记了在过去15年左右出现的优秀软件工程实践。这就是为什么我们在课程中强调测试和持续集成(以及其他像DevOps一样的事情)的原因。
在我继续之前,我必须对@himanshuIIITian引用的KnolX演示中的术语表达一些不同意见。真正的单元测试意味着您完全控制测试中的每个组件。不能与数据库、REST调用、文件系统甚至系统时钟交互;如Gerard Mezaros在{{link1:xUnit Test Patterns}}中所说,一切都必须被“加倍”(例如模拟、存根等)。我知道这似乎是语义问题,但它确实很重要。不理解这一点是连续集成中看到间歇性测试失败的一个主要原因。
我们仍然可以进行单元测试。
因此,对于 RDD 单元测试是不可能的。但是,在开发分析时,仍然有单元测试的位置。
(注意:我将使用Scala作为示例,但概念超越语言和框架。)
考虑一个简单的操作:
rdd.map(foo).map(bar)

在这里,foobar是简单的函数。它们可以像普通方式一样进行单元测试,并且应该使用尽可能多的边角案例进行测试。毕竟,它们不关心输入来自哪里,无论是测试装置还是RDD不要忘记Spark Shell 这并不是测试“本身”,但在这些早期阶段,您还应该在Spark shell中进行实验,以了解您的转换和特别是方法的后果。例如,您可以使用许多不同的函数(如toDebugStringexplainglomshowprintSchema等)检查物理和逻辑查询计划、分区策略和保留以及数据状态。我会让您自行探索这些内容。
您还可以将Spark shell和测试中的主节点设置为local[2],以确定仅在开始分发工作后才可能出现的任何问题。 使用Spark进行集成测试 现在进入有趣的部分。
为了在您对辅助函数和RDD/DataFrame转换逻辑的质量感到自信之后,集成测试Spark是至关重要的。无论使用何种构建工具和测试框架,必须执行以下几个步骤:
  • 增加JVM内存。
  • 启用分叉但禁用并行执行。
  • 使用测试框架将您的Spark集成测试累积到套件中,并在所有测试之前初始化SparkContext,在所有测试之后停止它。
有几种方法可以做到最后一点。其中一种方法来自spark-testing-base,由@Pushkr和@himanshuIIITian链接的KnolX演示文稿都引用了它。 借贷模式 另一种方法是使用借贷模式
例如(使用ScalaTest):
class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

正如您所看到的,借贷模式利用高阶函数将SparkContext“借”给测试,然后在完成后处理它。
痛苦导向编程(感谢Nathan)
这完全是个人喜好,但我更喜欢使用借贷模式并尽可能自己连接一切,而不是引入另一个框架。除了试图保持轻量级之外,框架有时会添加很多“魔法”,使得调试测试失败难以理解。因此,我采取痛苦导向编程的方法——在没有它的痛苦变得难以忍受之前避免添加新框架。但是,这取决于您自己。
现在,spark-testing-base真正发挥作用的地方是与基于Hadoop的助手(如HDFSClusterLikeYARNClusterLike)混合使用。将这些特性混合在一起可以真正节省大量设置痛苦。另一个它发挥作用的地方是使用类似于Scalacheck的属性和生成器。但同样,我个人会等到我的分析和测试达到那种复杂程度之前再使用它。 使用Spark Streaming进行集成测试 最后,我想展示一下使用内存值进行SparkStreaming集成测试设置的代码片段:
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来简单得多。实际上,它只是将一系列数据转换成队列以供 DStream 使用。大部分都是与 Spark API 一起使用的样板设置。
这可能是我写过最长的帖子,所以就写到这里吧。我希望其他人能提出其他想法,以帮助改进我们的分析质量,采用同样的敏捷软件工程实践,这些实践已经改进了所有其他应用程序开发。
还有,很抱歉打广告,您可以查看我们的课程 Analytics with Apache Spark,在那里我们讨论了很多这些想法和更多内容。我们希望很快会有一个在线版本。

2

有两种测试Spark RDD / 应用程序的方法。它们如下:

例如

需要测试的单元

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount { 
  def get(url: String, sc: SparkContext): RDD[(String, Int)] = { 
    val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) 
  } 
}

现在测试它的方法1如下:

import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll { 
  private var sparkConf: SparkConf = _ 
  private var sc: SparkContext = _ 

  override def beforeAll() { 
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
  } 

  private val wordCount = new WordCount 

  test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10)
   } 

  override def afterAll() { 
    sc.stop() 
  } 
}

在方法1中,我们并没有模拟RDD。我们只是检查我们的WordCount类的行为。但是在这里,我们必须自己管理SparkContext的创建和销毁。因此,如果您不想为此编写额外的代码,则可以使用spark-testing-base,如下所示: 方法2:
import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext { 
  private val wordCount = new WordCount 

  test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc)
    assert(result.take(10).length === 10) 
  } 
}

或者

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext with RDDComparisons { 
  private val wordCount = new WordCount 

  test("get word count rdd with comparison") { 
    val expected = sc.textFile("file.txt")
                     .flatMap(_.split(" "))
                     .map((_, 1))
                     .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc)

    assert(compareRDD(expected, result).isEmpty)
   } 
}

了解有关Spark RDD测试的更多详细信息,请参阅此处 - KnolX: Spark应用程序的单元测试


我的程序很小,所以我正在尝试使用第一种方法。但是,你(Himanshu)在方法1中展示的并不是比较RDD。你正在对RDD执行操作,然后尝试将其与整数值相等。我想要比较两个RDD…假设RDD[myClass] === RDD[myClass] - AJm
在比较RDD时,应使用“RDDComparisons”,该方法在第二种方法中提到。 - himanshuIIITian
但是这是通过使用由某人开发的自定义库完成的,该库仍在开发中,并不像Apache那样处于任何大型Umbrella之下。它可能也不准备投入生产。 - AJm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接