使用`readr::read_csv_chunked()`分块读取csv文件

11

我希望能够读取更大的csv文件,但是遇到了内存问题。因此,我想尝试使用readr包中的read_csv_chunked()函数来分块读取它们。我的问题在于,我并不真正理解callback参数。

这是我迄今为止尝试过的最简单的示例(我知道我必须将所需的操作包含在f()中,否则在内存使用方面就不会有优势,对吧?):

library(tidyverse)
data(diamonds)
write_csv(diamonds, "diamonds.csv") # to have a csv to read

f <- function(x) {x}
diamonds_chunked <- read_csv_chunked("diamonds.csv", 
                                     callback = DataFrameCallback$new(f),
                                     chunk_size = 10000)

我试图让callback参数与官方文档的示例尽可能接近:

# Cars with 3 gears
f <- function(x, pos) subset(x, gear == 3)
read_csv_chunked(readr_example("mtcars.csv"), 
                 DataFrameCallback$new(f), 
                 chunk_size = 5)

然而,我收到了下面的错误,似乎是在第一块被读取后出现的,因为我看到进度条移动到了18%。

Error in eval(substitute(expr), envir, enclos) : unused argument (index)

我已经尝试将我想要进行的操作包含在f()内部,但仍然收到相同的错误。

2个回答

12

我发现调用 DataFrameCallback$new() 函数时,需要传入一个额外的参数(pos 是文档示例中的参数名)。这个参数不一定需要使用,所以我并不真正理解它的目的。但至少,以这种方式运行是有效的。

有没有人了解关于这个第二个参数的更多细节?


6

pos表示位置,它是每个块中第一行的索引号。使用此回调函数,您可以处理块中的每一行。

以下是来自https://readr.tidyverse.org/reference/callback.html的官方示例:

ChunkCallback 回调接口定义,所有回调函数都应该从这个类继承。

SideEffectChunkCallback 回调函数仅用于副作用,不返回结果。

DataFrameCallback 回调函数在结尾时将每个结果组合起来。

AccumulateCallBack 回调函数累积单个结果。需要使用参数 acc 指定累加器的初始值。默认情况下,参数acc为NULL。

# Print starting line of each chunk
f <- function(x, pos) print(pos)
read_lines_chunked(readr_example("mtcars.csv"), SideEffectChunkCallback$new(f), chunk_size = 5)

# The ListCallback can be used for more flexible output
f <- function(x, pos) x$mpg[x$hp > 100]
read_csv_chunked(readr_example("mtcars.csv"), ListCallback$new(f), chunk_size = 5)

请在答案中添加相关的代码和解释,而不是链接到外部URL。 - Nilambar Sharma

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接