How do I run a simple Spark application from the Eclipse/IntelliJ IDE?


Before actually deploying my MapReduce jobs to Hadoop, I test them during development with a simple MapReducer that I wrote myself:

// Eclipse Scala worksheet that simulates a word-count style map/reduce entirely in memory
object mapreduce {
  import scala.collection.JavaConversions._

  val intermediate = new java.util.HashMap[String, java.util.List[Int]]
                                                  //> intermediate  : java.util.HashMap[String,java.util.List[Int]] = {}
  val result = new java.util.ArrayList[Int]       //> result  : java.util.ArrayList[Int] = []

  def emitIntermediate(key: String, value: Int) {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList)
    }
    intermediate.get(key).add(value)
  }                                               //> emitIntermediate: (key: String, value: Int)Unit

  def emit(value: Int) {
    println("value is " + value)
    result.add(value)
  }                                               //> emit: (value: Int)Unit

  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit) {

    for (line <- data) {
      mapper(line)
    }

    for (keyVal <- intermediate) {
      reducer(keyVal._1, intermediate.get(keyVal._1))
    }

    for (item <- result) {
      println(item)
    }
  }                                               //> execute: (data: java.util.List[String], mapper: String => Unit, reducer: (St
                                                  //| ring, java.util.List[Int]) => Unit)Unit

  def mapper(record: String) {
    var jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    var key = jsonAttributes.get(0)
    var value = jsonAttributes.get(1)

    println("key is " + key)
    var delims = "[ ]+";
    var words = value.split(delims);
    for (w <- words) {
      emitIntermediate(w, 1)
    }
  }                                               //> mapper: (record: String)Unit

  def reducer(key: String, listOfValues: java.util.List[Int]) = {
    var total = 0
    for (value <- listOfValues) {
      total += value;
    }

    emit(total)
  }                                               //> reducer: (key: String, listOfValues: java.util.List[Int])Unit
  var dataToProcess = new java.util.ArrayList[String]
                                                  //> dataToProcess  : java.util.ArrayList[String] = []
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
                                                  //> res0: Boolean = true
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")
                                                  //> res1: Boolean = true

  execute(dataToProcess, mapper, reducer)         //> jsonAttributes are [test1, test1 here is another test1 test1 ]
                                                  //| key is test1
                                                  //| jsonAttributes are [test2, test2 here is another test2 test1 ]
                                                  //| key is test2
                                                  //| value is 2
                                                  //| value is 2
                                                  //| value is 4
                                                  //| value is 2
                                                  //| value is 2
                                                  //| 2
                                                  //| 2
                                                  //| 4
                                                  //| 2
                                                  //| 2


  for (keyValue <- intermediate) {
      println(keyValue._1 + "->"+keyValue._2.size)//> another->2
                                                  //| is->2
                                                  //| test1->4
                                                  //| here->2
                                                  //| test2->2
   }


}

This lets me run my mapreduce jobs in the Eclipse IDE on Windows before deploying them to a real Hadoop cluster. I would like to do something similar to test Spark code, that is, to be able to write Spark code in Eclipse, test it there, and then deploy it to a Spark cluster. Does Spark support this? Since Spark runs on top of Hadoop, does that mean I have to install Hadoop first in order to run Spark? In other words, can I run code using just the Spark libraries?

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

If so, which Spark libraries do I need to include in my project?

Example taken from: https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala


spark.apache.org points to spark-core_2.10, version 0.9.0-incubating. I'd suggest starting with that and its dependencies. You can find it here: http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/0.9.0-incubating, or here: http://search.maven.org/#browse|-183575761. If you didn't create your Eclipse project with a dependency-management plugin, you'll have to download the dependencies yourself. One of them appears to be hadoop-client. - n0741337
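For reference, the same artifact can also be pulled in through sbt instead of downloading jars by hand; a one-line sketch, with the explicit Scala-version suffix mirroring the Maven artifact name mentioned in the comment above:

// build.sbt dependency matching the Maven coordinate above
// (explicit _2.10 suffix instead of sbt's %% shorthand)
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "0.9.0-incubating"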
1 Answer

Add the following to your build.sbt file: libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1", and make sure scalaVersion is set (e.g. scalaVersion := "2.10.3").
If you are only running the program locally, you can skip the last two arguments to SparkContext, like this: val sc = new SparkContext("local", "Simple App"). Finally, Spark can run on top of Hadoop, but it can also run in standalone mode. See: https://spark.apache.org/docs/0.9.1/spark-standalone.html
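Putting the answer together, here is a minimal sketch of what the project could look like, assuming a standard sbt layout; the project name and the README.md path are placeholders, not something from the original post:

// build.sbt (minimal sketch using the versions mentioned above)
name := "simple-project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"

// src/main/scala/SimpleApp.scala, trimmed for a purely local run:
// with "local" as the master, no Hadoop installation or cluster is needed,
// and the jar list from the original example can be dropped.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "README.md" // any local text file will do
    val sc = new SparkContext("local", "Simple App")
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}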

You will also need to regenerate the Eclipse project with sbteclipse, and possibly refresh the project in Eclipse. - Iulian Dragos
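A sketch of how that is typically wired up; the plugin version below is an assumption, use whichever release is current:

// project/plugins.sbt (version here is an assumption)
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")

// Then regenerate the Eclipse project files from the sbt shell:
//   sbt eclipse
// and refresh the project inside Eclipse afterwards.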
Does this work out of the box? Don't you need to package a jar and submit it with spark-submit? - Neil
