使用text8文件的Spark Word2Vec示例

4

我正在尝试运行apache.spark.org上的此示例(代码如下,整个教程在此处:https://spark.apache.org/docs/latest/mllib-feature-extraction.html),使用他们网站上引用的text8文件(http://mattmahoney.net/dc/text8.zip):

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/Users/rkita/Documents/Learning/random/spark/MLlib/examples/text8",4).map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")

我正在我的Mac上使用Spark(2核,8GB内存),我认为我已经在spark-env.sh文件中正确设置了内存分配,如下所示:

export SPARK_EXECUTOR_MEMORY=4g
export SPARK_WORKER_MEMORY=4g

当我尝试拟合模型时,我一直收到Java堆错误。我在Python中也得到了相同的结果。我使用JAVA_OPTS增加了Java内存大小。
文件只有100MB,所以我认为我的内存设置可能不正确,但我不确定这是否是根本原因。
有人在笔记本电脑上尝试过这个例子吗?
我不能将文件放在我们公司的服务器上,因为我们不应该导入外部数据,所以我只能在个人笔记本电脑上工作。如果您有任何建议,我会感激听到。谢谢!

1
这个文件的问题在于它只有一行。这意味着你试图将整行内容放入一个单一的数据字段中。 - eliasah
这不是将其标记化吗?.map(line => line.split(" ").toSeq) - anonygrits
没有令牌化的意义。也许在句点处分割更具表现力。 - eliasah
我也遇到了同样的问题。我尝试将该文件拆分为行,但是仍然出现相同的错误。我正在使用Spark 1.4.1。 - alhuelamo
2个回答

2
首先,我是一个Spark的新手,所以其他人可能有更快或更好的解决方案。我遇到了同样的困难来运行这个示例代码。我主要通过以下方式使其工作:
  1. 在我的机器上运行自己的Spark集群:使用您的Spark安装目录中/sbin/目录中的启动脚本。为此,您必须根据自己的需要配置conf/spark-env.sh文件。不要使用127.0.0.1 IP地址来连接Spark。
  2. 编译和打包Scala代码成jar包(sbt package),然后将其提供给集群(请参见Scala代码中的addJar(...))。似乎可以使用类路径/额外类路径向Spark提供编译后的代码,但我还没有尝试过。
  3. 设置执行程序内存和驱动程序内存(请参见Scala代码)

spark-env.sh:

export SPARK_MASTER_IP=192.168.1.53
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080

export SPARK_DAEMON_MEMORY=1G
# Worker : 1 by server
# Number of worker instances to run on each machine (default: 1). 
# You can make this more than 1 if you have have very large machines and would like multiple Spark worker processes. 
# If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, 
# or else each worker will try to use all the cores.
export SPARK_WORKER_INSTANCES=2
# Total number of cores to allow Spark applications to use on the machine (default: all available cores).
export SPARK_WORKER_CORES=7

#Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g 
# (default: total memory minus 1 GB); 
# note that each application's individual memory is configured using its spark.executor.memory property.
export SPARK_WORKER_MEMORY=8G
export SPARK_WORKER_DIR=/tmp

# Executor : 1 by application run on the server
# export SPARK_EXECUTOR_INSTANCES=4
# export SPARK_EXECUTOR_MEMORY=4G

export SPARK_SCALA_VERSION="2.10"

运行示例所需的Scala文件:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

object SparkDemo {

  def log[A](key:String)(job : =>A) = {
    val start = System.currentTimeMillis
    val output = job
    println("===> %s in %s seconds"
      .format(key, (System.currentTimeMillis - start) / 1000.0))
    output
  }

  def main(args: Array[String]):Unit ={

    val modelName ="w2vModel"

    val sc = new SparkContext(
      new SparkConf()
      .setAppName("SparkDemo")
      .set("spark.executor.memory", "8G")
      .set("spark.driver.maxResultSize", "16G")
      .setMaster("spark://192.168.1.53:7077") // ip of the spark master.
      // .setMaster("local[2]") // does not work... workers loose contact with the master after 120s
    )

    // take a look into target folder if you are unsure how the jar is named
    // onliner to compile / run : sbt package && sbt run
    sc.addJar("./target/scala-2.10/sparkling_2.10-0.1.jar")

    val input = sc.textFile("./text8").map(line => line.split(" ").toSeq)

    val word2vec = new Word2Vec()

    val model = log("compute model") { word2vec.fit(input) }
    log ("save model") { model.save(sc, modelName) }

    val synonyms = model.findSynonyms("china", 40)
    for((synonym, cosineSimilarity) <- synonyms) {
      println(s"$synonym $cosineSimilarity")
    }

    val model2 = log("reload model") { Word2VecModel.load(sc, modelName) }
  }
}

0

sc.textFile 只会按照换行符进行分割,而 text8 文件中并没有换行符。

你正在创建一个只有一行的 RDD。 .map(line => line.split(" ").toSeq) 会创建另一个类型为 RDD[Seq[String]] 的只有一行的 RDD。

Word2Vec 最好每行只处理一个句子(这样也可以避免 Java 堆错误)。不幸的是,text8 中已经去掉了句号,因此不能直接按照句号进行分割。但是你可以在 这里 找到原始版本以及用于处理它的 Perl 脚本,编辑脚本以保留句号并不难。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接