std::thread - 逐行读取文件

4

我希望能够从输出文件中并行逐行阅读。每个线程读取一行,然后处理数据。同时,下一个线程必须读取下一行。

std::ifstream infile("test.txt");
std::mutex mtx;

void read(int id_thread){
   while(infile.good()){
     mtx.lock();
     std::string sLine;
     getline(infile, sLine);
     std::cout << "Read by thread: " << id_thread;
     std::cout << sLine << std::endl;
     mtx.unlock();
   }
}

void main(){
  std::vector<std::thread> threads;
  for(int i = 0; i < num; i++){
     threads.push_back(std::thread(parallelFun, i));
  }

  for(auto& thread : threads){
      thread.join();
  }
  return 0;
}

当我运行这段代码时,我得到了这个结果:第一个线程读取了所有行。如何让每个线程只读取一行? 编辑 如评论中所提到的,我只需要使用更大的测试文件。 谢谢大家!

2
什么问题?无论如何,读取都是由互斥锁进行序列化的。 - Anton Savin
3
这就是线程的工作方式,它们运行一段时间,然后让另一个线程运行一段时间,如此循环。如果单个线程运行了足够长的时间以读取短文件中的所有行,则您必须想出其他方法来逐行阅读(例如发出条件变量的信号?)。 - Some programmer dude
3
如果您希望在处理数据的同时进行读取,您应该在读取完一行后立即解锁互斥量,而不是在处理已读取的数据之后再解锁。 - Angew is no longer proud of SO
2
可能存在一个问题,即在检查文件仍然有效后立即调度线程,但在文件读取完成时返回。这不是死锁情况,但在大文件和许多线程的情况下可能会产生IO错误。解决方法是将infile.good()移动到关键部分,并使用其结果更新一个bool,该bool用于while循环。 - didierc
1
你所要求的基本上是线程滥用。说:“[我]想要:第一个线程读取第一行,第二个线程读取第二行,...,第n个线程读取第n行”实际上就是说:“我想要串行执行。”如果你想要串行执行,不要使用多个线程。 - Jerry Coffin
显示剩余4条评论
3个回答

6
我会将循环改为
while(infile.good()){
     mtx.lock();
     std::string sLine;
     getline(infile, sLine);
     mtx.unlock();
     std::cout << "Read by thread: " << id_thread;
     std::cout << sLine << std::endl;
   }

您的std::cout部分是测试循环中需要稍后替换为实际代码的繁忙部分。这样可以给其他线程腾出时间来启动。此外,请将您的测试文件制作得较大。在初始化线程花费一些时间且第一个线程占用所有数据的情况并不罕见。


1
在我看来,这似乎是把糟糕的代码变得更糟。原代码中有一个损坏的输入循环,你没有修复它。通过将输出移至互斥部分之外,您允许多个线程并发写入,因此输出可能不再连贯。 - Jerry Coffin
3
这段代码不具备异常安全,请使用std::lock_guard或其他RAII互斥锁处理程序,以确保在下一个循环迭代或函数退出之前始终释放锁。 - Mgetz
1
@JerryCoffin 我的假设是这只是一个极简的示例代码,而并发线程在实际代码中应该执行一些真正繁重的任务,不一定与同步I/O相关。如果问题是“如何在线程之间同步I/O”,我同意应该提到这一点。 - Oncaphillis

2
如果您想让您的5个线程按照每5行读取一次,那么您必须同步读取,这样每个线程都必须知道前一个已经完成了其部分的读取。这一要求可能会造成巨大的效率低下,因为一些线程可能需要等待很长时间才能运行。

概念代码,未经测试,请自行决定风险。

首先我们可以制作一个默认类来处理原子锁。我们对齐它以避免假共享和相关的高速缓存来回跳动问题。

constexpr size_t CACHELINESIZE = 64; // could differ on your architecture
template<class dType>
class alignas(CACHELINESIZE) lockstep {
  std::atomic<dType> lock = dType(0);

public:
  // spinlock spins until the previous value is prev and then tries to set lock to value
  // until success, restart the spin if prev changes.
  dType Spinlock(dType prev = dType(0), dType next = dType(1)) {
     dType expected = prev;
     while (!lock.compare_exchange_weak(expected, next)) { // request for locked-exclusiv ~100 cycles?
       expected = prev;  // we wish to continue to wait for expected
       do {
         pause(); // on intel waits roughly one L2 latency time.
       } while(lock.load(std::memory_order_relaxed) != prev);  // only one cache miss per change
     }
     return expected;
  }

  void store(dType value) {
    lock.store(value);
  }
};

lockstep<int> lock { 0 };

constexpr int NoThreads = 5;

std::ifstream infile("test.txt");

void read(int id_thread) {
   locks[id_thread].lock = id_thread;
   bool izNoGood = false;
   int next = id_thread;

   while(!izNoGood){
     // get lock for next iteration
     lock.spinlock(next, next); // wait on our number

     // moved file check into locked region     
     izNoGood = !infile.good();
     if (izNoGood) {
       lock.store(next+1); // release next thread to end run.
       return;
     }

     std::string sLine;
     getline(infile, sLine);

     // release next thread
     lock.store(next+1);

     // do work asynchronous
     // ...

     // debug log, hopefully the whole line gets written in one go (atomic)
     // but can be in "random" order relative to other lines.
     std::cout << "Read by thread: " << id_thread << " line no. " << next
               << " text:" << sLine << std::endl;  // endl flushes cout, implicit sync?
     next += NoThreads;  // our next expected line to process
   }
}

void main() {
  std::vector<std::thread> threads;
  for(int i = 0; i < NoThreads; i++) {
     threads.push_back(std::thread(parallelFun, i));
  }

  for(auto& thread : threads){
      thread.join();
  }
  return 0;
}

1

如果你希望每个线程读取一行(从你的描述中可以看出),请删除while循环,然后确保你有与文件中行数相同的线程数量。

为了摆脱上述限制,您可以使用boost threadpool。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接