C++如何在主线程中处理多个线程

5

我对多线程还不太熟悉,如果这些问题太简单,请见谅。

我的应用程序需要在一个线程中创建多个线程,并从每个线程执行操作。

例如,我有一组要读取的文件,比如50个,我使用CreateThread()函数创建一个线程来读取这些文件。

现在,这个主线程创建4个线程来访问文件。第1个线程被分配文件1,第2个线程被分配文件2,以此类推。

在第1个线程完成读取文件1并向主线程提供所需数据后,主线程需要调用文件5并获取其中的数据。对于所有其他线程都是相似的,直到所有50个文件都被读取。

之后,每个线程都被销毁,最后销毁主线程。

我面临的问题是:

1)如何停止线程在文件读取后退出?

2)如何再次调用线程并指定其他文件名?

3)我的子线程如何向主线程传递信息?

4)线程完成读取文件并向主线程返回数据后,主线程如何知道是哪个线程提供的数据?

谢谢。

3个回答

4
这是多线程编程中非常常见的问题。你可以把它看作是一个生产者-消费者问题:主线程“生产”任务,由工作线程“消费”任务(例如:http://www.mario-konrad.ch/blog/programming/multithread/tutorial-06.html)。你可能还想了解一下“线程池”。
我强烈建议阅读boost的同步库(http://www.boost.org/doc/libs/1_50_0/doc/html/thread.html)并使用其线程功能,因为它是平台无关的且易于使用。
更具体地回答你的问题:你应该创建一个包含要执行操作的队列(通常对所有工作线程都是相同的队列。如果你确实希望确保线程1执行任务1、5、9…,你可能需要为每个工作线程创建一个队列)。通过mutex同步访问此队列,当向mutex添加新数据时,等待线程可以通过condition_variables被通知。
1.) 不要退出线程函数,而是等待条件触发,然后使用while ([exit condition not true])循环重新启动
2.) 参见1。
3.) 通过任何双方都可以访问且由mutex保护的变量(例如结果队列)
4.) 通过将此信息作为写入结果队列的结果添加。
另一个建议:始终很难正确处理多线程。因此,尽可能小心并编写测试以检测死锁和竞争条件。

0
这种问题的典型解决方案是使用线程池和队列。主线程将所有文件/文件名推送到队列中,然后启动一个线程池,即不同的线程,在其中每个线程从队列中取出一个项目并处理它。当一个项目被处理时,它继续进行下一个(如果此时队列尚未为空)。当队列为空且所有线程已退出时,主线程知道一切都已处理完毕。
因此,1)和2)有些冲突:您不需要停止线程并再次调用它,只要在队列上找到项目,它就会一直运行。 对于3),您可以再次使用队列,线程将信息放入其中,主线程从中读取。对于4),您可以为每个线程分配一个ID,并将其与数据一起放置。但通常,主线程不需要知道哪个线程确切地处理了数据。
以下是一些非常基本的伪代码,省略了线程安全锁定,以便让您有一个想法:
//main
for( all filenames )
  queue.push_back( filename );

//start some thread
threadPool.StartThreads( 4, CreateThread( queue ) );

//wait for threads to end
threadPool.Join();

//thread
class Thread
{
public:
  Thread( queue q ) : q( q ) {}

  void Start();

  bool Join();

  void ThreadFun()
  {
    auto nextQueueItem = q.pop_back();
    if( !nextQueuItem )
      return; //q empty
    ProcessItem( nextQueueItem );
  }
}

队列 queue 是指标准库中的 std::queue 吗?那么你的示例代码中缺少互斥锁或其他锁定机制,对吗? - Philipp

0

无论您使用线程池与否来执行同步文件读取,它都归结为一系列函数或函数组必须串行运行。因此,假设您找到了一种并行执行函数的方法(可以是每个函数启动一个线程或使用线程池),要等待前4个文件读取完成,您可以使用队列,其中读取线程将其结果推入队列中,第五个函数现在从队列中拉出4个结果(当队列为空时,队列会阻塞)并进行处理。如果函数之间有更多依赖关系,您可以在它们之间添加更多队列。草图:

void read_file( const std::string& name, queue& q )
{
    file_content f= .... // read file
    q.push( f )
}

void process4files( queue& q )
{
    std::vector< file_content > result;
    for ( int i = 0; i != 4; ++i )
        result.push_back( q.pop() ); 

    // now 4 files are read ...
    assert( result.size() == 4u );
}

queue       q;
thread t1( &read_file, "file1", q );
thread t2( &read_file, "file2", q );
thread t3( &read_file, "file3", q );
thread t4( &read_file, "file4", q );
thread t5( &process4files, q );

t5.join();

希望你明白我的意思。

Torsten


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接