如何在Spark中使用稀疏矩阵训练随机森林?

5
考虑这个使用 sparklyr 的简单例子:
library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

数据框的大小相对较小(约 70k 行和 14k 个唯一单词)。

现在,在我的集群上训练一个 朴素贝叶斯 模型只需要几秒钟。 首先,我定义了 管道

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

然后训练朴素贝叶斯模型

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

现在问题是,在同一个(实际上非常小的!!)数据集上尝试运行任何基于tree的模型(如随机森林提升树等)都无法正常工作。

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

错误:org.apache.spark.SparkException: 由于阶段失败而中止作业:第69.0阶段中的任务0已失败4次,最近一次失败: 在第69.0阶段中丢失了任务0.3(TID 1580,1.1.1.1.1执行器5): java.lang.IllegalArgumentException:大小超过了Integer.MAX_VALUE

我认为这是由于标记的矩阵表示稀疏性导致的,但是有什么可以做的吗?这是sparklyr问题吗?还是spark问题?我的代码非高效吗?

谢谢!

2个回答

5
您收到此错误是因为您实际上正在遇到Spark中著名的2G限制,详情请参见https://issues.apache.org/jira/browse/SPARK-6235
解决方法是在将数据提供给算法之前重新分区数据。
这篇文章中有两个要点:
1.使用本地数据。
2.Spark中的基于树的模型需要大量内存。
因此,让我们检查一下您的代码,尽管它看起来无害。
 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

那么最后一行代码是做什么的呢?

copy_to(不适用于大型数据集)实际上只是将本地 R 数据框复制到一个分区的 Spark DataFrame。

因此,在将数据送入 gbt 之前,您只需要重新分区数据,以确保管道准备数据时分区大小小于2GB。

因此,您只需执行以下操作即可重新分区数据:

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1:max_memory_in_mb是您为gbt计算其统计数据提供的内存量。它与输入数据量没有直接关系。

PS2:如果您没有为执行程序设置足够的内存,可能会遇到java.lang.OutOfMemoryError:GC overhead limit exceeded

编辑:重新分区数据的含义是什么?

在讨论重新分区之前,我们可以始终参考“分区”的定义。我会尽量简短。

一个分区是大型分布式数据集的逻辑块。

Spark使用分区来管理数据,从而帮助并行化分布式数据处理并使发送数据到执行程序的网络流量最小化。 默认情况下,Spark尝试从靠近它的节点读取数据到RDD中。 由于Spark通常访问分布式分区数据,因此创建分区以保存数据块以优化转换操作。

增加分区数量将使每个分区拥有更少的数据(或者根本没有数据!)

来源:@JacekLaskowski摘自Mastering Apache Spark book

但是数据分区并不总是正确的,就像在这种情况下一样。因此需要重新分区(sparklyrsdf_repartition)。

sdf_repartition将使您的数据在节点之间散布和洗牌。即sdf_repartition(20)将创建20个分区的数据,而不是最初在此情况下只有1个的数据。

希望这可以帮助到您。

整个代码:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for 
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps =TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') 

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7> 
#   Stages 
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b> 
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489> 
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list> 
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213> 
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ... 
# |      num_classes:  int 2 
# |      num_features:  int 39158 
# |      total_num_nodes:  int 540 
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ... 
# |      trees: <list> 

数据在磁盘上不到6MB,在内存中为4.1MB。这距离2GB的限制非常遥远(即使考虑到Spark的效率不足)。只是说一下... :) - zero323
我同意,但出于某种原因,在一个分区中有250k个词汇会使日志文件达到2005mb的大小... - eliasah
我认为这并不是真正的核心问题。看看特征提取流程,思考决策树将如何派生。 - zero323
1
@ℕʘʘḆḽḘ 或许可以,但这是一个比较长的答案,不太适合在SO上回答。稍加调整后,我可以将模型放在单个虚拟核心/1GB内存上,尽管它花费了很长时间(大约8小时左右,对于5MB的数据来说并不令人满意)。如果没有人提供完全令人满意的答案,我会在本周晚些时候重新审视这个问题,并提供一些建议。 - zero323
1
@ℕʘʘḆḽḘ 你的集群设置怎么样?(内存、资源管理器等) - eliasah
显示剩余15条评论

0

请问您能提供完整的错误回溯吗?

我猜测您可能是内存不足了。随机森林和GBT树是集成模型,因此需要比朴素贝叶斯更多的内存和计算能力。

尝试重新分区数据(spark.sparkContext.defaultParallelism值是一个好的起点),以便每个工作节点获得更小、更均匀分布的数据块。

如果这样还不行,请尝试将max_memory_in_mb参数减少到256


不过等等,我有一个庞大的集群,而这个数据集相对较小。我不认为这可能是由于内存错误引起的。您能否请在您的环境中尝试一下,并告诉我您遇到了什么错误?为什么减少 max_memory_in_mb 对这里有用呢?谢谢。 - ℕʘʘḆḽḘ

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接