并行计算,在dplyr中有哪些可以替代tidyr::complete的方法?

12

我正在尝试并行化一个管道。

管道中有一个tidyr命令(“tidyr::complete”)。当在并行环境下运行时,该命令会导致代码出现错误,因为对象类别无法被识别。

dplyr是否有完整的替代方案?

library(dplyr)
library(tidyr)
library(zoo)


test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
               var_1=c(1,1,1,1,1,1,2,2,2), 
               var_2=c(1,1,1,1,1,2,3,3,3), 
               var_3=c(0,5,NA,15,20,NA,1,NA,NA))

max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)

串行


test_serial <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1, year = seq(min_year,max_year)) %>%
  mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))


并行(此处失败)

devtools::install_github("hadley/multidplyr")
library(multidplyr)

cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))

test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>% 
  dplyr::group_by(var_1,var_2) %>% 
  tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>% 
  collect()

这是错误信息。

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
1个回答

11
Multidplyr使您能够:
  1. 使用partition()将数据分割
  2. 在专用节点上处理每个分区
  3. collect()结果
并非所有数据处理任务都适用于先前的工作流程。 特别是,complete需要知道输入数据中的所有可能值,以便创建缺失的行,这意味着无法将此操作整体拆分,因此没有可用的方法。 在您提供的示例中,每个节点将只接收一个var_1, var_2对,而不知道其他节点得到的内容,这不允许以并行方式实现预期的结果。 但是,由于您已经知道year=seq(min_year,max_year),因此您可以仅针对此变量并行执行complete任务,通过var_1拆分任务,例如使用furrr包:
library(furrr)
plan(multiprocess)
test_parallel <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1) %>% split(.$var_1) %>% 
  furrr::future_map(~{
    complete(.x, year = seq(min_year,max_year)) %>%
    dplyr::mutate(
        var_3 = na.approx(var_3,na.rm = FALSE),
        var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) 
    }) %>% bind_rows()

> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+           c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE

需要在更大的数据集上进行测试以衡量可能的性能改进。


但是,如果我能够为每个节点分配一个完整的组(例如var_1),那么我就可以使用complete了吗? - MCS
我收到了以下警告:#警告消息:[一次性警告]从RStudio运行R时,未来(> = 1.13.0)禁用了分叉处理(“multicore”),因为它被认为是不稳定的。因此,plan(“multicore”)将退回到plan(“sequential”),而plan(“multiprocess”)将退回到plan(“multisession”)-而不是像过去一样计划(“multicore”)。有关如何控制分叉处理或不控制以及如何在将来的R会话中消除此警告的更多详细信息,请参见?future :: supportsMulticore。 - MCS
作为证明,system.time() 显示 furrr 解决方案比标准解决方案慢了 4 倍。 - MCS
4
在小例子数据集上,多任务处理通常会更慢,因为你需要打开任务、传输数据并收集结果,这对于15行数据来说比直接处理数据要花费更多的时间。参见我最后一句话的内容;-) - Waldi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接