NullPointerException when implementing Naive Bayes as a Spark job

I am trying to implement Naive Bayes on Spark, but it does not work. The problem may be caused by this line in the temp function: each_prob.map(_.take(1)). If you have any idea about this problem, please help me...
This is my main function:
object DoNaive extends App{

val file_pathes = 
    Vector( ("plus","resource/doc1.txt"),
            ("plus","resource/doc2.txt"),
            ("plus","resource/doc3.txt"),
            ("minus","resource/doc4.txt"),
            ("minus","resource/doc5.txt"),
            ("minus","resource/doc6.txt")
          )

val pn = ParallelNaive(file_pathes)     

val cached_rdd = read.rdds("resource/examine.txt")  

val each_prob : Vector[RDD[String]] =
    pn.allClassNames.map{   
        class_name =>   
                cached_rdd
                    .map { elt => ( pn.eachProbWord(elt._1 , class_name ) * elt._2 ).toString }
        }

val head_prob = each_prob.head

println(pn.docs.map(elt => elt._2.take(1).head))

pn.temp("resource/examine.txt")
}

This is the ParallelNaive class. The temp function exists only to locate the problem:

case class ParallelNaive( file_pathes : Vector[(String,String)] ) extends Serializable {

val docs:Vector[(String ,RDD[(String,Int)])] = file_pathes.map( class_path => ( class_path._1 , read.rdds(class_path._2,false) ) )

val wholeClass :Map[String,Vector[(String,RDD[(String,Int)])]] = docs.groupBy(elt=>elt._1)

val allClassNames:Vector[String] = wholeClass.map(elt=>elt._1).toVector

val eachNumDocs:Map[String,Int] = wholeClass.map(elt=>(elt._1,elt._2.length))

val sumNumDocs:Int = docs.size

def eachNumWord(word:String , class_name:String ):Int = {


    var doc_count = 0

    wholeClass(class_name).foreach{
        class_rdd =>  // == (class,rdd)
            val filtered = class_rdd._2.filter{word_occur=> word_occur._1==word}.take(1)
            if(filtered.size!=0) doc_count += 1
    }

    doc_count
}

def eachProbWord(word:String , class_name:String , alpha:Int = 2):Double={

    val Nwc = eachNumWord(word , class_name).toDouble
    val Nc = eachNumDocs(class_name).toDouble

    log( ( Nwc+(alpha-1) ) / ( Nc + 2*(alpha-1) ) )
}

def eachProbClass(class_name:String):Double={

    val Nc = eachNumDocs(class_name).toDouble

    log( ( Nc+1 ) / ( sumNumDocs + NumClass ) )

}

val NumClass = wholeClass.size

def temp(doc_path:String) ={

    val cached_rdd = read.rdds(doc_path)

    val each_prob : Vector[RDD[Double]] =
        allClassNames.map{
            class_name =>   
                    cached_rdd
                        .map { elt => eachProbWord(elt._1 , class_name ) * elt._2 }
        }

    each_prob.map(_.take(1))

}


def classify(doc_path:String , alpha:Int = 2 ) = {

    val cached_rdd = read.rdds(doc_path)    // cache it, because it is used many times

    val ProbPerClass = 
        allClassNames.map{
            class_name =>                   

                val each_prob = 
                    cached_rdd.map { elt => eachProbWord(elt._1 , class_name , alpha) * elt._2 }

                val sum_prob : Double = each_prob.reduce{ (a,b) => a+b } 

                sum_prob + eachProbClass(class_name)
            }   
    // list of probabilities that this document would belong to each class
    println(" max_class---------------------------------------------")

    println("ProbPerClass : "+ProbPerClass)

    val max_class : (Double,Int) = ProbPerClass.zipWithIndex.max
    // ( probability , index of the class )
    println(" return estimation class---------------------------------------------")

    allClassNames(max_class._2)

}

}

I got an error like this:

14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Starting task 16.0:0 as TID 15 on executor localhost: localhost (PROCESS_LOCAL)
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Serialized task 16.0:0 as 2941 bytes in 0 ms
14/04/06 13:55:50 INFO executor.Executor: Running task ID 15
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_7 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_1 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_3 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_4 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_5 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block rdd_57_0 locally
eachProbWord---------------------------------------------
eachNumWord---------------------------------------------
14/04/06 13:55:50 ERROR executor.Executor: Exception in task ID 15
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Lost TID 15 (task 16.0:0)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 ERROR scheduler.TaskSetManager: Task 16.0:0 failed 1 times; aborting job
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 16.0 from pool 
14/04/06 13:55:50 INFO scheduler.DAGScheduler: Failed to run take at ParallelNaive.scala:89
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/04/06 13:55:50 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 47 s, completed 2014/04/06 13:55:50

Not sure, but could it be that a file cannot be found? What is the code on line 37? - om-nom-nom
Please post the source of RDD.scala around line 261. After all, there are countless different versions of Spark; in the version I use, line 261 is inside a comment ;) - WestCoastProjects
1 Answer


Well, the code is poorly formatted, and there is a lot of it for something that could really be done in a few lines, so it is a bit hard to tell what is going on. But I think we can narrow the problem down to this line:

val filtered = class_rdd._2.filter{word_occur=> word_occur._1==word}.take(1)

It would be more readable written like this:

val filtered = classRdd._2.filter(_._1 == word).take(1)

In any case, you are getting an NPE, so your RDD class_rdd contains null tuples. This is not a problem with Spark; it is a problem in your code.
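As a minimal sketch of what such a guard could look like (not part of the original answer; it assumes classRdd has the type (String, RDD[(String, Int)]) used for docs above and that, per the diagnosis, the pair RDD may carry null entries):

// skip null tuples before comparing the word, so filter never dereferences null
val filtered = classRdd._2
    .filter(pair => pair != null && pair._1 == word)
    .take(1)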
