自从提出这个问题已经7年了,似乎还没有人提出一个好的解决方案。Repa没有像
mapM
/
traverse
这样的函数,即使有一个可以不并行运行的函数也不行。此外,考虑到最近几年的进展,似乎也不太可能会出现这种情况。
由于Haskell中许多数组库的陈旧状态以及我对它们的功能集的总体不满意,我已经投入了几年的时间研究了一个数组库
massiv
,它借鉴了一些来自Repa的概念,但将其提升到了完全不同的水平。介绍就到这里。
在今天之前,
massiv
中有三个类似于单子映射的函数(不包括类似函数的同义词:
imapM
、
forM
等)。
mapM
- 任意Monad
中的常规映射。由于明显的原因,不可并行化,并且速度也有点慢(类似于通常的mapM
在列表上的速度慢)。
traversePrim
- 在这里,我们受限于PrimMonad
,它比mapM
快得多,但是这个原因对于本讨论并不重要。
mapIO
- 正如名称所示,此函数仅限于IO
(或者更确切地说是MonadUnliftIO
,但这与本讨论无关)。因为我们在IO
中,所以我们可以自动将数组分成与核心数相同的许多块,并使用单独的工作线程将IO
操作映射到这些块中的每个元素上。与纯的fmap
不同,后者也可以并行化,但由于调度的不确定性和映射操作的副作用,我们必须在这里使用IO
。
所以,一旦我读到这个问题,我就想到了在
massiv
中几乎已经解决了这个问题,但并不是那么快。随机数生成器,例如
mwc-random
和
random-fu
中的其他生成器不能跨越多个线程使用相同的生成器。这意味着,我唯一缺失的拼图是:"为每个线程生成一个新的随机种子并像往常一样继续进行"。换句话说,我需要两件事:
- 一个函数,将初始化与工作线程数量相同的生成器
- 一个抽象层,根据操作正在运行的线程,无缝地提供正确的生成器给映射函数。
这正是我所做的。
首先,我将使用特别设计的randomArrayWS
和initWorkerStates
函数来举例,因为它们与问题更相关,然后再转向更一般的单子映射。这是它们的类型签名:
randomArrayWS ::
(Mutable r ix e, MonadUnliftIO m, PrimMonad m)
=> WorkerStates g
-> Sz ix
-> (g -> m e)
-> m (Array r ix e)
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
对于那些不熟悉massiv
的人来说, Comp
参数是一个计算策略,可以使用以下重要的构造函数:
Seq
- 顺序运行计算,不分叉线程
Par
- 启动与能力一样多的线程,并使用这些线程来完成工作。
我将首先以mwc-random
包为例,然后转向RVarT
:
λ> import Data.Massiv.Array
λ> import System.Random.MWC (createSystemRandom, uniformR)
λ> import System.Random.MWC.Distributions (standard)
λ> gens <- initWorkerStates Par (\_ -> createSystemRandom)
上面我们使用系统随机性为每个线程初始化了一个单独的生成器,但是我们同样可以通过从
WorkerId
参数派生出唯一的线程种子来实现。该参数仅仅是工作线程的
Int
索引。现在,我们可以使用这些生成器创建一个具有随机值的数组:
λ> randomArrayWS gens (Sz2 2 3) standard :: IO (Array P Ix2 Double)
Array P Par (Sz (2 :. 3))
[ [ -0.9066144845415213, 0.5264323240310042, -1.320943607597422 ]
, [ -0.6837929005619592, -0.3041255565826211, 6.53353089112833e-2 ]
]
通过使用
Par
策略,
scheduler
库将在可用工作线程之间平均分配生成工作,并且每个工作线程将使用自己的生成器,从而使其线程安全。只要不并发地重复使用相同的
WorkerStates
,就没有任何阻止措施,否则会导致异常:
λ> randomArrayWS gens (Sz1 10) (uniformR (0, 9)) :: IO (Array P Ix1 Int)
Array P Par (Sz1 10)
[ 3, 6, 1, 2, 1, 7, 6, 0, 8, 8 ]
现在将mwc-random
放在一边,我们可以通过使用诸如generateArrayWS
之类的函数来重用相同的概念,以便针对其他可能的用例:
generateArrayWS ::
(Mutable r ix e, MonadUnliftIO m, PrimMonad m)
=> WorkerStates s
-> Sz ix
-> (ix -> s -> m e)
-> m (Array r ix e)
和 mapWS
:
mapWS ::
(Source r' ix a, Mutable r ix b, MonadUnliftIO m, PrimMonad m)
=> WorkerStates s
-> (a -> s -> m b)
-> Array r' ix a
-> m (Array r ix b)
这里是使用
rvar
,
random-fu
和
mersenne-random-pure64
库的功能示例,我们也可以在这里使用
randomArrayWS
,但为了举例说明,假设我们已经有一个包含不同
RVarT
的数组,在这种情况下我们需要使用
mapWS
。
λ> import Data.Massiv.Array
λ> import Control.Scheduler (WorkerId(..), initWorkerStates)
λ> import Data.IORef
λ> import System.Random.Mersenne.Pure64 as MT
λ> import Data.RVar as RVar
λ> import Data.Random as Fu
λ> rvarArray = makeArrayR D Par (Sz2 3 9) (\ (i :. j) -> Fu.uniformT i j)
λ> mtState <- initWorkerStates Par (newIORef . MT.pureMT . fromIntegral . getWorkerId)
λ> mapWS mtState RVar.runRVarT rvarArray :: IO (Array P Ix2 Int)
Array P Par (Sz (3 :. 9))
[ [ 0, 1, 2, 2, 2, 4, 5, 0, 3 ]
, [ 1, 1, 1, 2, 3, 2, 6, 6, 2 ]
, [ 0, 1, 2, 3, 4, 4, 6, 7, 7 ]
]
重要的是要注意,尽管上面的示例中使用了Mersenne Twister的纯实现,但我们无法避免IO。这是因为非确定性调度,这意味着我们永远不知道哪个工作人员将处理数组的哪个块,因此哪个生成器将用于数组的哪个部分。好消息是,如果生成器是纯的且可分割的,例如
splitmix
,那么我们可以使用纯的、确定性的和可并行化的生成函数:
randomArray
,但这已经是一个单独的故事了。
fillChunkedIOP
做出类似于您所要求的功能。 - kosmikussplit
提供了必要的基础,但请注意有关split
实现方式的注释:“--没有统计基础!”。我倾向于认为,任何一种PRNG分裂方法都会在其分支之间留下可利用的相关性,但我没有统计背景来证明这一点。关于这个普遍问题,我不确定... - isturdy