使用C++解析非常大的CSV文件

19

我的目标是在OSX环境中的QT项目中使用C++解析大型csv文件。(当我说csv时,我指的是tsv和其他变体1GB ~ 5GB)。

这似乎是一项简单的任务,但当文件大小变得更大时情况变得复杂。由于与解析csv文件相关的许多边缘情况,我不想编写自己的解析器。

我已经找到了各种csv处理库来处理此工作,但在我的机器上解析1GB文件大约需要90-120秒,这是不能接受的。现在我仅处理并丢弃数据进行测试。

cccsvparser是我尝试过的库之一。但唯一足够快的库是fast-cpp-csv-parser,它可以在我的机器上以15秒的速度给出可接受的结果,但只有在已知文件结构时才能工作。

示例使用:fast-cpp-csv-parser

#include "csv.h"

int main(){
    io::CSVReader<3> in("ram.csv");
    in.read_header(io::ignore_extra_column, "vendor", "size", "speed");
    std::string vendor; int size; double speed;
    while(in.read_row(vendor, size, speed)){
    // do stuff with the data
    }
}

正如您所看到的,我无法加载任意文件,必须明确定义变量以匹配我的文件结构。我不知道是否有任何方法可以让我在运行时动态创建这些变量。

我尝试过的另一种方法是使用fast-cpp-csv-parser LineReader类逐行读取csv文件(速度非常快,约为7秒),然后使用cccsvparser库解析每行字符串,但这需要大约40秒才能完成,虽然相比第一次尝试有所改进,但仍然无法接受。

我看过各种与csv文件解析相关的Stack Overflow问题,但没有一个考虑到大文件处理。

此外,我花了很多时间搜索解决此问题的方法,我真的很想念像npmpip这样的软件包管理器在搜索现成解决方案时提供的自由。

我将感谢任何关于如何处理此问题的建议。

编辑:

使用@fbucek的方法,处理时间缩短至25秒,这是一个巨大的改进。

我们能否进一步优化?


为什么不用传统的方法读取第一行,分析其中的标题数字/名称,并将其作为代码的输入,就像您所展示的那样? - Bowdzone
多线程是否可接受? - fbucek
这些多GB大小的文件存储在什么类型的磁盘上?固态硬盘还是普通磁盘? - Malkocoglu
@Bowdzone:我考虑过这个问题,但是我不知道如何着手解决?你能给我展示一下你所想的例子吗? - Alexander
@Malkocoglu 在我的情况下,它是磁盘驱动器,但它应该能够在两种驱动器上运行。 - Alexander
显示剩余20条评论
3个回答

12

我假设您只使用了一个线程。

多线程可以加速您的处理过程。

迄今为止最好的成就是40秒。让我们坚持这一点。

我假设您首先阅读然后处理 -> (大约需要7秒钟来阅读整个文件)

读取需要7秒 处理需要33秒

首先,您可以将文件划分为块,假设每块大小为50MB。 这意味着在读取完50MB的文件后,您可以开始处理。 您不需要等到整个文件完成才开始处理。 这样需要0.35秒进行读取(现在是0.35秒+33秒处理时间=约34秒)

当您使用多线程时,您可以同时处理多个块。从理论上讲,这可以将处理速度加快到核数。假设您有4个核心。 那就是33/4 = 8.25秒。

我认为您可以使用4个核心将处理速度加快到总共9秒

查看QThreadPoolQRunnableQtConcurrent 我更喜欢QThreadPool

将任务分成几个部分:

  1. 首先尝试循环遍历文件并将其分成块。 然后什么都不做。
  2. 然后创建“ ChunkProcessor ”类,该类可以处理该块
  3. 将“ ChunkProcessor ”作为QRunnable的子类,并在重新实现的run()函数中执行您的处理
  4. 当您有块时,您有一个可以处理它们的类,并且该类与QThreadPool兼容,您可以将其传递到

可能看起来像这样

loopoverfile {
  whenever chunk is ready {
     ChunkProcessor *chunkprocessor = new ChunkProcessor(chunk);
     QThreadPool::globalInstance()->start(chunkprocessor);
     connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
  }   
}

为避免使用QMutex或其他东西并避免多线程访问某些资源时出现序列化问题,您可以使用std :: share_ptr来传递处理后的数据。

注意:要使用自定义信号,您必须在使用之前注册它。

qRegisterMetaType<std::shared_ptr<ProcessedData>>("std::shared_ptr<ProcessedData>");

编辑:(根据讨论,我的回答并不清楚) 使用哪种磁盘或其速度都无关紧要。读取是单线程操作。 这个解决方案之所以被建议,只是因为读取需要7秒钟,而且无论使用哪个磁盘都一样。 7秒 才是关键。唯一的目的是尽快开始处理,而不是等待读取完成。

你可以使用:

QByteArray data = file.readAll();

或者你可以使用主要思想:(我不知道为什么需要7秒钟才能读完,背后是什么)

 QFile file("in.txt");
 if (!file.open(QIODevice::ReadOnly | QIODevice::Text))
   return;

 QByteArray* data = new QByteArray;    
 int count = 0;
 while (!file.atEnd()) {
   ++count;
   data->append(file.readLine());
   if ( count > 10000 ) {
     ChunkProcessor *chunkprocessor = new ChunkProcessor(data);
     QThreadPool::globalInstance()->start(chunkprocessor);
     connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
     data = new QByteArray; 
     count = 0;
   }
 }

一个文件,一个线程读取,几乎可以像按行读取一样快速,"没有"中断。你对数据怎么处理是另一个问题,但与 I/O 无关。数据已经在内存中。因此唯一需要考虑的是机器上的5GB 文件和 RAM 的数量

这是一个非常简单的解决方案,您只需要子类化 QRunnable,重新实现 run 函数,在完成时发出信号,使用共享指针传递处理后的数据,并在主线程中将该数据连接成一个结构或其他类型的对象。这是一个简单的线程安全解决方案。


1
多线程对于磁盘I/O没有帮助。阅读此线程以获取解释 - https://dev59.com/HHNA5IYBdhLWcg3wk_CA。 - SChepurin
2
@SChepurin - 1)在这个例子中,多线程与磁盘I/O无关。更不用说多个I/O操作了。这里只是像往常一样在一个线程中读取一个文件。2)而且这并不总是正确的。它取决于磁盘类型、你正在执行的任务等等。当你优化好所有东西时,写入多个文件并不能帮助你。但有时瓶颈并不是磁盘。 - fbucek
1
读取文件是磁盘I/O操作。他只需要这个。无论你是否优化,都不会得到承诺的结果。 - SChepurin
2
@SChepurin 这不是纯磁盘I/O的情况。这是磁盘I/O后跟着处理。根据报告的吞吐量(100秒内1GB = 10MB/s),磁盘I/O几乎肯定不是瓶颈(即使是旧的磁盘也比那快一个数量级),因此将工作分成两个线程(一个读取,一个处理)很可能会显著提高吞吐量。 - nobody
@Andrew Medico - "...非常有可能显著提高吞吐量。" 说到这里,你可以提出解决方案。我唯一能同意的是100秒对于这些文件来说太长了。 - SChepurin
显示剩余3条评论

4
我建议使用多线程方式,并稍作变化。其中一个线程专门负责以预定义的(可配置的)块大小读取文件,然后将数据提供给一组线程(基于 CPU 核心)。假设配置如下:
块大小 = 50 MB 磁盘线程 = 1 处理线程 = 5
以下是实现步骤:
1. 创建一个用于从文件中读取数据的类。在此类中,应该包含用于与处理线程通信的数据结构。例如,该结构应包含每个处理线程读取缓冲区的起始偏移量和结束偏移量。对于读取文件数据,读取器类会保存两个大小为块大小(50 MB)的缓冲区。
2. 创建一个处理类,该类持有指向读取缓冲区和偏移数据结构的指针(共享)。
3. 现在创建驱动程序(可能是主线程),创建所有线程并等待它们完成并处理信号。
4. 启动读取线程,传递读取器类作为参数,读取 50 MB 的数据,根据线程数创建偏移数据结构对象。在这种情况下,t1 处理 0 到 10 MB,t2 处理 10 到 20 MB,以此类推。准备就绪后,它向处理器线程发送通知。然后它立即从磁盘上读取下一个块,并等待处理线程完成的通知。
5. 处理线程在收到通知后,从缓冲区中读取数据并对其进行处理。一旦完成,它会通知读取线程,等待下一个块。
6. 该过程循环执行,直到所有数据都被读取和处理完毕。然后,读取线程向主线程发送完成通知,主线程发送PROCESS_COMPLETION,此时所有线程退出。或主线程选择处理队列中的下一个文件。
请注意,为方便说明,此处采用了偏移量,需要通过编程方式处理行分隔符映射。

真的,我们可以在发表任何评论之前讨论这种方法的优缺点。我已经实现了这种算法,并在性能方面取得了改进。如果代码以模块化方式编写,在进行基准测试时使用多线程不是问题。进一步解释如下: 有两个资源相互阻塞:1.磁盘2.处理器 该算法将它们分开,读取线程对文件进行顺序读取,而顺序读写比传统磁盘上的随机读写要快得多。 - bsr
该库有一个限制,它仅在文件结构已知的情况下才能正常工作,就像问题中已经提到的那样。 - bsr
由于我的声望不够,我无法对您的答案添加评论。我已经查看了库的代码,他们使用异步IO来读取文件,这比我建议的方法更加清晰。但是它没有多个线程来处理CSV行。 - bsr
Disk I/O and CSV-parsing are overlapped using threads for efficiency. - László Papp

0
如果您使用的解析器不明显分布式,那么这种方法是不可扩展的。我会推荐以下技术:
  • 将文件分块,使其大小适合机器/时间限制
  • 将块分配给可以满足您的时间/空间要求的机器集群(1..*)
  • 考虑处理给定块大小的块
  • 避免在同一资源上使用线程(即给定块),以免遇到所有与线程相关的问题。
  • 使用线程实现非竞争性(在资源上)操作-例如在一个线程上读取,在另一个线程上写入到不同的文件中。
  • 进行解析(现在对于这个小块,您可以调用您的解析器)。
  • 执行您的操作。
  • 将结果合并回来/如果可以按原样分发它们。
现在,话虽如此,为什么您不能使用类似Hadoop的框架呢?

1
只是出于好奇:它与其他答案有何不同? - László Papp
区别在于-答案旨在纯粹的“技术”,没有任何实现方式(编程代码/设计模式等)的建议。鉴于原始问题和问题描述,我想强调必须是“分布式解决方案”以进行扩展。这也是Hadoop存在和繁荣的原因。从Ctrl + F中,我找不到我的答案之前出现过这个词..现在为什么难以理解呢? - Senthu Sivasambu
并行计算 <> 分布式计算 .. 根据我的理解,分布式计算是一种并行计算技术,但反之则不一定成立 - 这只是为了说明一个观点。 - Senthu Sivasambu
看,我甚至为你找到了一个链接 :) http://cs.stackexchange.com/questions/1580/distributed-vs-parallel-computing - Senthu Sivasambu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接