在C/C++中快速从二进制文件中读取并添加大量整数

3
我正在使用C/C++编写代码,从一个二进制文件中读取无符号整数,该文件是在8核x86系统上运行的32位Linux操作系统中生成的。应用程序接受一个输入文件,其中包含按顺序排列的小端格式的无符号整数。因此,输入文件大小以字节为单位是4的倍数。该文件可能包含十亿个整数。如何快速读取和添加所有整数,并使用64位精度返回总和?
以下是我的实现。在这种情况下,损坏数据的错误检查不是主要问题,并且假设输入文件没有任何问题。
#include <iostream>
#include <fstream>
#include <pthread.h>
#include <string>
#include <string.h>


using namespace std;

string filepath;
unsigned int READBLOCKSIZE = 1024*1024;
unsigned long long nFileLength = 0;

unsigned long long accumulator = 0; // assuming 32 bit OS running on X86-64
unsigned int seekIndex[8] = {};
unsigned int threadBlockSize = 0; 
unsigned long long acc[8] = {};

pthread_t thread[8];
void* threadFunc(void* pThreadNum);

//time_t seconds1;
//time_t seconds2;

int main(int argc, char *argv[])
{
    if (argc < 2) 
    {
        cout << "Please enter a file path\n";
        return -1;
    }

    //seconds1 = time (NULL);
    //cout << "Start Time in seconds since January 1, 1970 -> " << seconds1 << "\n";

    string path(argv[1]);
    filepath = path;
    ifstream ifsReadFile(filepath.c_str(), ifstream::binary);  // Create FileStream for the file to be read
    if(0 == ifsReadFile.is_open()) 
    {
        cout << "Could not find/open input file\n";
        return -1;
    }

    ifsReadFile.seekg (0, ios::end);
    nFileLength = ifsReadFile.tellg();           // get file size
    ifsReadFile.seekg (0, ios::beg);



    if(nFileLength < 16*READBLOCKSIZE)
    {
        //cout << "Using One Thread\n"; //**
        char* readBuf = new char[READBLOCKSIZE];
        if(0 == readBuf) return -1;

        unsigned int startOffset = 0;   
        if(nFileLength >  READBLOCKSIZE)
        {
            while(startOffset + READBLOCKSIZE < nFileLength)
            {
                //ifsReadFile.flush();
                ifsReadFile.read(readBuf, READBLOCKSIZE);  // At this point ifsReadFile is open
                int* num = reinterpret_cast<int*>(readBuf);
                for(unsigned int i = 0 ; i < (READBLOCKSIZE/4) ; i++) 
                {
                    accumulator += *(num + i);  
                }
                startOffset += READBLOCKSIZE;
            }

        }

        if(nFileLength - (startOffset) > 0)
        {
            ifsReadFile.read(readBuf, nFileLength - (startOffset));  
            int* num = reinterpret_cast<int*>(readBuf);
            for(unsigned int i = 0 ; i < ((nFileLength - startOffset)/4) ; ++i) 
            {
                accumulator += *(num + i);  
            }
        }
        delete[] readBuf; readBuf = 0;
    }
    else
    {
        //cout << "Using 8 Threads\n"; //**
        unsigned int currthreadnum[8] = {0,1,2,3,4,5,6,7};
        if(nFileLength > 200000000) READBLOCKSIZE *= 16; // read larger blocks
        //cout << "Read Block Size -> " << READBLOCKSIZE << "\n";       

        if(nFileLength % 28)
        {
            threadBlockSize = (nFileLength / 28);
            threadBlockSize *= 4;
        }
        else
        {   
            threadBlockSize = (nFileLength / 7);
        }

        for(int i = 0; i < 8 ; ++i)
        {
            seekIndex[i] = i*threadBlockSize;
            //cout << seekIndex[i] << "\n";
        }
        pthread_create(&thread[0], NULL, threadFunc, (void*)(currthreadnum + 0));
        pthread_create(&thread[1], NULL, threadFunc, (void*)(currthreadnum + 1));
        pthread_create(&thread[2], NULL, threadFunc, (void*)(currthreadnum + 2));
        pthread_create(&thread[3], NULL, threadFunc, (void*)(currthreadnum + 3));
        pthread_create(&thread[4], NULL, threadFunc, (void*)(currthreadnum + 4));
        pthread_create(&thread[5], NULL, threadFunc, (void*)(currthreadnum + 5));
        pthread_create(&thread[6], NULL, threadFunc, (void*)(currthreadnum + 6));
        pthread_create(&thread[7], NULL, threadFunc, (void*)(currthreadnum + 7));

        pthread_join(thread[0], NULL);
        pthread_join(thread[1], NULL);
        pthread_join(thread[2], NULL);
        pthread_join(thread[3], NULL);
        pthread_join(thread[4], NULL);
        pthread_join(thread[5], NULL);
        pthread_join(thread[6], NULL);
        pthread_join(thread[7], NULL);

        for(int i = 0; i < 8; ++i)
        {
            accumulator += acc[i];
        }
    }

    //seconds2 = time (NULL);
    //cout << "End Time in seconds since January 1, 1970 -> " << seconds2 << "\n";
    //cout << "Total time to add " << nFileLength/4 << " integers -> " << seconds2 - seconds1 << " seconds\n";

    cout << accumulator << "\n";      
    return 0;
}

void* threadFunc(void* pThreadNum)
{
    unsigned int threadNum = *reinterpret_cast<int*>(pThreadNum);
    char* localReadBuf = new char[READBLOCKSIZE];
    unsigned int startOffset = seekIndex[threadNum];
    ifstream ifs(filepath.c_str(), ifstream::binary);  // Create FileStream for the file to be read
    if(0 == ifs.is_open()) 
    {
        cout << "Could not find/open input file\n";
        return 0;
    }   
    ifs.seekg (startOffset, ios::beg); // Seek to the correct offset for this thread
    acc[threadNum] = 0;
    unsigned int endOffset = startOffset + threadBlockSize;
    if(endOffset > nFileLength) endOffset = nFileLength; // for last thread
    //cout << threadNum << "-" << startOffset << "-" << endOffset << "\n"; 
    if((endOffset - startOffset) >  READBLOCKSIZE)
    {
        while(startOffset + READBLOCKSIZE < endOffset)
        {
            ifs.read(localReadBuf, READBLOCKSIZE);  // At this point ifs is open
            int* num = reinterpret_cast<int*>(localReadBuf);
            for(unsigned int i = 0 ; i < (READBLOCKSIZE/4) ; i++) 
            {
                acc[threadNum] += *(num + i);   
            }
            startOffset += READBLOCKSIZE;
        }   
    }

    if(endOffset - startOffset > 0)
    {
        ifs.read(localReadBuf, endOffset - startOffset);
        int* num = reinterpret_cast<int*>(localReadBuf);
        for(unsigned int i = 0 ; i < ((endOffset - startOffset)/4) ; ++i) 
        {
            acc[threadNum] += *(num + i);   
        }
    }

    //cout << "Thread " << threadNum + 1 << " subsum = " << acc[threadNum] << "\n"; //**
    delete[] localReadBuf; localReadBuf = 0;
    return 0;
}

我写了一个小的C#程序来生成用于测试的输入二进制文件。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace BinaryNumWriter
{
    class Program
    {
        static UInt64 total = 0;
        static void Main(string[] args)
        {
            BinaryWriter bw = new BinaryWriter(File.Open("test.txt", FileMode.Create));
            Random rn = new Random();
            for (UInt32 i = 1; i <= 500000000; ++i)
            {
                UInt32 num = (UInt32)rn.Next(0, 0xffff);
                bw.Write(num);
                total += num;
            }
            bw.Flush();
            bw.Close();
        }
    }
}

在一台Core i5机器上运行程序,时钟频率为3.33 GHz(虽然它是四核的,但目前只有这个)配备2 GB RAM和Ubuntu 9.10 32位操作系统,其性能如下:
- 100个整数~0秒(否则我真的要很失败) - 100,000个整数<0秒 - 1亿个整数~7秒 - 5亿个整数~29秒(1.86 GB输入文件)
我不确定硬盘是5400RPM还是7200RPM。我尝试了不同的缓冲区大小来读取,发现每次读取16 MB对于大型输入文件来说是一个比较好的选择。
有没有更好的方法可以从文件中更快地读取以提高整体性能?是否有更聪明的方法来更快地添加大量整数并重复折叠?在我编写代码方面是否存在任何主要障碍/我是否做错了什么导致浪费了很多时间?
我该怎么做才能使读取和添加数据的过程更快?
谢谢。
Chinmay

2
你的进程可能会受到IO限制。与使用fread()读取块的单线程实现进行比较。 - jfs
1
将输入数据分成8个不同的文件,并将每个文件放置在不同的“物理”硬盘中,您可能会看到一些改进。但是除此之外,我认为您的CPU可以比硬盘更快地读取和添加这些数字。但是由于数字很少,我只会使用单个线程来处理它们。 - Martin York
@J.F.Sebastian - 我强制让我的程序通过循环的单线程部分运行了500000000个整数,并在27秒内完成了计算总和。这比8线程实现快2秒。我没想到会这样。另外,fstream.write()是C++中fread()的等效函数吗?还是使用fread()有什么好处? - Chinmay Nerurkar
@LokiAstari - 对于小文件大小,我使用单线程实现,对于较大的文件大小则使用8个线程。但我猜测将整个1.86GB文件放在一个硬盘上似乎会削弱多线程的任何好处。这就是为什么多线程实现需要29秒而单线程实现只需27秒的原因吗? - Chinmay Nerurkar
1
@ChinmayNerurkar:C++ IO需要特殊配置才能像stdio一样快,例如https://dev59.com/_mox5IYBdhLWcg3wLhei。您还可以尝试使用带有POSIX_FADV_SEQUENTIAL、POSIX_FADV_WILLNEED的posix_fadvice()函数。 - jfs
显示剩余3条评论
2个回答

3
从多个线程访问机械硬盘的方式会导致一些磁头移动(读取速度变慢)。您几乎肯定会受到IO限制(1.86GB文件的速度为65MBps)。
请尝试更改您的策略:
- 启动8个线程 - 我们称之为"消费者"。 - 这8个线程将等待数据可用。 - 在主线程中,开始读取文件块(例如256KB),以提供给"消费者"。 - 主线程到达EOF并通知工作线程没有更多可用的数据。 - 主线程等待这8个线程加入。 要使其正常工作,您需要进行相当多的同步,并且我认为通过顺序文件访问可以完全充分利用HDD /文件系统的IO能力。对于较小的可以从缓存中缓存并以闪电般的速度提供服务的文件,则可能会有所不同。
您可以尝试的另一件事是仅启动7个线程,为主线程和系统剩余空出一个CPU。
..或者购买SSD :)
编辑:
为了简单起见,请查看可以使用单线程简单地读取文件(丢弃缓冲区)而不进行任何处理的速度。这个速度再加上epsilon就是您可以完成此操作的理论极限速度。

我避免使用Provider-Consumer模式,因为消费者必须等待提供者,这种同步会浪费时间。相反,我将文件分成不重叠的部分,以便线程不需要同步。奇怪的是,使用单线程方法比使用8个线程的方法性能提高了2秒钟。如果不使用Provider-Consumer模式,我是否会错过什么?我同意关于HDD读取速度瓶颈的观点。我将尝试您的建议,只读取和丢弃缓冲区,并计时该过程。 - Chinmay Nerurkar
1
当您同时从8个线程读取相同的硬盘时,每次读取需要14-11毫秒来重新对齐磁头(最坏情况)。对于1.86GB而言,这意味着浪费了1.6秒的时间(1.86GB/16MB*14ms),这正是您所看到的。在您的情况下,提供者-消费者可能实际上比单线程更糟糕,因为您的计算量很小,但如果您不是进行加法而是进行阶乘或正切等操作,则可以遵循该模式。 只有当您拥有一个CPU绑定的进程和多核CPU时,多个线程才有用。这在这里并不适用。 - Cristian Niculescu
这绝对是了解硬盘头重新对齐时间的有用信息。我注释掉了添加读取数字的for循环,但仍然需要29秒才能使用8个线程读取1.86GB的文件。这在仅仅读取文件方面来说是异常耗时的。而且我有一个四核CPU。为什么会出现“这里不是这种情况”?是因为每个线程都按顺序读取以便重新对齐头部吗? - Chinmay Nerurkar

3
如果您想快速读取(或写入)大量数据,并且不想对该数据进行太多处理,则需要避免在缓冲区之间进行额外的数据副本。这意味着您要避免使用fstream或FILE抽象(因为它们引入了一个需要复制的额外缓冲区),并避免在内核和用户缓冲区之间复制内容的读/写类型调用。
相反,在Linux上,您需要使用mmap(2)。在64位操作系统上,只需将整个文件映射到内存中,使用madvise(MADV_SEQUENTIAL)告诉内核您将主要按顺序访问它,然后开始工作。对于32位操作系统,您需要分块映射,每次取消映射前一个块。与您当前的结构非常相似,每个线程每次都会映射一个固定大小的块,应该能够很好地工作。

那听起来确实很有趣也很有用。我会在晚上试一下。32位系统上的块能有多大才能映射它们呢?谢谢。 - Chinmay Nerurkar
我尝试使用mmap()映射一个小的400KB文件。程序的结果是0。由于无法在此处发布代码,因此我已将代码粘贴到Pastebin链接上。你能告诉我我做错了什么吗?谢谢。 - Chinmay Nerurkar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接