我不确定我们是否可以在Spark中测试RDD。
我看到一篇文章说模拟RDD并不是一个好主意。那么,有没有其他方法或最佳实践来测试RDD?
我不确定我们是否可以在Spark中测试RDD。
我看到一篇文章说模拟RDD并不是一个好主意。那么,有没有其他方法或最佳实践来测试RDD?
RDD
单元测试是不可能的。但是,在开发分析时,仍然有单元测试的位置。rdd.map(foo).map(bar)
foo
和bar
是简单的函数。它们可以像普通方式一样进行单元测试,并且应该使用尽可能多的边角案例进行测试。毕竟,它们不关心输入来自哪里,无论是测试装置还是RDD
。
不要忘记Spark Shell
这并不是测试“本身”,但在这些早期阶段,您还应该在Spark shell中进行实验,以了解您的转换和特别是方法的后果。例如,您可以使用许多不同的函数(如toDebugString
、explain
、glom
、show
、printSchema
等)检查物理和逻辑查询计划、分区策略和保留以及数据状态。我会让您自行探索这些内容。local[2]
,以确定仅在开始分发工作后才可能出现的任何问题。
使用Spark进行集成测试
现在进入有趣的部分。class MySpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (sparkContext) =>
val data = Seq(...)
val rdd = sparkContext.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkContext) => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkContext = new SparkContext(conf)
try {
testMethod(sparkContext)
}
finally sparkContext.stop()
}
}
SparkContext
“借”给测试,然后在完成后处理它。HDFSClusterLike
和YARNClusterLike
)混合使用。将这些特性混合在一起可以真正节省大量设置痛苦。另一个它发挥作用的地方是使用类似于Scalacheck的属性和生成器。但同样,我个人会等到我的分析和测试达到那种复杂程度之前再使用它。
使用Spark Streaming进行集成测试
最后,我想展示一下使用内存值进行SparkStreaming集成测试设置的代码片段:val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
DStream
使用。大部分都是与 Spark API 一起使用的样板设置。有两种测试Spark RDD / 应用程序的方法。它们如下:
例如:
需要测试的单元:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
class WordCount {
def get(url: String, sc: SparkContext): RDD[(String, Int)] = {
val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
}
}
现在测试它的方法1如下:
import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
class WordCountTest extends FunSuite with BeforeAndAfterAll {
private var sparkConf: SparkConf = _
private var sc: SparkContext = _
override def beforeAll() {
sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local")
sc = new SparkContext(sparkConf)
}
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
override def afterAll() {
sc.stop()
}
}
WordCount
类的行为。但是在这里,我们必须自己管理SparkContext的创建和销毁。因此,如果您不想为此编写额外的代码,则可以使用spark-testing-base,如下所示:
方法2:
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
class WordCountTest extends FunSuite with SharedSparkContext {
private val wordCount = new WordCount
test("get word count rdd") {
val result = wordCount.get("file.txt", sc)
assert(result.take(10).length === 10)
}
}
或者
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
import com.holdenkarau.spark.testing.RDDComparisons
class WordCountTest extends FunSuite with SharedSparkContext with RDDComparisons {
private val wordCount = new WordCount
test("get word count rdd with comparison") {
val expected = sc.textFile("file.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
val result = wordCount.get("file.txt", sc)
assert(compareRDD(expected, result).isEmpty)
}
}
了解有关Spark RDD测试的更多详细信息,请参阅此处 - KnolX: Spark应用程序的单元测试