如何处理大而不是大数据集?

6

我有一个大约1.5亿条记录的约200GB数据集,需要对其进行条件分析和数据聚合。

问题是我不习惯(也没有训练)处理大型数据集。通常我会使用R或Python(并且还会用一些Julia),但当数据集无法放入内存时,我完全迷失了方向。

人们如何处理那些适合磁盘但不适合内存的数据集?我应该从哪里开始寻找解决方案?是否有一个集中介绍大型但不属于“大数据”的数据集的信息中心?

*长话短说,我有另一个可以放入内存的数据集,并且对于这个小数据集的每一行,我想要计算与大数据集匹配某些条件的观测值数量。我的最初反应是按块运行代码,但这非常低效,需要几个世纪的单处理器计算时间。

由于已经特别要求,我将描述文件的结构。

我有一个大文件BIG,它有(尤其)两个ID变量,“$ ID0 $”和“$ ID1 $”,以及一个日期变量“$ date1 $”。

我有一个小文件SMALL,其中有两个ID变量,“$ ID2 $”和“$ ID3 $”,以及一个日期变量“$ date2 $”。

对于每个“$ID2_i$”,我想要计算所有观测值,使得$\{ID0 = ID2_i, date1<date2_i, ID1=ID2_j | j : ID3_j = ID3_i \cap date2_j < date2_i \}$


1
你需要提供文件的具体结构以及你想要从中提取的信息。只需考虑你真正需要的信息,不要将原始文件直接放入内存。例如,你可能不需要精确值,而是可以直接将该值填入直方图中。我敢打赌,你的问题可能只需要几个MB的RAM就能解决! - KaPy3141
是的,我可以利用1MB的RAM来解决这个问题。主要问题是我需要在小数据集的每一行上运行条件计数一次,这意味着要运行1.6百万次,而每次计数超过15亿个观测值时,时间非常长。我也不能近似计算,因为我需要准确的计数。我不是在寻求解决我的问题的答案,因为它很长且无趣,但我主要是想找在线资源或指导良好的实践方向。 - SomePhDStudentGuy
如果它是“大数据”,你会如何运行它? - Dave
2
对于统计学家而言,没有特定应用场景的数据是不重要的。回归分析、绘图等都有方便的分块方式,这可能是一个相关主题的问题。总的来说,我们只能建议获取SAS许可证。 - AdamO
我在处理非常大(压缩)CSV文件时的查询方法:https://dev59.com/TcHqa4cB1Zd3GeqP36Jz#68693819 - San
显示剩余5条评论
3个回答

3

我可能误解了你的问题,但是像评论中建议的那样将大文件分块似乎是最直接的方法。

假设你将200GB的文件分成100个块,然后迭代处理每个块并对每个块进行期望的计数,随后聚合结果。如果每个块的操作在几分钟内完成,除非你需要一遍又一遍地执行此操作,否则应该可以满足需求。

要获得更具体的建议,我需要了解更多关于数据存储格式的信息。我们是否正在讨论一个大的.csv文件?如果是这种情况,在R中,你可以研究 readr包的分块API。要再次尽快在R中完成计数,可以使用data.table包。

编辑:添加一些示例代码

这段代码不会完全按照你的请求执行,但希望可以涵盖我建议的解决方案的一些关键点。

library(data.table)
library(readr)

ids <- seq.int(1, 1e2)
dates <- seq(as.Date("1999/01/01"), as.Date("2000/01/01"), by = "day")

big <- data.table(id0 = sample(ids, 1e6, replace = TRUE),
                  id1 = sample(ids, 1e6, replace = TRUE),
                  date1 = sample(dates, 1e6, replace = TRUE))

write.csv(big, "big.csv", row.names = FALSE)

small <- data.table(id2 = sample(ids, 1e2),
                    id3 = sample(ids, 1e2),
                    date2 = sample(dates, 1e2))

count_fun <- function(x, pos, acc) {
  setDT(x)
  tmp <- small[x, list(counts = .N),
               on = c("id2 == id0", "id3 == id1", "date2 > date1"),
               by = .EACHI, nomatch = NULL]
  acc[tmp<span class="math-container">$id2] <- acc[tmp$</span>id2] + tmp$counts
  acc
}

accumulator <- AccumulateCallback$new(count_fun, acc = rep(0, length(ids)))

counts <- read_csv_chunked("big.csv", accumulator, chunk_size = 1e4)

是的,它是CSV格式,但我可以获取另一种格式,这不是问题。 - SomePhDStudentGuy
我在我的问题中更新了一些R语言的示例代码。如果您有任何问题,请随时问我。 - nbenn

3

有不同的方法

对数据集进行分块(节省未来的时间但需要初始时间投入)

分块可以使您轻松完成许多操作,例如洗牌等。

确保每个子集/块都代表整个数据集。每个块文件应具有相同数量的行。

这可以通过将一行附加到另一个文件来完成。很快,您就会意识到,在读写同一驱动器时打开每个文件并写入一行效率低下。
-> 添加适合内存的写入和读取缓冲区。

enter image description here
enter image description here

选择适合您需求的块大小。我选择这个特定的大小,因为我的默认文本编辑器仍然可以相当快速地打开它。

较小的块可以提高性能,特别是如果您想获取类分布之类的指标,因为您只需要通过一个代表性文件循环即可获得整个数据集的估计值,这可能就足够了。
较大的块文件在每个文件中都具有更好的整体数据集表示,但是您也可以浏览x个较小的块文件。

我使用c#来完成这个任务,因为我在那里经验更丰富,因此我可以使用完整的功能集,例如将任务分割为不同的线程:读取/处理/写入

如果您熟练掌握使用python或r,则应该也有类似的功能。并行化可能是处理如此庞大的数据集的重要因素。

可以将分块数据集建模为一个交错的数据集,您可以使用张量处理单元进行处理。 这可能会产生最佳性能之一,并且可以在大型机器上本地执行以及在云端执行。 但这需要在tensorflow上学习很多知识。

使用读取器逐步读取文件

与其像 all_of_it = file.read()那样做,你想使用某种类型的流读取器。以下函数逐行读取一个块文件(或整个300gb数据集),以计算文件内的每个类别。通过一次处理一行,程序不会溢出内存。

您可能需要添加一些进度指示,例如X行/秒或X MBbs,以便估计总处理时间。

def getClassDistribution(path):
    classes = dict()
    # open sample file and count classes
    with open(path, "r",encoding="utf-8",errors='ignore') as f:
        line = f.readline()
        while line:
            if line != '':
                labelstring = line[-2:-1]
                if labelstring == ',':
                    labelstring = line[-1:]
                label = int(labelstring)
                if label in classes:
                    classes[label] += 1
                else:
                    classes[label] = 1
            line = f.readline()
    return classes

在此输入图片描述

我使用分块数据集和估计的组合。

性能陷阱:

  • 尽可能避免嵌套循环。每个内部循环都会将复杂度乘以n。
  • 尽可能避免逐一处理数据。每个后续循环都会增加n的复杂度。
  • 如果您的数据以csv格式呈现,请避免预制函数,例如cells = int(line.Split(',')[8]),否则会很快导致内存吞吐量瓶颈。可以在getClassDistribution中找到一个正确的例子,其中我只想获取标签。

以下C#函数将csv行快速拆分成元素。

// Call function
ThreadPool.QueueUserWorkItem((c) => AnalyzeLine("05.02.2020,12.20,10.13").Wait());

// Parralelize this on multiple cores/threads for ultimate performance
private async Task AnalyzeLine(string line)
{
    PriceElement elementToAdd = new PriceElement();
    int counter = 0;
    string temp = "";
    foreach (char c in line)
    {
        if (c == ',')
        {
            switch (counter)
            {
                case 0:
                    elementToAdd.spotTime = DateTime.Parse(temp, CultureInfo.InvariantCulture);
                    break;
                case 1:
                    elementToAdd.buyPrice = decimal.Parse(temp);
                    break;
                case 2:
                    elementToAdd.sellPrice = decimal.Parse(temp);
                    break;
            }
            temp = "";
            counter++;
        }
        else temp += c;
    }
    // compare the price element to conditions on another thread
    Observate(elementToAdd);
}

创建数据库并加载数据

处理类似csv的数据时,可以将数据加载到数据库中。
数据库专门用于容纳大量数据,并且您可以期望非常高的性能。
数据库可能会占用比原始数据更多的磁盘空间。这是我放弃使用数据库的原因之一。

硬件优化

如果您的代码已经进行了优化,那么瓶颈很可能是硬盘吞吐量。

  • 如果数据适合本地硬盘,请在本地使用,这样可以消除网络延迟(想象一下每个记录在本地网络中需要2-5毫秒,在远程位置需要10-100毫秒)。
  • 使用现代硬盘。1TB NVME SSD今天的成本约为130美元(Intel 600p 1TB)。 NVME SSD使用PCIe,比普通SSD快5倍,比普通硬盘快50倍,特别是在快速写入不同位置(分块数据)时。在最近几年中,SSD在容量方面已经取得了巨大进展,对于这样的任务来说,它将非常有效。

以下屏幕截图提供了Tensorflow在相同机器上使用相同数据进行训练的性能比较。在一个本地标准SSD上保存一次,另一次保存在连接到局域网的网络附加存储(普通硬盘)。
输入图像描述
输入图像描述


1

看起来是一个O(n^2)的问题:BIG中的每个元素都必须与BIG中的所有其他元素进行比较。

也许您可以将比较所需的所有字段都适合内存中(在文件中留下其余部分)。 例如:1.5G观察值x 1日期(4字节)x 2个ID(8字节)可以适合18GB。

也许您可以按日期对BIG进行排序,然后您的问题变为O(n x log(n))。

也许您可以将BIG拆分成ID3i = ID3j的块。

有很多可能性。


对于许多这样的连接,通过适当的索引,可以将$O(n^2)$的性能降低到$O(n\log(n))$。这是强大的数据库平台提供的服务之一。 - whuber

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接