如何在Haskell中使用并行策略

10

我有一个函数frequencyBy,我想并行化它。以下是一个简单的测试案例:

import Control.Parallel.Strategies
import Control.DeepSeq
import System.Environment

frequencyBy :: (a -> b -> Bool) -> [a] -> [b] -> [(a,Int)]
frequencyBy f as bs = map 
    (\a ->(a, foldr (\b -> if f a b then (+) 1 else id) 0 bs)) as

main :: IO ()
main = do
  x:xs <- getArgs
  let result = frequencyBy (==) [1::Int .. 10000] [1 .. (read x)] `using` 
                 parList rdeepseq
  print $ product $ map snd $ result

我想并行运行frequencyBy中的map。我试图使用parList rdeepseq来实现这一点(main中的所有其他内容只是为了确保不会优化掉所有东西)。然而,这并不起作用,两个线程在相同的时间内做的工作量是一个线程的两倍。我不理解我在这里做错了什么。


3
如果两个线程在同样的时间内完成的工作量是一个线程的两倍,那么这是否意味着它正正确地进行并行处理呢? - ehird
2个回答

11
可能是开销导致速度变慢,具体取决于x的大小;如果每个spark的工作量与产生每个spark的时间相当(当然还有调度开销等),那么就会遇到问题。您可以尝试使用parListChunk,例如parListChunk 64 rdeepseq,您需要进行实验以确定要使用的块大小。虽然当前策略为列表中的每个元素创建一个spark,但parListChunk将为列表中特定大小的每个块创建一个spark,并且使用您指定的策略顺序地对该块中的每个元素进行操作。顺便说一下,在frequencyBy中的foldr可能会因过多的thunk创建而减慢速度;可以使用类似以下的方式:
frequencyBy :: (a -> b -> Bool) -> [a] -> [b] -> [(a,Int)]
frequencyBy f as bs = map (\a -> (a, sum . map (const 1) . filter (f a) $ bs)) as

应该修复它。

当然,像往常一样,确保使用 -O2 编译并使用 +RTS -N 运行。


这不是相同的代码;OP的函数等同于 sum . map (const 1) $ filter (f a) bs 或者 length $ filter (f a) bs,但对我来说都不是改进(使用length也更慢)。 - John L
parListChunk 2 rdeepseq 已经解决了问题,并确保在两个线程上只需要一半的时间(与一个线程相比)。不过这似乎很奇怪,为什么对于大小为1的块进行评估会产生太多的开销,而大小为2的块则可以实现完美的并行化呢? - user362382
我之前使用了 sum . map (const 1) $ filter (f a) bs,但是我发现手动融合到 foldr 中速度更快。 - user362382
@user352382:你在使用 x 的什么值? - John L
@JohnL:已修复,谢谢!@user352382:如果一个Spark需要花费一定的时间来生成,那么你在这个Spark中所做的工作必须足够昂贵,否则生成所需的时间会导致过多的开销。如果“parListChunk 2”运行良好,那么2在这里就足够了 :) - ehird
顺便说一下,使用 GHC 时,我认为 [Int] 上的 sum 将使用 foldl' 而不是 foldr,这就是我建议它的原因。 - ehird

7
我认为你的并行度太细了。 parList 尝试并行评估每个元素,但实际上每个元素的工作量并不大。
当我从 parList 改为 parListChunk 500 时,执行时间增加了近50%;因为我的机器是双核的,所以这已经是最好的情况了。
参考一下,我正在测试 x=20000

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接