Efficient multithreading of a loop


I have a function that takes an integer and runs for some time (5-500 seconds, depending on the input).

void foo(int parameter) { /* do some work */ }

The computed result is stored in a text file created by the function:

resultsFor{parameter}.txt

Then I have a for loop that evaluates this function for a given set of integers.

for (int i = 0; i < 100; i++)
   foo(i);

Now I want to use threads to optimize the computation time of this loop, preferably using only the standard library.
#include <thread>

First, I want to create a certain number of threads, equal to the number of cores of the current machine.

unsigned numberOfThreads = std::thread::hardware_concurrency();

Then I want to assign these threads to the first numberOfThreads integers of the loop, and whenever a thread finishes its work it should be handed the next loop iteration. How can I do this?
The problem is that I cannot simply split the for loop into numberOfThreads parts up front, because there is no way to tell how long the computation for a given integer will take. Dividing the loop evenly among the threads could leave one thread finished with all of its work while another has not even done a tenth of its share.
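For illustration, a minimal sketch of this kind of dynamic dispatch, using a shared atomic counter to hand out loop indices (the placeholder foo body, the fixed total of 100, and the fallback of 2 threads are assumptions, not part of the question):

#include <atomic>
#include <thread>
#include <vector>

// placeholder for the real function from the question: it runs 5-500 s
// and writes resultsFor{parameter}.txt
void foo(int parameter) { /* do some work */ }

int main()
{
    const int total = 100;                    // same range as the original for loop
    std::atomic<int> next{0};                 // shared counter handing out loop indices

    unsigned numberOfThreads = std::thread::hardware_concurrency();
    if (numberOfThreads == 0)                 // the value is only a hint and may be 0
        numberOfThreads = 2;

    std::vector<std::thread> workers;
    for (unsigned t = 0; t < numberOfThreads; ++t)
        workers.emplace_back([&]{
            // each thread repeatedly claims the next unprocessed index
            for (int i = next.fetch_add(1); i < total; i = next.fetch_add(1))
                foo(i);
        });

    for (auto& w : workers)
        w.join();
}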

So.. you want a static work queue? That is, preload the queue before starting the threads, then have each thread pull the next work detail from the queue and run it, coming back for more when it finishes? Honestly, I'd rather split the work into subsets and send one to each thread. Unless the run time varies enormously with i, the time a few threads spend idle should be minimal, and the benefit is hard to overstate: no contention on a locked queue. - WhozCraig
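For contrast, splitting the work into fixed subsets as suggested here might look roughly like this (a sketch with the same placeholder foo, assumed total of 100, and 2-thread fallback as above; contiguous slices are just one way to split):

#include <thread>
#include <vector>

void foo(int parameter) { /* placeholder for the real work */ }

int main()
{
    const int total = 100;
    unsigned n = std::thread::hardware_concurrency();
    if (n == 0)
        n = 2;

    std::vector<std::thread> workers;
    for (unsigned t = 0; t < n; ++t)
    {
        // thread t gets the contiguous slice [begin, end) of the index range
        const int begin = static_cast<int>(t * total / n);
        const int end   = static_cast<int>((t + 1) * total / n);
        workers.emplace_back([=]{
            for (int i = begin; i < end; ++i)
                foo(i);
        });
    }

    for (auto& w : workers)
        w.join();
}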
This is the first time I've seen the term "static work queue", but your description seems to match my problem exactly. Splitting the work up front would make it very slow, because some configurations take much more time than others; maybe I underestimated that. - Adam Sikora
Then it gets a little more involved, but as long as the threads stay busy for the duration of their work runs, the time spent latched on the queue will likely be tiny. So a work queue can definitely solve this. - WhozCraig
Then this is probably simpler than you think. I'll try to post something for you. It won't turn into a course on multithreading, but a simple integration of an atomic variable, and how to use it to hand out work, should be fairly straightforward. - WhozCraig
Ok, posted. Hope it helps. It took me a minute to come up with a way to burn legitimate CPU cycles to demonstrate the concurrency. - WhozCraig
1 Answer

Here is an example of how to do this using atomics, an effective approach when you have a static set of data to process. For this example I build 100 randomly shuffled vectors of up to 100,000 elements each and hand them to the threads for sorting.

Note: I had to introduce a mutex here solely to keep the stdout output from interleaving; in practice you don't need it if you're not doing interactive output like this. Pay particular attention to the thread IDs in the output: you will see that each thread picks up multiple work items, and that they all end up fairly balanced (because the workload is fairly balanced).

Hope it helps.
#include <iostream>
#include <algorithm>
#include <iterator>
#include <vector>
#include <thread>
#include <atomic>
#include <random>
#include <mutex>
#include <cstdlib>   // for EXIT_SUCCESS

// our work queue: each inner vector is one work item to sort
typedef std::vector<std::vector<int>> WorkQueue;

// atomic counter handing out indices into the work queue
std::atomic<std::size_t> cnt = ATOMIC_VAR_INIT(0);
std::mutex mtx;

void thread_proc(WorkQueue& wq)
{
    int count = 0;
    std::size_t n = std::atomic_fetch_add(&cnt, std::size_t(1));
    while (n < wq.size())
    {
        //
        // TODO: do something with the n-th item in the vector
        //
        std::unique_lock<std::mutex> lck(mtx);
        std::cout << "Thread " << std::this_thread::get_id() << ": item " << n << std::endl;
        lck.unlock();

        std::sort(wq[n].begin(), wq[n].end());
        ++count;

        // increment by one and move to next item
        n = std::atomic_fetch_add(&cnt, std::size_t(1));
    }

    std::unique_lock<std::mutex> lck(mtx);
    std::cout << "Thread " << std::this_thread::get_id() << " processed " << count << " items." << std::endl;
    lck.unlock();
}

int main(int argc, char *argv[])
{
    std::random_device rd;
    std::default_random_engine rng(rd());

    // build a sizable work queue.
    WorkQueue wq;

    // make a 100000 element sequence of incrementing numbers
    std::vector<int> item;
    item.reserve(100000);
    std::generate_n(std::back_inserter(item), 100000,
                    [](){ static int i=0; return ++i;});

    // add 100 vectors of up to 100000 elements each
    wq.reserve(100);
    for (unsigned i=0; i<100; ++i)
    {
        // make a unique shuffle for this insertion
        std::shuffle(item.begin(), item.end(), rng);

        // append a random length of values (1000..size);
        long len = 1000 + (rng() % (item.size() - 1000));
        std::cout << "Appending vector of " << len << " items. " << std::endl;
        wq.emplace_back(item.begin(), std::next(item.begin(), len));
    }

    // ok. get the number of threads we're going to be using.
    unsigned n_threads = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    for (unsigned i=0; i<n_threads;++i)
        threads.push_back(std::thread(thread_proc, std::ref(wq)));

    // now join the threads
    for (auto& thrd : threads)
        thrd.join();

    return EXIT_SUCCESS;
}

Output (obviously system-dependent)

Appending vector of 67742 items. 
Appending vector of 87839 items. 
Appending vector of 13960 items. 
Appending vector of 64161 items. 
Appending vector of 56993 items. 
Appending vector of 30626 items. 
Appending vector of 24970 items. 
Appending vector of 96336 items. 
Appending vector of 2697 items. 
Appending vector of 70087 items. 
Appending vector of 35234 items. 
Appending vector of 82828 items. 
Appending vector of 15808 items. 
Appending vector of 38646 items. 
Appending vector of 55819 items. 
Appending vector of 99380 items. 
Appending vector of 33486 items. 
Appending vector of 9742 items. 
Appending vector of 50267 items. 
Appending vector of 4421 items. 
Appending vector of 33577 items. 
Appending vector of 55888 items. 
Appending vector of 61601 items. 
Appending vector of 19894 items. 
Appending vector of 90217 items. 
Appending vector of 80498 items. 
Appending vector of 40101 items. 
Appending vector of 50601 items. 
Appending vector of 71679 items. 
Appending vector of 65707 items. 
Appending vector of 8671 items. 
Appending vector of 67409 items. 
Appending vector of 47066 items. 
Appending vector of 53989 items. 
Appending vector of 88724 items. 
Appending vector of 82923 items. 
Appending vector of 38571 items. 
Appending vector of 58705 items. 
Appending vector of 13149 items. 
Appending vector of 97816 items. 
Appending vector of 59586 items. 
Appending vector of 20798 items. 
Appending vector of 45906 items. 
Appending vector of 96078 items. 
Appending vector of 24951 items. 
Appending vector of 19954 items. 
Appending vector of 52154 items. 
Appending vector of 49653 items. 
Appending vector of 14830 items. 
Appending vector of 45169 items. 
Appending vector of 96009 items. 
Appending vector of 15941 items. 
Appending vector of 37832 items. 
Appending vector of 55441 items. 
Appending vector of 65057 items. 
Appending vector of 69484 items. 
Appending vector of 27425 items. 
Appending vector of 11579 items. 
Appending vector of 43795 items. 
Appending vector of 71688 items. 
Appending vector of 17214 items. 
Appending vector of 69687 items. 
Appending vector of 18897 items. 
Appending vector of 96105 items. 
Appending vector of 62040 items. 
Appending vector of 26292 items. 
Appending vector of 34464 items. 
Appending vector of 49473 items. 
Appending vector of 83357 items. 
Appending vector of 50406 items. 
Appending vector of 48313 items. 
Appending vector of 22030 items. 
Appending vector of 2352 items. 
Appending vector of 82989 items. 
Appending vector of 42358 items. 
Appending vector of 59134 items. 
Appending vector of 46823 items. 
Appending vector of 36615 items. 
Appending vector of 40752 items. 
Appending vector of 49963 items. 
Appending vector of 73666 items. 
Appending vector of 29816 items. 
Appending vector of 4112 items. 
Appending vector of 31045 items. 
Appending vector of 8810 items. 
Appending vector of 43021 items. 
Appending vector of 83699 items. 
Appending vector of 5551 items. 
Appending vector of 85914 items. 
Appending vector of 11835 items. 
Appending vector of 82329 items. 
Appending vector of 13567 items. 
Appending vector of 74271 items. 
Appending vector of 49083 items. 
Appending vector of 42803 items. 
Appending vector of 92871 items. 
Appending vector of 17562 items. 
Appending vector of 28686 items. 
Appending vector of 61544 items. 
Appending vector of 13375 items. 
Thread 0x101bc3000: item 0
Thread 0x101c46000: item 1
Thread 0x101cc9000: item 2
Thread 0x101e81000: item 3
Thread 0x101cc9000: item 4
Thread 0x101bc3000: item 5
Thread 0x101e81000: item 6
Thread 0x101e81000: item 7
Thread 0x101bc3000: item 8
Thread 0x101c46000: item 9
Thread 0x101bc3000: item 10
Thread 0x101bc3000: item 11
Thread 0x101e81000: item 12
Thread 0x101e81000: item 13
Thread 0x101bc3000: item 14
Thread 0x101cc9000: item 15
Thread 0x101c46000: item 16
Thread 0x101bc3000: item 17
Thread 0x101bc3000: item 18
Thread 0x101c46000: item 19
Thread 0x101c46000: item 20
Thread 0x101bc3000: item 21
Thread 0x101cc9000: item 22
Thread 0x101bc3000: item 23
Thread 0x101bc3000: item 24
Thread 0x101c46000: item 25
Thread 0x101e81000: item 26
Thread 0x101e81000: item 27
Thread 0x101bc3000: item 28
Thread 0x101c46000: item 29
Thread 0x101e81000: item 30
Thread 0x101e81000: item 31
Thread 0x101cc9000: item 32
Thread 0x101bc3000: item 33
Thread 0x101cc9000: item 34
Thread 0x101e81000: item 35
Thread 0x101c46000: item 36
Thread 0x101bc3000: item 37
Thread 0x101cc9000: item 38
Thread 0x101cc9000: item 39
Thread 0x101c46000: item 40
Thread 0x101bc3000: item 41
Thread 0x101bc3000: item 42
Thread 0x101e81000: item 43
Thread 0x101cc9000: item 44
Thread 0x101bc3000: item 45
Thread 0x101bc3000: item 46
Thread 0x101cc9000: item 47
Thread 0x101e81000: item 48
Thread 0x101c46000: item 49
Thread 0x101e81000: item 50
Thread 0x101cc9000: item 51
Thread 0x101bc3000: item 52
Thread 0x101c46000: item 53
Thread 0x101cc9000: item 54
Thread 0x101bc3000: item 55
Thread 0x101c46000: item 56
Thread 0x101e81000: item 57
Thread 0x101c46000: item 58
Thread 0x101cc9000: item 59
Thread 0x101e81000: item 60
Thread 0x101bc3000: item 61
Thread 0x101c46000: item 62
Thread 0x101e81000: item 63
Thread 0x101c46000: item 64
Thread 0x101cc9000: item 65
Thread 0x101cc9000: item 66
Thread 0x101bc3000: item 67
Thread 0x101c46000: item 68
Thread 0x101cc9000: item 69
Thread 0x101e81000: item 70
Thread 0x101bc3000: item 71
Thread 0x101cc9000: item 72
Thread 0x101cc9000: item 73
Thread 0x101bc3000: item 74
Thread 0x101e81000: item 75
Thread 0x101c46000: item 76
Thread 0x101bc3000: item 77
Thread 0x101cc9000: item 78
Thread 0x101e81000: item 79
Thread 0x101c46000: item 80
Thread 0x101bc3000: item 81
Thread 0x101cc9000: item 82
Thread 0x101cc9000: item 83
Thread 0x101e81000: item 84
Thread 0x101e81000: item 85
Thread 0x101cc9000: item 86
Thread 0x101bc3000: item 87
Thread 0x101bc3000: item 88
Thread 0x101e81000: item 89
Thread 0x101e81000: item 90
Thread 0x101c46000: item 91
Thread 0x101c46000: item 92
Thread 0x101cc9000: item 93
Thread 0x101bc3000: item 94
Thread 0x101e81000: item 95
Thread 0x101c46000: item 96
Thread 0x101cc9000: item 97
Thread 0x101bc3000: item 98
Thread 0x101c46000: item 99
Thread 0x101cc9000 processed 24 items.
Thread 0x101c46000 processed 22 items.
Thread 0x101bc3000 processed 30 items.
Thread 0x101e81000 processed 24 items.

Thanks, this seems like the perfect solution for my needs. One more question, though: is writing to disk thread-safe, or do I need a mutex for that? - Adam Sikora
@AdamSikora It depends on where you're writing. If, as you say, each of these threads writes to its own file, you probably have nothing to worry about. - WhozCraig
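A sketch of that per-parameter file output, following the resultsFor{parameter}.txt naming from the question (the result value here is a placeholder for the real computation): since each call writes only to its own file, no mutex around the disk I/O is needed as long as no two threads ever process the same parameter.

#include <fstream>
#include <string>

void foo(int parameter)
{
    double result = 0.0;   // placeholder for the real 5-500 s computation

    // each thread writes exclusively to its own file, so no locking is required
    std::ofstream out("resultsFor" + std::to_string(parameter) + ".txt");
    out << result << '\n';
}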
Shouldn't n_threads = std::thread::hardware_concurrency(); be something like n_threads = std::thread::hardware_concurrency() == 0 ? 2 : std::thread::hardware_concurrency()? std::thread::hardware_concurrency() returns 0 when the information is not available, which would make the code do nothing on such machines. - Alexandru Barbarosie
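A guarded version of that line along the lines the comment suggests (the fallback value of 2 is an arbitrary choice):

unsigned n_threads = std::thread::hardware_concurrency();
if (n_threads == 0)    // hardware_concurrency() may return 0 if the count is unknown
    n_threads = 2;     // fall back to a small fixed thread count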
