Parallel computing - shuffling

I want to shuffle an array in parallel. I've found that an algorithm similar to bitonic sort, but performing random (50/50) exchanges instead of comparisons, gives a uniformly distributed result, but only if the array length is a power of two. I've also considered the Fisher-Yates shuffle, but I can't see how to parallelize it to avoid the O(N) sequential pass.
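For reference, a minimal serial Fisher-Yates sketch in Python (my own illustration, not from any particular library): successive iterations swap into overlapping positions, which is what makes a direct parallelization hard.

import random

def fisher_yates(a):
    # walk backwards; each iteration may touch any not-yet-fixed
    # slot, so iterations conflict and cannot simply run in parallel
    for i in range(len(a) - 1, 0, -1):
        j = random.randint(0, i)  # uniform over the unfixed prefix [0, i]
        a[i], a[j] = a[j], a[i]
    return a

print(fisher_yates(list(range(10))))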
Any suggestions?
Thanks!
2 Answers


There's a good, clear write-up of this in a recent paper; in particular, the Shun et al. 2015 reference is worth a read.

But basically, you can shuffle the same way sort -R does: assign each item a random key and sort on that key. And there are many ways to do a good parallel distributed sort.
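As a minimal serial sketch of that key-sort idea (plain Python, no MPI; the helper name is my own), with a quick empirical check that the result is uniform:

import random
from collections import Counter

def keysort_shuffle(items):
    # tag each item with an independent uniform key, sort on the
    # keys, then drop them; with distinct keys every ordering is
    # equally likely
    tagged = [(random.random(), x) for x in items]
    tagged.sort()
    return [x for _, x in tagged]

# each of the 6 permutations of 3 items should appear ~1/6 of the time
counts = Counter(tuple(keysort_shuffle([0, 1, 2])) for _ in range(60000))
for perm, n in sorted(counts.items()):
    print(perm, n)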

Here's a basic Python + MPI version using an odd-even transposition sort; it takes P communication steps if P is the number of processors. You can do better than that, but this one is easy to understand; alternatives are discussed in this question.

from __future__ import print_function
import sys
import random
from mpi4py import MPI

comm = MPI.COMM_WORLD

def exchange(localdata, sendrank, recvrank):
    """
    Perform a merge-exchange with a neighbour;
    sendrank sends local data to recvrank,
    which merge-sorts it, and then sends lower
    data back to the lower-ranked process and
    keeps upper data
    """
    rank = comm.Get_rank()
    assert rank == sendrank or rank == recvrank
    assert sendrank < recvrank

    if rank == sendrank:
        comm.send(localdata, dest=recvrank)
        newdata = comm.recv(source=recvrank)
    else:
        bothdata = list(localdata)
        otherdata = comm.recv(source=sendrank)
        bothdata = bothdata + otherdata
        bothdata.sort()
        comm.send(bothdata[:len(otherdata)], dest=sendrank)
        newdata = bothdata[len(otherdata):]
    return newdata

def print_by_rank(data, rank, nprocs):
    """ crudely attempt to print data coherently """
    for proc in range(nprocs):
        if proc == rank:
            print(str(rank)+": "+str(data))
        # barrier is collective: every rank must reach it on every
        # iteration, or the program deadlocks
        comm.barrier()
    return

def odd_even_sort(data):
    """ odd-even transposition sort across ranks """
    rank = comm.Get_rank()
    nprocs = comm.Get_size()
    data.sort()   # each rank sorts its local chunk first
    for step in range(1, nprocs+1):
        # alternate phases: exchange with the right neighbour in one
        # phase and with the left neighbour in the next
        if ((rank + step) % 2) == 0:
            if rank < nprocs - 1:
                data = exchange(data, rank, rank+1)
        elif rank > 0:
            data = exchange(data, rank-1, rank)
    return data

def main():
    # everyone get their data
    rank = comm.Get_rank()
    nprocs = comm.Get_size()
    n_per_proc = 5
    data = list(range(n_per_proc*rank, n_per_proc*(rank+1)))

    if rank == 0:
        print("Original:")
    print_by_rank(data, rank, nprocs)

    # tag your data with random values
    data = [(random.random(), item) for item in data]

    # now sort it by these random tags
    data = odd_even_sort(data)

    if rank == 0:
        print("Shuffled:")
    print_by_rank([x for _, x in data], rank, nprocs)

    return 0


if __name__ == "__main__":
    sys.exit(main())

Running it gives:

$ mpirun -np 5 python mergesort_shuffle.py
Original:
0: [0, 1, 2, 3, 4]
1: [5, 6, 7, 8, 9]
2: [10, 11, 12, 13, 14]
3: [15, 16, 17, 18, 19]
4: [20, 21, 22, 23, 24]

Shuffled:
0: [19, 17, 4, 20, 9]
1: [23, 12, 3, 2, 8]
2: [14, 6, 13, 15, 1]
3: [11, 0, 22, 16, 18]
4: [5, 10, 21, 7, 24]

Here's a multithreaded C++ shuffle using OpenMP. It uses the slower STL random number generator - you can substitute xorshift+ for more performance. For primitive types T (e.g. int64_t on a 64-bit CPU), you can replace the locking with std::compare_exchange_weak() calls.
#include <cstdint>
#include <cstdlib>
#include <random>
#include <atomic>
#include <thread>
#include <algorithm>
#include <omp.h>

namespace detail {

template <typename F>
struct FinalAction {
  FinalAction(F&& f) : clean_{std::move(f)} {}
  ~FinalAction() {
    if (enabled_) clean_();
  }
  void Disable() { enabled_ = false; }

 private:
  F clean_;
  bool enabled_{true};
};

}  // namespace detail

template <typename F>
detail::FinalAction<F> Finally(F&& f) {
  return detail::FinalAction<F>(std::move(f));
}

template <typename T>
void ParallelShuffle(T* data, const size_t count) {
  const uint32_t nThreads = std::thread::hardware_concurrency();

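  // one atomic_flag per element, used as a tiny spinlock guarding swaps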
  std::atomic_flag* syncs = static_cast<std::atomic_flag*>(malloc(count * sizeof(std::atomic_flag)));
  auto clean_syncs = Finally([&]() { free(syncs); });
#pragma omp parallel for num_threads(nThreads)
  for (size_t i = 0; i < count; i++) {
    new (syncs + i) std::atomic_flag(ATOMIC_FLAG_INIT);
  }

  const size_t nPerThread = (count + nThreads - 1) / nThreads;
#pragma omp parallel for num_threads(nThreads)
  for (size_t i = 0; i < nThreads; i++) {
    std::random_device rd;
    std::mt19937_64 rng(rd());
    std::uniform_int_distribution<size_t> dist(0, count - 1);
    const size_t iFirst = nPerThread * i;
    const size_t iLimit = std::min(nPerThread + iFirst, count);
    if (iLimit <= iFirst) {
      continue;
    }
    for (size_t j = iFirst; j < iLimit; j++) {
      const size_t fellow = dist(rng);
      if (fellow == j) {
        continue;
      }
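      // lock the lower index first: a consistent acquisition order means
      // two threads swapping overlapping pairs can never deadlock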
      const size_t sync1 = std::min(j, fellow);
      const size_t sync2 = std::max(j, fellow);
      while (syncs[sync1].test_and_set(std::memory_order_acq_rel)) {
        std::this_thread::yield();
      }
      while (syncs[sync2].test_and_set(std::memory_order_acq_rel)) {
        std::this_thread::yield();
      }
      std::swap(data[sync1], data[sync2]);
      syncs[sync2].clear(std::memory_order_release);
      syncs[sync1].clear(std::memory_order_release);
    }
  }
}
