使用std::thread实现并行计算的方法?

5

我刚接触std::thread,并尝试编写一个parallel_for。我编写了以下代码:

// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000 
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>

// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
    std::vector<std::thread> threads;
    for (Iterator it = first; it < last; it += group) {
        threads.push_back(std::thread([=](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(threads.begin(), threads.end(), [=](std::thread& x){x.join();});
}

// Function to apply
template<typename Type>
void f1(Type& x)
{
    x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x)); 
}

// Main
int main(int argc, char* argv[]) {

    const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
    const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
    double x = 0;
    std::vector<double> v(n);
    std::iota(v.begin(), v.end(), 0);

    parallel_for(v.begin(), v.end(), f1<double>, nthreads);

    for (unsigned int i = 0; i < n; ++i) x += v[i];
    std::cout<<std::setprecision(15)<<x<<std::endl;
    return 0;
}

但是这个不起作用:(来自g++ 4.6的错误代码)
parallel_for.cpp: In instantiation of ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>’:
parallel_for.cpp:22:9:   instantiated from ‘void parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]’
parallel_for.cpp:43:58:   instantiated from here
parallel_for.cpp:22:89: erreur: field ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>::__f’ invalidly declared function type

如何解决这个问题?
编辑:这个新版本可以编译,但是没有给出正确的结果:
// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000 
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>

// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
    std::vector<std::thread> threads;
    for (Iterator it = first; it < last; it += group) {
        threads.push_back(std::thread([=, &f](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

// Function to apply
template<typename Type>
void f(Type& x)
{
    x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x)); 
}

// Main
int main(int argc, char* argv[]) {

    const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
    const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
    double x = 0;
    double y = 0;
    std::vector<double> v(n);

    std::iota(v.begin(), v.end(), 0);
    std::for_each(v.begin(), v.end(), f<double>);
    for (unsigned int i = 0; i < n; ++i) x += v[i];

    std::iota(v.begin(), v.end(), 0);
    parallel_for(v.begin(), v.end(), f<double>, nthreads);
    for (unsigned int i = 0; i < n; ++i) y += v[i];

    std::cout<<std::setprecision(15)<<x<<" "<<y<<std::endl;
    return 0;
}

结果是:
./parallel_for 1 100
155.524339894552 4950

并行版本返回4950,而串行版本返回155……问题出在哪里?
6个回答

5

在(last-first)处需要进行转换或类型转换。原因是在模板参数推断期间不会执行类型转换。

这样做非常好(同时修复了DeadMG和Ben Voigt发现的问题)。两个版本都以n = 100000000为例,给出了156608294.151782。

template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
    std::vector<std::thread> threads;
    threads.reserve(nthreads);
    Iterator it = first;
    for (; it < last-group; it += group) {
        threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(it, last, f); // last steps while we wait for other threads

    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

由于步骤for_each(it, last, f)较小,我们可以利用调用线程在等待其他结果时完成该步骤。


1
  • 必须通过引用捕获函数。

    [=, &f] () { /* your code */ };

  • 看一下代码。

    #include <iostream>
    
    template <class T>
    void foo(const T& t)
    {
        const int a = t;
        [&]
        {
            std::cout << a << std::endl;
        }();
    }
    
    
    int main()
    {
        foo(42);
        return 0;
    }
    

    Clang输出42,但g++会抛出警告:‘a’ is used uninitialized in this function,并打印0。看起来像是一个错误。

    解决方法:在您的代码中使用const auto(对于变量group)。

    更新:我想,就是这样。http://gcc.gnu.org/bugzilla/show_bug.cgi?id=52026


谢谢!但现在出现了一个新问题。 - Vincent

1

一个问题是it += group在法律上可以是last,但在超出范围时创建值是未定义的行为。仅仅检查it < last已经太晚了。

相反,您需要在it仍然有效时测试last - it。(虽然group的计算方式应该是安全的,但it + grouplast - group都不确定是否安全。)

例如:

template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function f, const int nthreads = 1, const int threshold = 100)
{
    const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
    std::vector<std::thread> threads;
    threads.reserve(nthreads);
    Iterator it = first;
    for (; last - it > group; it += group) {
        threads.push_back(std::thread([=, &f](){std::for_each(it, it+group, last), f);}));
    }
    threads.push_back(std::thread([=, &f](){std::for_each(it, last, f);}));

    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

是的。OP在另一个帖子中已经发布了这个问题,所以那里已经有答案了。小细节:在第二个for_each中,it不在使用范围内。 - Johan Lundberg

0

vc11解决方案,请告诉我是否与gcc不兼容。

template<typename Iterator, class Function>
void parallel_for( const Iterator& first, const Iterator& last, Function&& f, const size_t nthreads = std::thread::hardware_concurrency(), const size_t threshold = 1 )
{
    const size_t portion = std::max( threshold, (last-first) / nthreads );
    std::vector<std::thread> threads;
    for ( Iterator it = first; it < last; it += portion )
    {
        Iterator begin = it;
        Iterator end = it + portion;
        if ( end > last )
            end = last;

        threads.push_back( std::thread( [=,&f]() {
            for ( Iterator i = begin; i != end; ++i )
                f(i);
        }));
    }
    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

0

你需要通过引用进行捕获,同时在(last-first)处进行转换或类型转换。

原因是模板参数推断过程中永远不会进行类型转换。

此外,修复DeadMG发现的问题,然后你就可以得到以下代码。

它可以正常运行,两个版本在n=100000000时都可以给出156608294.151782。

template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
    std::vector<std::thread> threads;
    Iterator it = first;
    for (; it < last-group; it += group) {
        threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(it, last, f); // use calling thread while we wait for the others
    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

第二个 lambda 为什么需要通过引用进行捕获?它并没有引用任何局部变量。 - Puppy
此外,类型转换并不重要。如果有什么影响的话,现在你正在打破比“int”更大的差异的迭代器。 - Puppy
@DeadMG,试试看。你需要一个转换来避免模板参数推导,但我同意int可能太小了。 - Johan Lundberg
认真点,就...停止吧。你还没有改变任何有意义的东西,也不知道问题出在哪里。去找个地方重现它,修复它,然后再发一个答案。 - Puppy
我已经做了...有了你的修复,这很好。 - Johan Lundberg

0
你将 std::min(it+group, last) 传递给了 std::for_each,但总是在结尾添加 group。这意味着如果从 itlast 的距离不是 group 的倍数,那么你将把 it 移过 last,这是未定义行为。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接