How to build a logistic regression model in SparkR


I am a newbie and not very familiar with either Spark or SparkR. I have successfully installed both Spark and SparkR.

When I try to build a logistic regression model with R and Spark on a csv file stored in HDFS, I get an "incorrect number of dimensions" error.

My code is as follows:

points <- cache(lapplyPartition(textFile(sc, "hdfs://localhost:54310/Henry/data.csv"), readPartition))

collect(points)

w <- runif(n=D, min = -1, max = 1)

cat("Initial w: ", w, "\n")

# Compute logistic regression gradient for a matrix of data points
gradient <- function(partition) {
  partition = partition[[1]]
  Y <- partition[, 1] # point labels (first column of input file)
  X <- partition[, -1] # point coordinates
  # For each point (x, y), compute gradient function

  dot <- X %*% w
  logit <- 1 / (1 + exp(-Y * dot))
  grad <- t(X) %*% ((logit - 1) * Y)
  list(grad)
}


for (i in 1:iterations) {
  cat("On iteration ", i, "\n")
  w <- w - reduce(lapplyPartition(points, gradient), "+")
}

The error message is:
On iteration  1 
Error in partition[, 1] : incorrect number of dimensions
Calls: do.call ... func -> FUN -> FUN -> Reduce -> <Anonymous> -> FUN -> FUN
Execution halted
14/09/27 01:38:13 ERROR Executor: Exception in task 0.0 in stage 181.0 (TID 189)
java.lang.NullPointerException
    at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:701)
14/09/27 01:38:13 WARN TaskSetManager: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: 
        edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:701)
14/09/27 01:38:13 ERROR TaskSetManager: Task 0 in stage 181.0 failed 1 times; aborting job
Error in .jcall(getJRDD(rdd), "Ljava/util/List;", "collect") : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 181.0 failed 1 times, most recent failure: Lost task 0.0 in stage 181.0 (TID 189, localhost): java.lang.NullPointerException: edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:125) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:701) Driver stacktrace:

Dimensions of the data (sample):

data <- read.csv("/home/Henry/data.csv")

dim(data)

[1] 17 541

What could be causing this error?


I think you forgot to show the result of dim(data). - x4nd3r
@voidHead, I have added the output of dim(data). - Hanry
1 Answer

The problem is that textFile() reads text data and returns a distributed collection of strings, each of which corresponds to one line of the text file. That is why partition[, -1] fails later in the program. The real intent of the program seems to be to treat points as a distributed collection of data frames. We are working on providing data frame support in SparkR (SPARKR-1).
To work around the issue, simply manipulate your partition with string operations to extract X and Y correctly. Some other approaches include (I think you have probably seen this before) producing a different type of distributed collection from the start, as is done here: examples/logistic_regression.R
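For illustration only, here is a minimal sketch of that string-parsing workaround under the old SparkR RDD API shown in the question, assuming the csv file has no header row, all fields are numeric, and the label is in the first column (none of which is confirmed by the question):

# Hypothetical readPartition: each partition arrives as a list of csv lines
# (plain strings); split them on commas and bind them into one numeric matrix
# so that partition[, 1] and partition[, -1] in gradient() behave as intended.
readPartition <- function(part) {
  rows <- lapply(part, function(line) as.numeric(strsplit(line, ",")[[1]]))
  mat <- do.call(rbind, rows)   # one row per data point: label, then features
  list(mat)                     # wrap in a list, matching partition[[1]] in gradient()
}

With partitions shaped like this, D corresponds to ncol(mat) - 1 and the rest of the posted loop should run unchanged.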

In the meantime, reduce and lapplyPartition will be removed from the interface (https://issues.apache.org/jira/browse/SPARK-7230), so this program will stop working entirely once DataFrames become available. - piccolbo
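Not part of the original answer, but for completeness: once DataFrame support landed, SparkR exposed logistic regression directly through spark.glm. A rough sketch, assuming SparkR 2.x and a csv file with a header whose label column is named "label" (both assumptions about the data):

library(SparkR)
sparkR.session()                                        # start a SparkR session
df <- read.df("hdfs://localhost:54310/Henry/data.csv",
              source = "csv", header = "true", inferSchema = "true")
model <- spark.glm(df, label ~ ., family = "binomial")  # binomial family = logistic regression
summary(model)                                          # inspect fitted coefficients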
