快速将data.table列连接成一个字符串列

26

data.table中给定任意列名列表,我想将这些列的内容连接为一个字符串,并存储到新列中。我需要动态生成表达式来进行拼接,因为需要拼接的列并不总是相同的。

我怀疑我使用 eval(parse(...)) 方法的方式可以用更优美的方式替换,但下面的方法是目前我能得到的最快的方法。

对于1000万行数据,在这个样例数据上运行时间大约为21.7秒(基本R函数paste0稍微慢一些--23.6秒)。我的实际数据有18-20列被连接,并且最多有1亿行,所以减速变得更加不切实际了。

有什么想法可以加速吗?


目前的方法

library(data.table)
library(stringi)

RowCount <- 1e7
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- c("x","a","b","c","d","e","f","y")
PasteStatement <- stri_c('stri_c(',stri_c(ConcatCols,collapse = ","),')')
print(PasteStatement)

提供

[1] "stri_c(x,a,b,c,d,e,f,y)"

然后使用以下表达式将列连接起来:

DT[,State := eval(parse(text = PasteStatement))]

输出的示例:

     x   y a b c d e f        State
1: foo bar 4 8 3 6 9 2 foo483692bar
2: foo bar 8 4 8 7 8 4 foo848784bar
3: foo bar 2 6 2 4 3 5 foo262435bar
4: foo bar 2 4 2 4 9 9 foo242499bar
5: foo bar 5 9 8 7 2 7 foo598727bar

性能分析结果

火焰图 数据


更新1:freadfwritesed

根据@Gregor的建议,尝试使用sed在磁盘上进行连接。由于data.table的极快速度的freadfwrite函数,我能够将列写入磁盘,在磁盘上使用sed消除逗号分隔符,然后在约18.3秒内读回后处理过的输出——虽然还不足以使转换,但也是一个有趣的侧面研究!

ConcatCols <- c("x","a","b","c","d","e","f","y")
fwrite(DT[,..ConcatCols],"/home/xxx/DT.csv")
system("sed 's/,//g' /home/xxx/DT.csv > /home/xxx/DT_Post.csv ")
Post <- fread("/home/xxx/DT_Post.csv")
DT[,State := Post[[1]]]

18.3秒的细项分解(无法使用profvis,因为sed对R分析器不可见):

  • data.table::fwrite() - 0.5秒
  • sed - 14.8秒
  • data.table::fread() - 3.0秒
  • := - 0.0秒

如果没有其他问题,这证明了data.table作者在磁盘IO性能优化方面的广泛工作。 (我正在使用添加了多线程支持的1.10.5开发版本的freadfwrite已经有一段时间支持多线程)。

一个注意事项:如果有一种方法可以使用空白分隔符通过fwrite写入文件,就像下面@Gregor建议的那样,那么此方法可能被削减到~3.5秒!

关于这个话题的更新:forked data.table并注释掉需要大于长度0的分隔符的行,神奇地得到了一些空格?在试图搞乱C内部时引起了一些segfaults,所以暂时放一放。理想的解决方案不需要写入磁盘,并且可以将所有内容保存在内存中。


第二次更新:sprintf用于整数特定情况

这里是第二次更新:虽然我在原始用法示例中包括了字符串,但我的实际用例仅串联整数值(可以基于上游清理步骤始终假设非空)。

由于使用情况高度特定并且与先前发布的时间不同,因此我不会直接将时间与先前发布的时间进行比较。 但是,一个结论是,虽然stringi很好地处理了许多字符编码格式,不需要指定混合向量类型,并且在箱外执行了一堆错误处理,但这确实增加了一些时间。(这对大多数情况可能值得)

通过使用基本的R的sprintf函数并让它事先知道所有输入都将是整数,我们可以削减计算18个整数列的5百万行的运行时间约30%。(20.3秒而不是28.9秒)

library(data.table)
library(stringi)
RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)

## Using stringi::stri_c ---------------------------------------------------
stri_joinStatement <- stri_c('stri_join(',stri_c(ConcatCols,collapse = ","),', sep="", collapse=NULL, ignore_null=TRUE)')
DT[, State := eval(parse(text = stri_joinStatement))]

## Using sprintf -----------------------------------------------------------
sprintfStatement <- stri_c("sprintf('",stri_flatten(rep("%i",length(ConcatCols))),"', ",stri_c(ConcatCols,collapse = ","),")")
DT[,State_sprintf_i := eval(parse(text = sprintfStatement))]
生成的语句如下:
> cat(stri_joinStatement)
stri_join(a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f, sep="", collapse=NULL, ignore_null=TRUE)
> cat(sprintfStatement)
sprintf('%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i', a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f)

sprintf


更新 3: R 不必慢。

基于@Martin Modrák的答案,我制作了一个“一招鲜”的软件包,基于一些针对特定“单个数字整数”情况的data.table内部专门化: fastConcat(不要指望它会很快出现在CRAN上,但是您可以自己承担风险从github repo安装并使用,msummersgill/fastConcat。)

也许有人更好地理解c,这可能进一步改善,但目前为止,它正在运行与更新2中相同的用例,在 2.5秒 内--比sprintf()稍快约 8倍,比我最初使用的stringi::stri_c()方法快 11.5倍

对我来说,这凸显了在某些最简单的操作中提高性能的巨大机会,例如使用更好调整的c进行基本字符串向量连接。 我想人们像@Matt Dowle这样的人已经看到了这一点--如果只是他有时间重写R的所有内容,而不仅仅是data.frame。

fastConcat



2
stri_c 的作用就是立即调用一个 C++ 函数来连接字符串。我认为你不可能在 R 中超越它的性能。即使 paste 也很快转换为编译代码,因此其性能几乎一样好。 - Gregor Thomas
2
也许对你来说,使用命令行工具预处理或后处理数据会更有效?或者在 SQL 或 Hadoop 中连接数据,无论你是如何加载它的? - Gregor Thomas
2
几个想法:(a) 从Hadoop中提取时将列合并。据我所知,Hive、Pig和Spark都支持列连接。(b) 不幸的是,fread不允许空分隔符,但readr::write_delim可以。它可能太慢了,但值得一试。(c) sed可能是从命令行中最快的方法,但是这个问题的答案表明,如果你复制文件而不是直接在原地编辑文件,你可以通过不同的语法特别是获得一些加速。 - Gregor Thomas
3
不知道这是否可行,但似乎在fwrite中进行一行输入检查可以防止您将 "" 作为分隔符进行指定。您可以尝试使用fixInNamespace来删除该行,并查看是否允许您使用 sep = "" 进行 fwrite。我以前从未使用过fixInNamespace,但应该是可行的。未解决的问题是sep无法为空字符串的更深层次原因是否存在。 - Gregor Thomas
1
提交一个功能请求以支持 sep = "" - eddi
显示剩余6条评论
3个回答

15

C来拯救了!

从data.table中窃取一些代码,我们可以编写一个C函数,它的运行速度要快得多(并且可以进行并行化以获得更快的速度)。

首先确保你有一个正常工作的C++工具链:

library(inline)

fx <- inline::cfunction( signature(x = "integer", y = "numeric" ) , '
    return ScalarReal( INTEGER(x)[0] * REAL(y)[0] ) ;
' )
fx( 2L, 5 ) #Should return 10

假设数据仅为整数类型,以下代码应该可以正常工作(但该代码可扩展至其他类型):

library(inline)
library(data.table)
library(stringi)

header <- "

//Taken from https://github.com/Rdatatable/data.table/blob/master/src/fwrite.c
static inline void reverse(char *upp, char *low)
{
  upp--;
  while (upp>low) {
  char tmp = *upp;
  *upp = *low;
  *low = tmp;
  upp--;
  low++;
  }
}

void writeInt32(int *col, size_t row, char **pch)
{
  char *ch = *pch;
  int x = col[row];
  if (x == INT_MIN) {
  *ch++ = 'N';
  *ch++ = 'A';
  } else {
  if (x<0) { *ch++ = '-'; x=-x; }
  // Avoid log() for speed. Write backwards then reverse when we know how long.
  char *low = ch;
  do { *ch++ = '0'+x%10; x/=10; } while (x>0);
  reverse(ch, low);
  }
  *pch = ch;
}

//end of copied code 

"



 worker_fun <- inline::cfunction( signature(x = "list", preallocated_target = "character", columns = "integer", start_row = "integer", end_row = "integer"), includes = header , "
  const size_t _start_row = INTEGER(start_row)[0] - 1;
  const size_t _end_row = INTEGER(end_row)[0];

  const int max_out_len = 256 * 256; //max length of the final string
  char buffer[max_out_len];
  const size_t num_elements = _end_row - _start_row;
  const size_t num_columns = LENGTH(columns);
  const int * _columns = INTEGER(columns);

  for(size_t i = _start_row; i < _end_row; ++i) {
    char *buf_pos = buffer;
    for(size_t c = 0; c < num_columns; ++c) {
      if(c > 0) {
        buf_pos[0] = ',';
        ++buf_pos;
      }
      writeInt32(INTEGER(VECTOR_ELT(x, _columns[c] - 1)), i, &buf_pos);
    }
    SET_STRING_ELT(preallocated_target,i, mkCharLen(buffer, buf_pos - buffer));
  }
return preallocated_target;
" )

#Test with the same data

RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)


ptm <- proc.time()
preallocated_target <- character(RowCount)
column_indices <- sapply(ConcatCols, FUN = function(x) { which(colnames(DT) == x )})
x <- worker_fun(DT, preallocated_target, column_indices, as.integer(1), as.integer(RowCount))
DT[, State := preallocated_target]
proc.time() - ptm

虽然你的(仅限整数)示例在我的电脑上运行约20秒,但此代码可以在大约5秒内运行,并且可以轻松并行化。

需要注意的一些事项:

  • 该代码不能直接用于生产环境 - 应对函数输入进行很多检查(特别是检查所有列是否具有相同的长度、检查列类型、预分配目标大小等)
  • 该函数将输出放入预分配的字符向量中,这是非标准且不美观的(R通常没有传递引用语义),但允许并行化(见下文)
  • 最后两个参数是要处理的起始和结束行,同样是为了实现并行化
  • 该函数接受列索引而不是列名。所有列都必须是整数类型。
  • 除了输入的data.table和预分配目标之外,其他输入都必须是整数
  • 函数的编译时间不包括在内(因为应该先编译它 - 甚至可能制作一个包)

并行化

编辑:下面的方法实际上由于clusterExport和R字符串存储方式的原因会失败。因此,并行化可能需要采用C语言实现,类似于data.table的实现方式。

由于无法跨R进程传递内联编译函数,因此并行化需要更多的工作。要能够在并行中使用上述函数,您需要使用R编译器单独编译它并使用dyn.load OR将其包装在一个程序包中OR使用分叉后端进行并行(我没有分叉后端,分叉仅在UNIX上工作)。

然后,并行运行看起来会像这样(未经测试):

no_cores <- detectCores()

# Initiate cluster
cl <- makeCluster(no_cores)

#Preallocated target and prepare params
num_elements <- length(DT[[1]])
preallocated_target <- character(num_elements)
block_size <- 4096 #No of rows processed at once. Adjust for best performance
column_indices <- sapply(ConcatCols, FUN = function(x) { which(colnames(DT) == x )})

num_blocks <- ceiling(num_elements / block_size)

clusterExport(cl, 
   c("DT","preallocated_target","column_indices","num_elements", "block_size"))
clusterEvalQ(cl, <CODE TO LOAD THE NATIVE FUNCTION HERE>)

parLapply(cl, 1:num_blocks ,
          function(block_id)
          {
            throw_away <- 
              worker_fun(DT, preallocated_target, columns, 
              (block_id - 1) * block_size + 1, min(num_elements, block_id * block_size - 1))
            return(NULL)
          })



stopCluster(cl)

感谢你所付出的所有努力!我按原样运行你的代码,得到了8.5秒的运行时间,相比使用sprintf()的基准时间20.5秒,速度提升了2.4倍。我目前正在逐行分析代码,试图理解每一部分的作用,但似乎这里有一些非常可靠的潜力!我可能会尝试将其放入一个函数包中,以便可以预编译为OpenMP并允许非整数、可变长度的输入。如果我能够实现这个,那么我认为我们可能会获得胜利! - Matt Summersgill
不确定你计划深入探究多少,但我最终把这些放入了一个在 GitHub 上的包中 msummersgill/fastConcat。目前只是试图将相同的代码编译成 R 函数,我猜测 inline 包正在抽象出一些东西,我需要获取这些以使其能够作为独立的 C/C++ 运行。 - Matt Summersgill
3
老实说,这是我第一次为R编写C代码,所以我也不知道让这个C代码在软件包中工作需要什么。不过,我很开心。请注意,inline可以给你完整的源码(每当有编译时错误时它都会显示)。我相信OpenMP和其他列类型的writeXX函数可以很容易地从data.table的fwrite.c中获取,并做出一些小修改使其实际运行。 - Martin Modrák
感谢内联提示!我从 inline::code(worker_fun) 的输出重新开始,发现这个包实际上可以运行!现在开始调整它,我添加了一个 if-else 来跳过分隔符部分,如果用户指定 sep = ""(我的整数用例),现在只需要 3.1 秒,比 sprintf() 基准快了 6.6 倍 - Matt Summersgill

10

我不知道样本数据对于您的实际数据来说有多具有代表性,但在样本数据的情况下,仅连接每个ConcatCols唯一组合一次而不是多次,您可以实现显着的性能改进。

这意味着对于样本数据,如果您也包括所有重复项,则需要进行大约10百万次连接,而仅连接每个唯一组合一次只需进行约500k次连接。

请参见以下代码和计时示例:

system.time({
  setkeyv(DT, ConcatCols)
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
})
#       user      system     elapsed 
#      7.448       0.462       4.618 

大约有一半的时间花在了setkey部分。如果您的数据已经是键入的,则时间进一步缩短,仅略多于2秒。

setkeyv(DT, ConcatCols)
system.time({
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
})
#       user      system     elapsed 
#      2.526       0.280       2.181 

2
这也是一个很好的答案!我的实际数据通常只有大约10,000个唯一组合(无论我处理1百万还是1亿),因此这对我的应用程序来说是非常高效的方法。使用更具代表性的数据集(10百万行,18列,9216个唯一组合),该方法在5.2秒内执行,仅比基于@Martin Modrák的回答中的定制函数fastConcat::concat()msummersgill/fastConcat中的运行时间略慢,为3.4秒 - Matt Summersgill
1
如果您的最大唯一组合数受到限制,如您所说的10k,我怀疑我的方法在比如100百万行的情况下会比其他答案更具可扩展性。但我还没有测试过。 - talat
非常聪明的技巧!你可能可以结合这两种方法,进一步提高效率。 - Martin Modrák
@MartinModrák,谢谢!当然,两者的结合应该是非常快的。我也喜欢你的答案,但由于我对C代码的理解有限,我更倾向于保持简单,只使用data.table。 - talat
不错!我会采用你的方法,但还有另一种data.table的方式:DT[, z := do.call(paste0, .BY), by=ConcatCols](需要三倍的时间) - Frank
谢谢,这也是一个不错的选择。我经常忘记使用.BY。 - talat

0

这个使用了tidyr包中的unite函数。它可能不是最快的,但它很可能比手写的R代码更快。

library(tidyr)
system.time(
  DNew <- DT %>% unite(State, ConcatCols, sep = "", remove = FALSE)
)
# user  system elapsed 
# 14.974   0.183  15.343 

DNew[1:10]
# State   x   y a b c d e f
# 1: foo211621bar foo bar 2 1 1 6 2 1
# 2: foo532735bar foo bar 5 3 2 7 3 5
# 3: foo965776bar foo bar 9 6 5 7 7 6
# 4: foo221284bar foo bar 2 2 1 2 8 4
# 5: foo485976bar foo bar 4 8 5 9 7 6
# 6: foo566778bar foo bar 5 6 6 7 7 8
# 7: foo892636bar foo bar 8 9 2 6 3 6
# 8: foo836672bar foo bar 8 3 6 6 7 2
# 9: foo963926bar foo bar 9 6 3 9 2 6
# 10: foo385216bar foo bar 3 8 5 2 1 6

3
不确定你正在运行哪个基准测试,但在原始的含有1000万行数据集上,“tidyr::unite()”函数在我的服务器上花费了25.7秒,而基础R中的“paste()”函数只需要23.6秒。当你查看Github上的源代码时,稍慢的执行速度就像预期的那样,因为实际上“unite”只是基于基础R的“paste()”函数编写的包装函数。第二个示例使用整数进行连接,并且与其他方法进行基准测试时产生的结果不同,因为列没有重复。 - Matt Summersgill

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接