Repa数组上的并行mapM

90

最近在使用 Gibbs采样 进行工作时,我发现 RVar 提供了接近随机数生成理想接口的优秀支持。可惜地是,由于无法在映射中使用单调操作,我无法利用Repa。

虽然一般来说,单态映射不能被并行化,但在我看来,RVar 可能至少是可以安全并行化效果的单子(至少在原则上如此;我对 RVar 的内部运作不是非常熟悉)。也就是说,我想要写出像以下这样的代码:

drawClass :: Sample -> RVar Class
drawClass = ...

drawClasses :: Array U DIM1 Sample -> RVar (Array U DIM1 Class)
drawClasses samples = A.mapM drawClass samples

这里的A.mapM大致会是这样的:

mapM :: ParallelMonad m => (a -> m b) -> Array r sh a -> m (Array r sh b)

尽管如何工作取决于RVar和其底层RandomSource的实现,但原则上每个线程启动时会生成一个新的随机种子并像往常一样继续执行。

直观地看,这个想法可能可以推广到其他一些monad上。

那么,我的问题是:是否可以构造一个ParallelMonad类的monad,使效果可以安全地并行化(至少包含RVar)?

它会是什么样子?有哪些其他monad会属于这个类别?其他人是否考虑过在Repa中如何实现这一点?

最后,如果这种并行monad操作的概念无法推广,是否有人看到任何不错的方法可以使其在特定情况下(例如需要并行性的RVar)起作用?放弃RVar以换取并行性是一个非常困难的权衡。


1
我猜关键点在于“为每个线程生成一个新的随机种子”——这一步应该如何工作,所有线程返回后种子应该如何合并? - Daniel Wagner
1
RVar接口几乎肯定需要一些添加来适应使用给定种子生成新的生成器。不可否认,这个机制还不清楚,似乎相当于“RandomSource”特定。我天真地尝试绘制种子,例如在(“mwc-random”)的情况下绘制元素向量,并为第一个工作人员添加1,为第二个工作人员添加2等等。如果您需要加密质量的熵,则非常不足;如果您只需要随机行走,则希望可以正常运行。 - bgamari
我已经能够使用 fillChunkedIOP 做出类似于您所要求的功能。 - kosmikus
3
我在尝试解决一个类似的问题时遇到了这个问题。我正在同时使用MonadRandom和System.Random进行单子随机计算。这只有在使用System.Random的"split"函数时才可能实现。虽然它由于"split"的特性而产生不同的结果(这是缺点),但确实可行。但是,我试图将其扩展到Repa数组上,并没有什么好运气。你有在这方面取得进展吗,还是这是一条死路? - Tom Savage
1
在编程中,“无顺序和计算之间的依赖关系”的Monad听起来更像是Applicative。 - John Tyree
1
我有些犹豫。正如Tom Savage所指出的,split提供了必要的基础,但请注意有关split实现方式的注释:“--没有统计基础!”。我倾向于认为,任何一种PRNG分裂方法都会在其分支之间留下可利用的相关性,但我没有统计背景来证明这一点。关于这个普遍问题,我不确定... - isturdy
2个回答

8
自从提出这个问题已经7年了,似乎还没有人提出一个好的解决方案。Repa没有像mapM/traverse这样的函数,即使有一个可以不并行运行的函数也不行。此外,考虑到最近几年的进展,似乎也不太可能会出现这种情况。
由于Haskell中许多数组库的陈旧状态以及我对它们的功能集的总体不满意,我已经投入了几年的时间研究了一个数组库massiv,它借鉴了一些来自Repa的概念,但将其提升到了完全不同的水平。介绍就到这里。
在今天之前,massiv中有三个类似于单子映射的函数(不包括类似函数的同义词:imapMforM等)。
  • mapM - 任意Monad中的常规映射。由于明显的原因,不可并行化,并且速度也有点慢(类似于通常的mapM在列表上的速度慢)。
  • traversePrim - 在这里,我们受限于PrimMonad,它比mapM快得多,但是这个原因对于本讨论并不重要。
  • mapIO - 正如名称所示,此函数仅限于IO(或者更确切地说是MonadUnliftIO,但这与本讨论无关)。因为我们在IO中,所以我们可以自动将数组分成与核心数相同的许多块,并使用单独的工作线程将IO操作映射到这些块中的每个元素上。与纯的fmap不同,后者也可以并行化,但由于调度的不确定性和映射操作的副作用,我们必须在这里使用IO
所以,一旦我读到这个问题,我就想到了在massiv中几乎已经解决了这个问题,但并不是那么快。随机数生成器,例如mwc-randomrandom-fu中的其他生成器不能跨越多个线程使用相同的生成器。这意味着,我唯一缺失的拼图是:"为每个线程生成一个新的随机种子并像往常一样继续进行"。换句话说,我需要两件事:
  • 一个函数,将初始化与工作线程数量相同的生成器
  • 一个抽象层,根据操作正在运行的线程,无缝地提供正确的生成器给映射函数。

这正是我所做的。

首先,我将使用特别设计的randomArrayWSinitWorkerStates函数来举例,因为它们与问题更相关,然后再转向更一般的单子映射。这是它们的类型签名:

randomArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates g -- ^ Use `initWorkerStates` to initialize you per thread generators
  -> Sz ix -- ^ Resulting size of the array
  -> (g -> m e) -- ^ Generate the value using the per thread generator.
  -> m (Array r ix e)

initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)

对于那些不熟悉massiv的人来说, Comp参数是一个计算策略,可以使用以下重要的构造函数:

  • Seq - 顺序运行计算,不分叉线程
  • Par - 启动与能力一样多的线程,并使用这些线程来完成工作。

我将首先以mwc-random包为例,然后转向RVarT:

λ> import Data.Massiv.Array
λ> import System.Random.MWC (createSystemRandom, uniformR)
λ> import System.Random.MWC.Distributions (standard)
λ> gens <- initWorkerStates Par (\_ -> createSystemRandom)

上面我们使用系统随机性为每个线程初始化了一个单独的生成器,但是我们同样可以通过从WorkerId参数派生出唯一的线程种子来实现。该参数仅仅是工作线程的Int索引。现在,我们可以使用这些生成器创建一个具有随机值的数组:
λ> randomArrayWS gens (Sz2 2 3) standard :: IO (Array P Ix2 Double)
Array P Par (Sz (2 :. 3))
  [ [ -0.9066144845415213, 0.5264323240310042, -1.320943607597422 ]
  , [ -0.6837929005619592, -0.3041255565826211, 6.53353089112833e-2 ]
  ]

通过使用Par策略,scheduler库将在可用工作线程之间平均分配生成工作,并且每个工作线程将使用自己的生成器,从而使其线程安全。只要不并发地重复使用相同的WorkerStates,就没有任何阻止措施,否则会导致异常:
λ> randomArrayWS gens (Sz1 10) (uniformR (0, 9)) :: IO (Array P Ix1 Int)
Array P Par (Sz1 10)
  [ 3, 6, 1, 2, 1, 7, 6, 0, 8, 8 ]

现在将mwc-random放在一边,我们可以通过使用诸如generateArrayWS之类的函数来重用相同的概念,以便针对其他可能的用例:

generateArrayWS ::
     (Mutable r ix e, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> Sz ix --  ^ size of new array
  -> (ix -> s -> m e) -- ^ element generating action
  -> m (Array r ix e)

mapWS

mapWS ::
     (Source r' ix a, Mutable r ix b, MonadUnliftIO m, PrimMonad m)
  => WorkerStates s
  -> (a -> s -> m b) -- ^ Mapping action
  -> Array r' ix a -- ^ Source array
  -> m (Array r ix b)

这里是使用 rvar, random-fumersenne-random-pure64 库的功能示例,我们也可以在这里使用 randomArrayWS,但为了举例说明,假设我们已经有一个包含不同 RVarT 的数组,在这种情况下我们需要使用 mapWS
λ> import Data.Massiv.Array
λ> import Control.Scheduler (WorkerId(..), initWorkerStates)
λ> import Data.IORef
λ> import System.Random.Mersenne.Pure64 as MT
λ> import Data.RVar as RVar
λ> import Data.Random as Fu
λ> rvarArray = makeArrayR D Par (Sz2 3 9) (\ (i :. j) -> Fu.uniformT i j)
λ> mtState <- initWorkerStates Par (newIORef . MT.pureMT . fromIntegral . getWorkerId)
λ> mapWS mtState RVar.runRVarT rvarArray :: IO (Array P Ix2 Int)
Array P Par (Sz (3 :. 9))
  [ [ 0, 1, 2, 2, 2, 4, 5, 0, 3 ]
  , [ 1, 1, 1, 2, 3, 2, 6, 6, 2 ]
  , [ 0, 1, 2, 3, 4, 4, 6, 7, 7 ]
  ]

重要的是要注意,尽管上面的示例中使用了Mersenne Twister的纯实现,但我们无法避免IO。这是因为非确定性调度,这意味着我们永远不知道哪个工作人员将处理数组的哪个块,因此哪个生成器将用于数组的哪个部分。好消息是,如果生成器是纯的且可分割的,例如splitmix,那么我们可以使用纯的、确定性的和可并行化的生成函数:randomArray,但这已经是一个单独的故事了。

如果您想查看一些基准测试数据,请访问以下链接:https://alexey.kuleshevi.ch/blog/2019/12/21/random-benchmarks/ - lehins

4

由于伪随机数生成器( PRNGs )的本质顺序性,这样做可能不是一个好主意。相反,您可能希望按照以下步骤转换代码:

  1. 声明一个IO函数(main或其他函数)。
  2. 读取所需数量的随机数。
  3. 将(现在纯净的)数字传递给repa函数。

能否在每个并行线程中烧入每个PRNG以创建统计独立性? - J. Abrahamson
@J.Abrahamson 是的,这是可能的。请看我的回答。 - lehins

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接