等待分离线程完成的正确方法是什么?

3

看看这个示例代码:

void OutputElement(int e, int delay)
{
    this_thread::sleep_for(chrono::milliseconds(100 * delay));
    cout << e << '\n';
}

void SleepSort(int v[], uint n)
{
    for (uint i = 0 ; i < n ; ++i)
    {
        thread t(OutputElement, v[i], v[i]);
        t.detach();
    }
}

这段代码启动了n个新线程,每个线程在输出一个值并结束前都会休眠一段时间。在这种情况下,等待所有线程结束的正确/最佳/推荐方法是什么?我知道如何解决这个问题,但我想知道在这种情况下应该使用哪种推荐的多线程工具/设计(例如condition_variablemutex等)。


1
只要不要使用 detach() 分离你的线程,那么你就可以正确地使用 join() - 5gon12eder
你是在暗示我需要有 nstd::thread 对象,全部启动并随后调用它们的 join() 方法吗? - NPS
差不多了。那么我应该什么时候呢? - NPS
不要等待它们完成。如果在任何阶段强制终止它们没有不良后果,就让操作系统在进程终止时终止它们。 - Martin James
1
@NPS 但是为什么你想要分离该线程? - Jagannath
显示剩余8条评论
2个回答

10
现在来说一个略微不同意的答案。我要强调的是“略微”,因为我大部分都同意其他答案和评论的观点,即“不要分离,而是加入”。
首先想象一下没有“join()”。你必须通过互斥锁和条件变量在线程之间通信。这并不难,也不复杂。它允许任意丰富的通信,可以是任何你想要的东西,只要在互斥锁被锁定时进行通信。
现在一个非常常见的通信习惯很简单,就是一个状态,表示“我完成了”。子线程会设置这个状态,父线程会在条件变量上等待,直到子线程说“我完成了。”实际上,这种习惯是如此普遍,以至于应该有一个方便的函数封装了互斥锁、条件变量和状态。
join()就是这个方便函数。
但在我看来,需要小心。当说:“永远不要分离,总是加入”时,这可能被解释为:永远不要使你的线程通信比“我完成了”更复杂。
对于父线程和子线程之间更复杂的交互,请考虑以下情况:父线程启动多个子线程去独立寻找问题的解决方案。当任何一个线程首次发现问题时,它会将问题通知给父线程,然后父线程可以获取该解决方案,并告诉所有其他线程不再需要搜索了。
例如:
#include <chrono>
#include <iostream>
#include <iterator>
#include <random>
#include <thread>
#include <vector>

void OneSearch(int id, std::shared_ptr<std::mutex> mut,
                   std::shared_ptr<std::condition_variable> cv,
                   int& state, int& solution)
{
    std::random_device seed;
//     std::mt19937_64 eng{seed()};
    std::mt19937_64 eng{static_cast<unsigned>(id)};
    std::uniform_int_distribution<> dist(0, 100000000);
    int test = 0;
    while (true)
    {
        for (int i = 0; i < 100000000; ++i)
        {
            ++test;
            if (dist(eng) == 999)
            {
                std::unique_lock<std::mutex> lk(*mut);
                if (state == -1)
                {
                    state = id;
                    solution = test;
                    cv->notify_one();
                }
                return;
            }
        }
        std::unique_lock<std::mutex> lk(*mut);
        if (state != -1)
            return;
    }
}

auto findSolution(int n)
{
    std::vector<std::thread> threads;
    auto mut = std::make_shared<std::mutex>();
    auto cv = std::make_shared<std::condition_variable>();
    int state = -1;
    int solution = -1;
    std::unique_lock<std::mutex> lk(*mut);
    for (uint i = 0 ; i < n ; ++i)
        threads.push_back(std::thread(OneSearch, i, mut, cv,
                          std::ref(state), std::ref(solution)));
    while (state == -1)
        cv->wait(lk);
    lk.unlock();
    for (auto& t : threads)
        t.join();
    return std::make_pair(state, solution);
}

int
main()
{
    auto p = findSolution(5);
    std::cout << '{' << p.first << ", " << p.second << "}\n";
}

上面我创建了一个“虚拟问题”,其中一个线程搜索需要查询URNG多少次,直到出现数字999。父线程将5个子线程放到工作中。子线程工作一段时间,然后偶尔查看是否有其他线程已经找到解决方案。如果找到了,它们就退出,否则它们继续工作。主线程等待直到找到解决方案,然后与所有子线程合并
对我来说,使用bash时间设施,这将输出:
$ time a.out
{3, 30235588}

real    0m4.884s
user    0m16.792s
sys 0m0.017s

但如果它不是将所有线程连接起来,而是分离那些尚未找到解决方案的线程。这可能看起来像:

    for (unsigned i = 0; i < n; ++i)
    {
        if (i == state)
            threads[i].join();
        else
            threads[i].detach();
    }

(使用以下代码代替上面的t.join()循环。) 对我来说,现在运行时间为1.8秒,而不是之前的4.9秒。即子线程没有经常互相检查,因此主程序只需分离工作线程并让操作系统关闭它们。对于这个示例来说,这是安全的,因为子线程拥有它们所涉及的所有内容。没有任何东西会在它们下面被摧毁。

最后一个迭代可以通过注意到甚至找到解决方案的线程也不需要加入进来来实现。 所有的线程都可以被分离。代码实际上要简单得多:

auto findSolution(int n)
{
    auto mut = std::make_shared<std::mutex>();
    auto cv = std::make_shared<std::condition_variable>();
    int state = -1;
    int solution = -1;
    std::unique_lock<std::mutex> lk(*mut);
    for (uint i = 0 ; i < n ; ++i)
        std::thread(OneSearch, i, mut, cv,
                          std::ref(state), std::ref(solution)).detach();
    while (state == -1)
        cv->wait(lk);
    return std::make_pair(state, solution);
}

性能仍然维持在大约1.8秒左右。

这里仍然有一种(某种程度上的)有效的加入解决方案线程的方法。但是它是通过condition_variable::wait而不是join来实现的。

thread::join()是一个便利函数,用于非常常见的模式,即父/子线程通信协议只是“我完成了”。在这种常见情况下,优先使用thread::join(),因为它更易读,更易写。

但是,在不必要地限制自己使用如此简单的父/子通信协议的同时,也不要害怕在需要时构建自己的更丰富的协议。在这种情况下,thread::detach()通常更合适。 thread::detach()并不一定意味着启动并忘记线程。它可以仅意味着您的通信协议比“我完成了”更复杂。


很好的答案,但是在这里通过detach()分离线程获得胜利,至少部分原因是因为在你的示例中main立即返回了吗?如果这是在更复杂的程序中,我们仍然会在短时间内保持线程运行。我认为这里最大的优势是通过原子标志进行通信,以避免需要锁定。然后,我们也可以更频繁地轮询该标志。如果简单性是目标,那么std::packaged_task可能会有用。 - 5gon12eder
1
@5gon12eder:这个例子的重点是说明除了“fire-and-forget”之外,存在合法的使用detach的情况,并且join只是一个封装了互斥量和条件变量的便利函数。在互斥量、原子操作和轮询频率之间总会存在权衡取舍,并且没有一种解决方案适用于所有用例。本答案旨在扩大您的工具箱,包括使用detach结合mutexcondition_variable进行替代通信。当然,原子操作和packaged_task也应该成为您的工具箱中的一部分。 - Howard Hinnant

4
不要分离,而是加入:
std::vector<std::thread> ts;

for (unsigned int i = 0; i != n; ++i)
    ts.emplace_back(OutputElement, v[i], v[i]);

for (auto & t : threads)
    t.join();

为什么join操作会阻塞? - user3085931
@user3085931:是的。将其视为等待(block)的一种方式,即在问题的上下文中查看。 - Kerrek SB

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接