使用OpenMP并行化while循环

9

我有一个非常大的数据文件,每个记录都有4行。我编写了一个非常简单的C程序来分析这种类型的文件并打印出一些有用的信息。程序的基本思路如下:

int main()
{
  char buffer[BUFFER_SIZE];
  while(fgets(buffer, BUFFER_SIZE, stdin))
  {
    fgets(buffer, BUFFER_SIZE, stdin);
    do_some_simple_processing_on_the_second_line_of_the_record(buffer);
    fgets(buffer, BUFFER_SIZE, stdin);
    fgets(buffer, BUFFER_SIZE, stdin);
  }
  print_out_result();
}

当然,这里省略了一些细节(如检查错误等),但这与问题无关。
程序很好用,但我处理的数据文件非常大。我想通过使用OpenMP并行化循环来加快程序运行速度。然而,经过一番搜索,发现OpenMP只能处理已知迭代次数的for循环。由于我不知道文件大小,甚至像wc -l这样简单的命令也需要很长时间才能运行,那么我该如何并行化这个程序呢?
3个回答

9
正如thiton所提到的,这段代码可能会受到I/O限制。然而,如今许多计算机可能拥有SSD和高吞吐量RAID磁盘。在这种情况下,您可以通过并行化获得加速。此外,如果计算不是微不足道的,则并行化胜出。即使由于饱和带宽而有效地将I/O串行化,您仍然可以通过将计算分布到多核心来获得加速。
回到问题本身,您可以通过OpenMP并行化此循环。对于stdin,我不知道如何并行化,因为它需要按顺序读取,没有结束的先前信息。但是,如果您正在处理典型文件,则可以这样做。
以下是我的omp parallel代码。我使用了一些Win32 API和MSVC CRT:
void test_io2()
{
  const static int BUFFER_SIZE = 1024;
  const static int CONCURRENCY = 4;

  uint64_t local_checksums[CONCURRENCY];
  uint64_t local_reads[CONCURRENCY];

  DWORD start = GetTickCount();

  omp_set_num_threads(CONCURRENCY);

  #pragma omp parallel
  {
    int tid = omp_get_thread_num();

    FILE* file = fopen("huge_file.dat", "rb");
    _fseeki64(file, 0, SEEK_END);
    uint64_t total_size = _ftelli64(file);

    uint64_t my_start_pos = total_size/CONCURRENCY * tid;
    uint64_t my_end_pos   = min((total_size/CONCURRENCY * (tid + 1)), total_size);
    uint64_t my_read_size = my_end_pos - my_start_pos;
    _fseeki64(file, my_start_pos, SEEK_SET);

    char* buffer = new char[BUFFER_SIZE];

    uint64_t local_checksum = 0;
    uint64_t local_read = 0;
    size_t read_bytes;
    while ((read_bytes = fread(buffer, 1, min(my_read_size, BUFFER_SIZE), file)) != 0 &&
      my_read_size != 0)
    {
      local_read += read_bytes;
      my_read_size -= read_bytes;
      for (int i = 0; i < read_bytes; ++i)
        local_checksum += (buffer[i]);
    }

    local_checksums[tid] = local_checksum;
    local_reads[tid]     = local_read;

    fclose(file);
  }

  uint64_t checksum = 0;
  uint64_t total_read = 0;
  for (int i = 0; i < CONCURRENCY; ++i)
    checksum += local_checksums[i], total_read += local_reads[i];

  std::cout << checksum << std::endl
    << total_read << std::endl
    << double(GetTickCount() - start)/1000. << std::endl;
}

这段代码看起来有点混乱,因为我需要精确地分配要读取的文件量。但是,这段代码非常简单明了。需要记住的一件事是,您需要拥有每个线程的文件指针。不能简单地共享一个文件指针,因为内部数据结构可能不是线程安全的。此外,可以通过parallel for并行化此代码。但是,我认为这种方法更自然。


简单实验结果

我已经测试了使用此代码在HDD(WD Green 2TB)和SSD(Intel 120GB)上读取10GB文件。

使用HDD,没有获得任何加速。甚至观察到了减速。这清楚地表明这个代码受I/O限制。这个代码实际上没有计算,只有I/O。

然而,使用SSD,我在4个核心上获得了1.2倍的加速。是的,加速比很小。但是,你仍然可以通过SSD获得它。如果计算变得更多一些(我只是放了一个非常短的忙等待循环),加速将是显着的。我能够获得2.5倍的加速。


总之,我建议您尝试并行化此代码。

此外,如果计算不是微不足道的,则建议使用流水线。上述代码只是将其分成几个大块,导致缓存效率低下。但是,管道并行化可能会产生更好的缓存利用率。尝试使用TBB进行管道并行化。他们提供了一个简单的管道结构。


由于我在Unix上运行,Win32 API并没有什么帮助。 - Daniel Standage
丹尼尔,请查看代码,你会发现它与Unix大部分相同。只需替换一些大文件处理函数,如lseekgettimeofday即可。 - minjang

3

您是否确认您的进程实际上是CPU瓶颈而不是I/O瓶颈?您的代码非常像I/O瓶颈代码,这种情况下并行化无济于事。


我还没有检查过,也没有考虑过这种可能性 O_o。有没有一种简单的方法来检查这个? - Daniel Standage
实际上不是这样的。即使是I/O受限的任务,SSD和高性能RAID也可以提供加速。 - minjang
2
@minjang 嗯,当然可以,但是OpenMP或pthread没有#buymeanSSD指令。 - thiton
3
#pragma omp scale take(money) yield(SSD) 是什么意思? - sehe
这取决于您的系统,代码是否会加速。如果您真的想加速,可以插入一个简单的带宽测量代码,仅在带宽足够时运行并行版本。好的,如果您还不确定,就不要进行并行处理 :) - minjang
显示剩余3条评论

0
针对“minding”的问题,我认为你的代码实际上没有进行任何优化。关于此语句“#pragma omp parallel”,有很多常见的误解。这个语句只会生成线程,而没有“for”关键字,所有的线程将执行随后的任何代码。所以,你的代码实际上会使每个线程重复计算。针对Daniel的问题,你是正确的,OpenMP无法优化while循环,唯一的方法是重构代码,让迭代提前确定(例如用计数器循环一次while循环)。抱歉再次发表答案,因为我还不能进行评论,但希望这能澄清常见的误解。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接