同步push_back和std::thread

4

我的代码

void build(std::vector<RKD <DivisionSpace> >& roots, ...) {
  try {
    // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
    std::lock_guard<std::mutex> lck (mtx);
    roots.push_back(RKD<DivisionSpace>(...));
  }
  catch (const std::bad_alloc&) {
    std::cout << "[exception caught when constructing tree]\n";
    return;
  }
}

现在,实际工作应该按顺序完成,而不是并行完成。 RKD的构造函数可以与其他RKD的构造函数并行运行。然而,在将对象推回std::Vector时,这是一个关键部分,对吗?
我要建立的对象数量是已知的。在实践中,它将在[2,16]范围内。理论上它可以是任何正数。
另外,我对它们被插入容器的顺序不感兴趣。
所以我可以做类似这样的事情:
RKD tree = RKD(...);
mutex_lock(...);
roots.push_back(tree);

但这意味着需要复制,是吗?

我该怎么做才能让我的代码并行执行?


我决定使用锁(而不仅仅是互斥锁)因为这个答案。


1
我并没有真正回答这个问题。真正的并行只有在无锁容器中才是可行的(而且只有在您认真确认这个锁是一个问题时,值得调查,因为这不是一个琐碎的主题,很容易出错)。我只是提到了这样一个事实,如果您设置对象以便支持移动操作(顺便说一下,请确保您的移动构造函数是noexcept),则可以避免复制构造函数的执行。 - WhozCraig
3
如果你从一开始就知道需要多少个RKD,那么RKD很容易构建,并且其构建没有任何副作用。你可以提前分配向量,然后运行就地构建或者将其复制到指定的内存位置——并行访问向量的元素不需要锁定,只有在调整大小时才需要锁定(或者如果你从多个线程写入同一内存)。 - Tomasz Lewowski
我认为@TomaszLewowski的建议是可行的。如果您的对象不满足这些要求,您可以考虑使用std::vector<std::unique_ptr<RKD>>。但是,如果您是RKD的作者,则最好给它移动语义。 - 5gon12eder
1
显而易见的是,如果构建RKD相对较快,并且它们的数量相对较少,则多线程解决方案将更慢,因为需要创建/加入线程和锁定(如果您决定需要锁定)。 如果在您的情况下使用多线程是有意义的,并且sizeof(RKD)相对于缓存行很小,则考虑让每个线程构造向量的连续元素,而不是交错它们以避免伪共享。 - amdn
1
@G.Samaras 是的,5gon12eder的回答特别展示了一种交错的解决方案,并指出由于缓存失效(而没有使用虚假共享这个术语),它是次优的,然后展示了一个顺序分区的解决方案...我在第一次浏览答案时错过了这一点。你的代码是否会从并行构建中受益仍需由你来回答。它会更快吗?这重要吗?额外的复杂性是否值得?只有你能回答这些问题。如果没有其他好处,这是一个很好的学习机会,这一点是不容忽视的。 - amdn
显示剩余8条评论
2个回答

8
Tomasz Lewowski在评论中提出的建议,我进行了扩展,这个建议非常简单,基于以下观察:对于std::vectorpush_back可能需要重新分配后备存储器并复制(或者最好是移动)元素。这构成了一个需要同步的关键部分。
对于下面的示例,请假设我们想要一个填充有前12个质数的向量,但是我们不关心它们的顺序。(我在这里只是硬编码了数字,但是假设它们是通过某些昂贵的计算在并行环境下执行是有意义的。)下面的情况存在危险的竞争条件。
std::vector<int> numbers {};  // an empty vector

// thread A             // thread B             // thread C

numbers.push_back( 2);  numbers.push_back(11);  numbers.push_back(23);
numbers.push_back( 3);  numbers.push_back(13);  numbers.push_back(27);
numbers.push_back( 5);  numbers.push_back(17);  numbers.push_back(29);
numbers.push_back( 7);  numbers.push_back(19);  numbers.push_back(31);

还有一个问题涉及到push_back。如果两个线程同时调用它,它们都会尝试在相同的索引处构造一个对象,可能会产生灾难性后果。因此,在分叉线程之前使用reserve(n)并不能解决这个问题。
然而,由于您事先知道元素的数量,可以简单地将它们分配std::vector内的特定位置,而不改变其大小。如果您不改变大小,就没有关键部分。因此,在以下情况下不存在竞争。
std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 1] =  3;    numbers[ 2] =  5;
numbers[ 3] =  7;    numbers[ 4] = 11;    numbers[ 5] = 13;
numbers[ 6] = 17;    numbers[ 7] = 19;    numbers[ 8] = 23;
numbers[ 9] = 29;    numbers[10] = 31;    numbers[11] = 37;

当然,如果两个线程都试图写入相同的索引,则竞争将再次出现。幸运的是,在实践中保护它并不困难。如果您的向量有n个元素,并且您有p个线程,则线程i仅写入元素[i n / p, (i + 1) n / p)。请注意,这比使线程i仅在索引j处写入元素更可取,仅当j mod p = i时,因为它会导致较少的缓存失效。因此,上面示例中的访问模式是次优的,最好像这样。
std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 4] = 11;    numbers[ 8] = 23;
numbers[ 1] =  3;    numbers[ 5] = 13;    numbers[ 9] = 29;
numbers[ 2] =  5;    numbers[ 6] = 17;    numbers[10] = 31;
numbers[ 3] =  7;    numbers[ 7] = 19;    numbers[11] = 37;

到目前为止一切都很好。但是如果你没有一个std::vector<int>而是一个std::vector<Foo>呢?如果Foo没有默认构造函数,那么...
std::vector<Foo> numbers(10);

将是无效的。即使有一个,默认构造许多昂贵的对象并很快重新分配它们而从未检索值是不合理的。

当然,大多数设计良好的类应该有一个非常便宜的默认构造函数。例如,std::string 默认构造为一个不需要内存分配的空字符串。一个好的实现将把默认构造字符串的成本降至最低。

std::memset(this, 0, sizeof(std::string));

如果编译器足够聪明,能够理解我们正在分配和初始化整个 std::vector<std::string>(n),它可能会进一步优化为单个调用。

std::calloc(n, sizeof(std::string));

如果有机会让Foo可以廉价地进行默认构造和赋值,那么您就完成了。然而,如果这很困难,您可以通过将其移动到堆上来避免问题。智能指针可以廉价地进行默认构造,因此。
std::vector<std::unique_ptr<Foo>> foos(n);

最终会简化为a。

std::calloc(n, sizeof(std::unique_ptr<Foo>));

当然,这种便利是以为每个元素进行动态内存分配的代价为代价的,而无需您对Foo进行任何操作。

std::vector<std::unique_ptr<Foo>> foos(n);

// thread A                    // thread B                           // thread C

foos[0].reset(new Foo {...});  foos[n / 3 + 0].reset(new Foo {...});  foos[2 * n / 3 + 0].reset(new Foo {...});
foos[1].reset(new Foo {...});  foos[n / 3 + 1].reset(new Foo {...});  foos[2 * n / 3 + 1].reset(new Foo {...});
foos[2].reset(new Foo {...});  foos[n / 3 + 2].reset(new Foo {...});  foos[2 * n / 3 + 2].reset(new Foo {...});
...                            ...                                    ...

这可能比你想象中的不那么糟糕,因为虽然动态内存分配并非免费,但是std :: unique_ptrsizeof非常小,所以如果sizeof(Foo)很大,则可以获得更紧凑且更快速迭代的向量奖励。 当然,这完全取决于你打算如何使用数据。
如果您不知道预先准确的元素数量或担心会弄乱索引,还有另一种方法:让每个线程填充自己的向量,然后在结尾合并它们。继续上面质数的例子,我们得到了以下结果。
std::vector<int> numbersA {};  // private store for thread A
std::vector<int> numbersB {};  // private store for thread B
std::vector<int> numbersC {};  // private store for thread C

// thread A              // thread B              // thread C

numbersA.push_back( 2);  numbersB.push_back(11);  numbersC.push_back(23);
numbersA.push_back( 3);  numbersB.push_back(13);  numbersC.push_back(27);
numbersA.push_back( 5);  numbersB.push_back(17);  numbersC.push_back(29);
numbersA.push_back( 7);  numbersB.push_back(21);  numbersC.push_back(31);

// Back on the main thread after A, B and C are joined:

std::vector<int> numbers(
    numbersA.size() + numbersB.size() + numbersC.size());
auto pos = numbers.begin();
pos = std::move(numbersA.begin(), numbersA.end(), pos);
pos = std::move(numbersB.begin(), numbersB.end(), pos);
pos = std::move(numbersC.begin(), numbersC.end(), pos);
assert(pos == numbers.end());

// Now dispose of numbersA, numbersB and numbersC as soon as possible
// in order to release their no longer needed memory.

(上述代码中使用的std::move是来自算法库的移动语义。)

这种方法具有最理想的内存访问模式,因为numbersAnumbersBnumbersC都写入完全独立分配的内存。当然,我们还需要做额外的顺序工作来连接中间结果。请注意,效率在很大程度上依赖于移动分配元素的成本与查找/创建元素的成本相比可以忽略不计。至少按照上述方式编写的代码还假定您的类型具有廉价的默认构造函数。当然,如果对于您的类型不是这种情况,您可以再次使用智能指针。

希望这些内容为您优化问题提供了足够的思路。

如果您以前从未使用过智能指针,请查看“C++中的RAII和智能指针并查看标准库的动态内存管理库。当然,上面展示的技术也适用于std::vector<Foo *>,但是在现代C ++中,我们不再使用类似这样的资源拥有原始指针。

很好的答案,我一定会点赞(已达上限)。相比你的回答,你认为sehe的回答怎么样? - gsamaras
嗯,这更低级一些,所以你更有可能自己踩到自己的脚,但它可能会给你一些额外的性能。无论如何,差异很小,所以你可以尝试更简单的std::vector方法,如果它不能满足你的性能期望,再切换到原始内存。这样做不会增加太多额外的工作量,因为线程插入哪个位置的逻辑不会改变。一定要测量性能。 - 5gon12eder
1
这是你的决定,p=n将是一个特殊情况。如所写,我只是使用n作为您想要创建的对象数量,p作为您想要用来执行此操作的线程数量。因此,请选择p,范围从1到n。这是一个工程决策,您必须做出决定。如果有疑问,可以使用p=min(n,P),其中P是您拥有的CPU数量(std::thread::hardware_concurrency),或者让用户通过选项进行选择。 - 5gon12eder
现在索引清晰了。我喜欢新增的方法,肯定会将它们两个进行比较。然而,那个方法也应该使用unique_ptr以避免初始化,对吗?如果是这样,那么最终向量将具有相同的唯一指针作为数据,对吗?那么我们应该怎么做呢?预留足够的空间,然后从子向量复制,还是按照你在答案中的方式进行操作? - gsamaras
我接受了你的答案,但另一个答案也不错。这里我提出了一个后续问题:http://stackoverflow.com/questions/27973606/how-many-threads-should-i-create。你可能会感兴趣。 - gsamaras
显示剩余5条评论

3
问题似乎在于您的构造函数做了很多工作,这违反了关于构造和容器插入的各种库约定。只需通过将插入与创建分离来修复它。
下面的代码与@5gon12eder建议的代码非常相似,除了它不会“强制”您更改对象位置。
在我的小演示中。
  • we use a raw region of memory that's trully uninitialized (this is not possible with vector, where insertion implies initialization), so instead of the "canonical"

    std::array<RKD, 500> rkd_buffer; // OR
    std::vector<RKD> rkd_buffer(500); // OR even
    std::unique_ptr<RKD[]> rkd_buffer(new RKD[500]);
    

    we're gonna use a custom combination:

    std::unique_ptr<RKD[N], decltype(&::free)> rkd_buffer(
        static_cast<RKD(*)[N]>(::malloc(sizeof(RKD) * N)),
        ::free
    );
    
  • we then create a few threads (5 in the sample) to construct all the elements. The items are just constructed in-place and their respective destructors will be invoked at program exit

  • it is, therefore, crucial that all items have been fully initialized before rkd_buffer goes out of scope (the join ensures this here).
  • the threads could synchronize by different means: constructions could e.g. be dispatched via a work queue to a thread pool, where either condition variables, promises, thread barriers (from boost) or even just atomic shared counters could be used for the coordination.

    All these choices are in essence unrelated to the task of getting construction to run in parallel, so I'll leave that to your imagination (or other SO answers)

Live On Coliru

struct RKD {
    RKD() { this_thread::sleep_for(chrono::milliseconds(rand() % 100)); } // expensive
};

int main() {
    static const int N         = 500;
    static const int ChunkSize = 100;
    std::unique_ptr<RKD[N], decltype(&::free)> rkd_buffer(static_cast<RKD(*)[N]>(::malloc(sizeof(RKD) * N)), ::free);

    vector<thread> group;
    for (int chunk = 0; chunk < N/ChunkSize; chunk += ChunkSize)
        group.emplace_back([&] { 
            for (int i=chunk * ChunkSize; i<(ChunkSize + chunk*ChunkSize); ++i)
                new (rkd_buffer.get() + i) RKD;
        });

    for (auto& t:group) if (t.joinable()) t.join();

    // we are responsible for destructing, since we also took responsibility for construction
    for (RKD& v : *rkd_buffer)
        v.~RKD();
}

您可以看到,有5个线程正在处理500个构建任务。每个构建任务平均需要约50毫秒的时间,因此总共需要的时间应该是100*50毫秒,即大约5秒钟。实际上,情况确实如此:
real    0m5.193s
user    0m0.004s
sys 0m0.000s

你的回答(感谢您抽出时间来撰写,我会在能够时点赞)是否与amdn在我的问题下发布的评论(最后一条评论)有任何关联?此外,您能否解释一下我们在std::unique_ptr<RKD[N], dec...中做了什么?我还看到了malloc和free,而不是C++特性(实际上在底层调用C++特性..)。我可以直接复制代码,但我也想学习。我的意思是,因为这个问题,我学到了移动语义、交换-复制惯用语和许多其他东西!我能贪心地再学一两个吗? :D - gsamaras
它创建了一个具有独特所有权语义和自定义删除器的智能指针。这在其他答案中已经涵盖了。 - sehe
https://dev59.com/WGsy5IYBdhLWcg3w2Bo6, http://codereview.stackexchange.com/questions/4679/shared-ptr-and-file-for-wrapping-cstdio-update-also-dlfcn-h, https://dev59.com/OWIk5IYBdhLWcg3wr_9o, 哦,好吧。有很多选项可以通过谷歌获得。还有一个[c++-faq]标签(我应该在什么时候使用哪种智能指针? - sehe
谢谢提供的链接,我会研究它们。你所说的笨重是指我的类缺少移动构造函数吗? - gsamaras
我没有接受你的答案,但我已经为它点了赞。另一个回答更加详细易懂,适合初学者理解。不过,在你的评论中提到了这篇文章中最聪明的一点 ;) 。我在这里提出了一个后续问题:http://stackoverflow.com/questions/27973606/how-many-threads-should-i-create。你可能会觉得很有趣。 - gsamaras
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接