std::thread:如何等待(join)任意给定的线程完成?

16

例如,我有两个线程,t1t2。我想等待 t1 或者 t2 完成,这可能吗?

如果我有一系列线程,比如一个 std::vector<std::thread>,我该怎么做呢?


1
你有一个 [mcve] 来展示你自己尝试解决这个问题吗? - Xirema
1
我不知道有任何标准机制可以直接做到这一点。你实际上想要解决什么问题?也许有另一种替代方案。 - François Andrieux
1
@KerrekSB 这将会阻塞所有线程。我认为,如果你想要阻塞 一个 线程(n个线程中的一个),并且仍然使用 std::thread,那么你需要一个外部变量(例如 std::atomic_bool)作为信号。 - Chad
啊,好的,你只想阻塞直到第一个线程完成...没关系! - Kerrek SB
@t123yh 可能是 POSIX 吗? - Bob
这取决于你写了什么以及接下来想要做什么。你可以让它们共享一个互斥锁并在主线程上等待。这取决于你想对那些没有先完成的线程做什么(让它们完成?杀死它们?忽略它们?)。它们有输出吗?你是如何获取输出的?那么其他线程的输出呢?等等... - Donnie
4个回答

10

您可以使用std::condition_variable实现等待和通知(wait & notify),例如:

std::mutex m;
std::condition_variable cond;
std::atomic<std::thread::id> val;

auto task = [&] {
    std::this_thread::sleep_for(1s); // Some work

    val = std::this_thread::get_id();
    cond.notify_all();
};

std::thread{task}.detach();
std::thread{task}.detach();
std::thread{task}.detach();

std::unique_lock<std::mutex> lock{m};
cond.wait(lock, [&] { return val != std::thread::id{}; });

std::cout << "Thread " << val << " finished first" << std::endl;

注意:val并不一定代表最先完成的线程,因为所有线程都大约在同一时间结束,并且可能会发生覆盖,但这只是为了本示例而言。


4
请注意,分离线程是一种设计决策,而不是编码方便,只有在极少数情况下才适用。在这种情况下,它是不必要的。一旦主线程唤醒,它就可以加入这三个线程。 - Pete Becker
1
点赞。一旦您将.join()视为不是魔法,而只是互斥锁和条件变量之间通信的语法糖,那么这就是正确的方法。当.join()的默认行为不是您想要的时候,请自己编写。 - Howard Hinnant
1
完整示例:主线程监视10个线程1秒钟。平均而言,其中一半将在1秒内完成工作,另一半则不会。主线程在每个完成的线程上打印结果,并在1秒后协同取消其余线程。然后,主线程等待所有已取消的线程返回后再返回自身。不需要原子操作。只需使用互斥锁和条件变量之间的有限状态机进行线程间(双向)通信即可。可以轻松扩展此示例的通信丰富性。https://wandbox.org/permlink/5RjOy09nY89zqTwv - Howard Hinnant
Howard Hinnant: 据我所知,“join()” 确实 是魔法(从库用户的角度来看),因为线程本地对象的析构函数保证已经完成。这在用户代码中通常无法模拟。 - Arne Vogel

2
不,C++11的线程库中没有等待多个对象的功能。
如果要等待一组操作中的第一个完成,可以考虑使用线程安全的生产者-消费者队列。这篇文章Here包含了一个threaded_queue<T>的实现,将线程的工作产品发送到这样的队列中,并从队列的另一端读取结果。
现在可以同时等待多个线程的工作产品,或等待单个线程、GPU着色器或通过RESTful网络接口传递的工作产品。您并不关心这些细节。
线程本身应该由类似于线程池或其他基于std::thread的更高级抽象管理,因为std::thread本身并不是一个很好的面向客户端的线程抽象。
template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return r;
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};

我会在C++17中使用std::optional而不是boost::optional。它也可以用unique_ptr或其他构造替换。

1
这很容易通过轮询等待来实现:
#include<iostream>
#include<thread>
#include<random>
#include<chrono>
#include<atomic>

void thread_task(std::atomic<bool> & boolean) {
    std::default_random_engine engine{std::random_device{}()};
    std::uniform_int_distribution<int64_t> dist{1000, 3000};
    int64_t wait_time = dist(engine);
    std::this_thread::sleep_for(std::chrono::milliseconds{wait_time});
    std::string line = "Thread slept for " + std::to_string(wait_time) + "ms.\n";
    std::cout << line;
    boolean.store(true);
}

int main() {
    std::vector<std::thread> threads;
    std::atomic<bool> boolean{false};
    for(int i = 0; i < 4; i++) {
        threads.emplace_back([&]{thread_task(boolean);});
    }
    std::string line = "We reacted after a single thread finished!\n";
    while(!boolean) std::this_thread::yield();
    std::cout << line;
    for(std::thread & thread : threads) {
        thread.join();
    }
    return 0;
}

在Ideone.com上我得到的示例输出:
Thread slept for 1194ms.
We reacted after a single thread finished!
Thread slept for 1967ms.
Thread slept for 2390ms.
Thread slept for 2984ms.

这可能不是最佳代码,因为轮询循环并不一定是最佳实践,但它应该作为一个开始而有效。

可以通过使用std::condition_variable而不是使用while循环轮询来改进这个问题。 - rwols
如果您知道那段代码的样子,可以将其提交为答案。我对条件变量的使用还不是非常熟悉。 - Xirema

0

等待多个线程没有标准的方法。

您需要使用操作系统特定的函数,例如在Windows上使用WaitForMultipleObjects。 以下是一个仅适用于Windows的示例:

HANDLE handles[] = { t1.native_handle(), t2.native_handle(),  };
auto res = WaitForMultipleObjects(2 , handles, FALSE, INFINITE);

有趣的是,当std::when_any被标准化后,人们可以使用一种标准但浪费的解决方案:
std::vector<std::thread> waitingThreads;
std::vector<std::future<void>> futures;
for (auto& thread: threads){
    std::promise<void> promise;
    futures.emplace_back(promise.get_future());
    waitingThreads.emplace_back([&thread, promise = std::move(promise)]{
         thread.join();
         promise.set_value();
    });
}

auto oneFinished = std::when_any(futures.begin(), futures.end());

非常浪费,仍然不可用,但标准。


waitingThreads.emplace lambda 中,您是指 thread 而不是 t 吗?(或者在循环中是 t 而不是 thread?) - Caleth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接