高效合并大型数据表

17

我有两个相当大的 data.table 对象需要合并。

  • dt1 具有 5 列,500 000 000 个观测值。
  • dt2 具有 2 列,300 000 个观测值。

这两个对象都有名为 id 的相同 key

我想将来自 dt2 的信息进行 left_joindt1 中。

例如:

dt1  <- data.table(id = c(1, 2, 3, 4),
               x1 = c(12, 13, 14, 15),
               x2 = c(5, 6, 7, 8),
               x3 = c(33, 44, 55, 66),
               x4 = c(123, 123, 123, 123))

dt2 <- data.table(id = c(1, 2, 3, 4),
              x5 = c(555, 666, 777, 888))
setkey(dt1, id)
setkey(dt2, id)

dt2[dt1, on="id"] 

> dt2[dt1, on="id"]
   id  x5 x1 x2 x3  x4
1:  1 555 12  5 33 123
2:  2 666 13  6 44 123
3:  3 777 14  7 55 123
4:  4 888 15  8 66 123

然而,当合并我的原始数据时,R 无法再分配内存。但是,合并的输出适合内存。

获得这个大型合并的最有效方法(速度 vs. 内存限制)是什么?

我们应该采用拆分-应用-合并吗?

我们应该使用数据库库完成此操作吗?

您会如何高效地完成此操作?


你在完整数据集上遇到了哪些具体的内存问题?你认为合并后的表格能否适应你的RAM,而R只是在合并过程中使用了太多的内存? - Marius
1
@Marius:合并后的表格可以轻松地适应我的RAM。但在合并过程中,它无法再分配内存。 - wake_wake
2
可能重复:使用data.table进行左连接 - Jaap
1
并不是上述提到的重复问题,因为OP正在寻找在资源限制下操作具有大型数据集的左连接的方法。 - zonfl
2个回答

15

键赋值应该节省内存。

dt1[dt2, on = "id", x5 := x5]

我们应该使用一个数据库库来完成这个任务吗?

这可能是个不错的主意。如果设置和使用数据库对你来说很痛苦,可以尝试使用RSQLite包。它非常简单。


我的实验

简短概述:针对一个玩具示例,键入赋值比合并替换少使用55%的内存。

我编写了两个脚本,每个脚本都调用一个设置脚本dt-setup.R来创建dt1dt2。第一个脚本dt-merge.R通过“合并”方法更新dt1。第二个脚本dt-keyed-assign.R则使用键入赋值。两个脚本都使用Rprofmem()函数记录内存分配情况。

为了不让我的笔记本电脑受折磨,我设置了dt1为500,000行,dt2为3,000行。

脚本:

# dt-setup.R
library(data.table)

set.seed(9474)
id_space <- seq_len(3000)
dt1  <- data.table(
  id = sample(id_space, 500000, replace = TRUE),
  x1 = runif(500000),
  x2 = runif(500000),
  x3 = runif(500000),
  x4 = runif(500000)
)
dt2 <- data.table(
  id = id_space,
  x5 = 11 * id_space
)
setkey(dt1, id)
setkey(dt2, id)
# dt-merge.R
source("dt-setup.R")
Rprofmem(filename = "dt-merge.out")
dt1 <- dt2[dt1, on = "id"]
Rprofmem(NULL)
# dt-keyed-assign.R
source("dt-setup.R")
Rprofmem(filename = "dt-keyed-assign.out")
dt1[dt2, on = "id", x5 := x5]
Rprofmem(NULL)

我将三个脚本放在我的工作目录中,在单独的R进程中运行每个连接脚本。

system2("Rscript", "dt-merge.R")
system2("Rscript", "dt-keyed-assign.R")

我认为输出文件中的行通常遵循模式"<bytes> :<call stack>"。我没有找到关于此的良好文档。然而,在前面的数字从未低于128,并且这是R在向量不进行malloc的默认最小字节数以下的情况下的做法。

请注意,并非所有这些分配都会增加R使用的内存。 R可以在垃圾回收后重用一些已有的内存。因此,这不是衡量任何特定时间使用多少内存的好方法。但是,如果我们假设垃圾回收行为是独立的,则它确实可以作为脚本之间比较的一种方法。

内存报告的一些示例行:

cat(readLines("dt-merge.out", 5), sep = "\n")
# 90208 :"get" "[" 
# 528448 :"get" "[" 
# 528448 :"get" "[" 
# 1072 :"get" "[" 
# 20608 :"get" "["

还有一些像 new page:"get" "[" 这样的行用于页面分配。

幸运的是,这些很容易解析。

parse_memory_report <- function(path) {
  report <- readLines(path)
  new_pages <- startsWith(report, "new page:")
  allocations <- as.numeric(gsub(":.*", "", report[!new_pages]))
  total_malloced <- sum(as.numeric(allocations))
  message(
    "Summary of ", path, ":\n",
    sum(new_pages), " new pages allocated\n",
    sum(as.numeric(allocations)), " bytes malloced"
  )
}

parse_memory_report("dt-merge.out")
# Summary of dt-merge.out:
# 12 new pages allocated
# 32098912 bytes malloced

parse_memory_report("dt-keyed-assign.out")
# Summary of dt-keyed-assign.out:
# 13 new pages allocated
# 14284272 bytes malloced

重复实验时,我得到了完全相同的结果。

因此,键分配多了一个页面分配。页面的默认字节大小为2000。我不确定malloc是如何工作的,而且相对于所有分配来说,2000微不足道,所以我会忽略这种差异。如果这样做很蠢,请批评我。

因此,不考虑页面,键分配分配的内存比合并少了55%。


2
这真的很有趣。如果这解决了 OP 的问题,我会很乐意听到。因为我遇到过这个问题无数次,但从未想过设置键。 - zonfl
这个解决方案在数据的一个小子集上似乎运行良好,但在我的64GB RAM笔记本电脑上仍然遇到问题。我将保持这个问题开放,以便其他人提出他们的解决方案。 - wake_wake
3
很好。是的,使用join时不需要创建新的data.table作为结果,而第一种方法会创建全新的DT。据我所知,这是最节省内存的方法。另一个可用的技巧是尽可能确保使用较小的列:使用整数而不是双精度将减少内存需求2倍。最终改变平台到Linux也可能有所帮助。 - jangorecki
1
@wake_wake 测试一下我发布的代码片段。它应该可以工作(至少对我来说总是有效的),但无法告诉你速度有多快(或者更慢)。如果它可以工作,请尝试在数据子集上使用mclapply再次运行它,以查看是否加速计算(我认为会因为排列彼此独立)。还要注意,至少从我的经验来看,具有这样维度的数据库如果您没有访问高性能群集,则可能无法更快地执行这些操作。 - zonfl

5

如果您必须使用拆分合并方法,并且以下操作适用于您的内存,请务必尽可能预先分配内存,以使迭代更快。因此,处理类似问题时,以下解决方案是我能想到的最有效的:

dt1  <- data.table(id = c(1, 2, 3, 4),
                   x1 = c(12, 13, 14, 15),
                   x2 = c(5, 6, 7, 8),
                   x3 = c(33, 44, 55, 66),
                   x4 = c(123, 123, 123, 123))

dt2 <- data.table(id = c(1, 2, 3, 4),
                  x5 = c(555, 666, 777, 888))

dt1_id <- sort(unique(dt1$id)) # extract all ids that are in dt1
dt1_l_split <- length(dt1_id) # get number of iterations
dt2_l_split <- length(unique(dt2[id %in% dt1_id]$id))

split_dt1 <- vector(mode = "list", length = length(unique(dt1$id))) # preallocate vector
split_dt1 <- lapply(1:dt1_l_split, function(x) dt1[id %in% dt1_id[[x]]]) # fill list with splits

rm(dt1); gc() # remove the large data table to save memory and clean up RAM

dt1 <- lapply(1:dt1_l_split, function(i) {
  print(Sys.time())
  print(i)

  tmp <- dt2[id %in% dt1_id[[i]]] # load relevant parts from dt2
  merge(tmp, split_dt1[[i]], all = TRUE) # merge dt1 and dt2
})
rbindlist(dt1)

你可以尝试使用parallel包中的mclapply来加速计算,但是我有时候会得到不同的结果,有时会真正加速计算,有时会变慢,所以我想最好还是试一下。

或者(并且在我看来是最简单的解决方案),将项目推送到你喜欢的Dropbox / Google Drive /任何云平台上,并设置一个具有52GB RAM、几个CPU和Windows Server的Google Cloud VM(避免自己安装GUI等)。用了大约10分钟就能设置好,第一年有300美元的预算,基本上是免费的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接