如何优化并行排序以提高时间性能?

9

我有一个算法可以并行排序给定长度的列表:

import Control.Parallel (par, pseq)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Environment (getArgs)
import System.Random (StdGen, getStdGen, randoms)


parSort :: (Ord a) => [a] -> [a]
parSort (x:xs)    = force greater `par` (force lesser `pseq`
                                         (lesser ++ x:greater))
    where lesser  = parSort [y | y <- xs, y <  x]
          greater = parSort [y | y <- xs, y >= x]
parSort _         = []

sort :: (Ord a) => [a] -> [a]
sort (x:xs) = lesser ++ x:greater
    where lesser  = sort [y | y <- xs, y <  x]
          greater = sort [y | y <- xs, y >= x]
sort _ = []

parSort2 :: (Ord a) => Int -> [a] -> [a]
parSort2 d list@(x:xs)
  | d <= 0     = sort list
  | otherwise = force greater `par` (force lesser `pseq`
                                     (lesser ++ x:greater))
      where lesser      = parSort2 d' [y | y <- xs, y <  x]
            greater     = parSort2 d' [y | y <- xs, y >= x]
            d' = d - 1
parSort2 _ _              = []

force :: [a] -> ()
force xs = go xs `pseq` ()
    where go (_:xs) = go xs
          go [] = 1


randomInts :: Int -> StdGen -> [Int]
randomInts k g = let result = take k (randoms g)
                 in force result `seq` result

testFunction = parSort

main = do
  args <- getArgs
  let count | null args = 500000
            | otherwise = read (head args)
  input <- randomInts count `fmap` getStdGen
  start <- getCurrentTime
  let sorted = testFunction input
  putStrLn $ "Sort list N = " ++ show (length sorted)
  end <- getCurrentTime
  putStrLn $ show (end `diffUTCTime` start) 

我希望能够在2、3和4个处理器核心上执行并行排序的时间少于1个核心。 目前,我无法实现这个结果。 以下是我的程序启动:

1. SortList +RTS -N1 -RTS 10000000
time = 41.2 s
2.SortList +RTS -N3 -RTS 10000000
time = 39.55 s
3.SortList +RTS -N4 -RTS 10000000
time = 54.2 s

我能做些什么?

更新 1:

testFunction = parSort2 60

2
密切相关:https://dev59.com/CmIj5IYBdhLWcg3w8JNZ - leftaroundabout
2
force 并不是你想象中的那样:它只强制给定列表的脊柱,而不是内容。对于随机数生成,强制内容才是真正起作用的。此外,force result `seq` result 并不能做到你所希望的。在开始计时之前,在 main 中尝试将 evaluate (last input) 放入其中,你会立即看到差异。我还建议使用 mkStdGen 0(或其他常量)代替 getStdGen,以使你的计时更加一致。(但这两个都没有解决你实际的问题。) - Daniel Wagner
4
关于你实际的问题:你可能产生了太多的火花,而跟踪火花的开销占据了运行时。你可以尝试一个更简单的策略:将列表分成一些小块,比如60个,或者可能是传递给“-N”的数字,然后并行排序每个块,最后合并结果。我自己没有测试过,所以我不会将其写成答案,因为我不能确定它是否正确,但我敢打赌这种方法会表现得更好。 - Daniel Wagner
2
快速排序并行化的问题之一是它不会构建平衡的子列表。你可以通过并行化归并排序来获得更好的结果,因为它在进入时平衡,在离开时排序。 - Rein Henrichs
2
@ReinHenrichs 是的,快速排序不太适合并行处理,但即使平衡性不佳,仍应该可以获得一些性能提升,特别是对于完全随机的数字列表。 - leftaroundabout
显示剩余12条评论
1个回答

2

这里有一个你可以尝试的想法,使用 Data.Map。为了简单和性能,我假设元素类型具有可替代性,因此我们可以计算出现次数而不是存储元素列表。我相信你可以使用一些花哨的数组算法获得更好的结果,但这个方法简单且(基本上)功能齐全。

在编写并行算法时,我们希望最小化必须按顺序完成的工作量。在对列表进行排序时,有一件事情我们真正无法避免按顺序完成:将列表分成多个线程要处理的部分。我们希望尽可能少地付出努力来完成这项工作,然后尝试从那时起大部分时间都在并行工作。

让我们从一个简单的顺序算法开始。

{-# language BangPatterns, TupleSections #-}
import qualified Data.Map.Strict as M
import Data.Map (Map)
import Data.List
import Control.Parallel.Strategies

type Bag a = Map a Int

ssort :: Ord a => [a] -> [a]
ssort xs =
  let m = M.fromListWith (+) $ (,1) <$> xs
  in concat [replicate c x | (x,c) <- M.toList m]

我们如何实现并行化?首先,让我们将列表分成几个部分。有许多方法可以做到这一点,但都不太好。假设我们只有少量的资源,我认为让它们各自遍历列表是合理的。当然,也可以尝试其他方法来进行实验。
-- | Every Nth element, including the first
everyNth :: Int -> [a] -> [a]
everyNth n | n <= 0 = error "What you doing?"
everyNth n = go 0 where
  go !_ [] = []
  go 0 (x : xs) = x : go (n - 1) xs
  go k (_ : xs) = go (k - 1) xs

-- | Divide up a list into N pieces fairly. Walking each list in the
-- result will walk the original list.
splatter :: Int -> [a] -> [[a]]
splatter n = map (everyNth n) . take n . tails

现在我们有了列表的片段,我们启动线程将它们转换为袋子。

parMakeBags :: Ord a => [[a]] -> Eval [Bag a]
parMakeBags xs = 
  traverse (rpar . M.fromListWith (+)) $ map (,1) <$> xs

现在我们可以重复合并袋子,直到只剩下一个。
parMergeBags_ :: Ord a => [Bag a] -> Eval (Bag a)
parMergeBags_ [] = pure M.empty
parMergeBags_ [t] = pure t
parMergeBags_ q = parMergeBags_ =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> rpar (M.unionWith (+) t1 t2) <*> go ts

但是...有一个问题。在每一轮的合并中,我们使用的能力只有前一轮的一半,并且最后一次合并只使用一个能力。哎呀!为了解决这个问题,我们需要并行化unionWith。幸运的是,这很容易!
import Data.Map.Internal (Map (..), splitLookup, link)

parUnionWith
  :: Ord k
  => (v -> v -> v)
  -> Int -- Number of threads to spark
  -> Map k v
  -> Map k v
  -> Eval (Map k v)
parUnionWith f n t1 t2 | n <= 1 = rseq $ M.unionWith f t1 t2
parUnionWith _ !_ Tip t2 = rseq t2
parUnionWith _ !_ t1 Tip = rseq t1
parUnionWith f n (Bin _ k1 x1 l1 r1) t2 = case splitLookup k1 t2 of
  (l2, mb, r2) -> do
    l1l2 <- parEval $ parUnionWith f (n `quot` 2) l1 l2
    r1r2 <- parUnionWith f (n `quot` 2) r1 r2
    case mb of
      Nothing -> rseq $ link k1 x1 l1l2 r1r2
      Just x2 -> rseq $ link k1 fx1x2 l1l2 r1r2
        where !fx1x2 = f x1 x2

现在我们可以完全并行化袋子合并:

-- Uses the given number of capabilities per merge, initially,
-- doubling for each round.
parMergeBags :: Ord a => Int -> [Bag a] -> Eval (Bag a)
parMergeBags !_ [] = pure M.empty
parMergeBags !_ [t] = pure t
parMergeBags n q = parMergeBags (n * 2) =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> parEval (parUnionWith (+) n t1 t2) <*> go ts

我们可以像这样实现并行合并:
parMerge :: Ord a => [[a]] -> Eval [a]
parMerge xs = do
  bags <- parMakeBags xs
  -- Why 2 and not one? We only have half as many
  -- pairs as we have lists (capabilities we want to use)
  -- so we double up.
  m <- parMergeBags 2 bags
  pure $ concat [replicate c x | (x,c) <- M.toList m]

将所有的部分组合在一起,

parSort :: Ord a => Int -> [a] -> Eval [a]
parSort n = parMerge . splatter n

pSort :: Ord a => Int -> [a] -> [a]
pSort n = runEval . parMerge . splatter n

只剩下一个连续的部分可以并行化:将最后一个袋子转换为列表。这个值得并行化吗?我相信在实践中并不是很值得。但是,让我们为了好玩而这样做!为了避免相当大的额外复杂性,我假设没有大量相等的元素;结果中重复的元素将导致一些工作(thunks)留在结果列表中。

我们需要一个基本的部分列表脊柱强制器:

-- | Force the first n conses of a list
walkList :: Int -> [a] -> ()
walkList n _ | n <= 0 = ()
walkList _ [] = ()
walkList n (_:xs) = walkList (n - 1) xs

现在,我们可以将袋子并行地转换为列表,而无需支付连接的费用:
-- | Use up to the given number of threads to convert a bag
-- to a list, appending the final list argument.
parToListPlus :: Int -> Bag k -> [k] -> Eval [k]
parToListPlus n m lst | n <= 1 = do
  rseq (walkList (M.size m) res)
  pure res
  -- Note: the concat and ++ should fuse away when compiling with
  -- optimization.
  where res = concat [replicate c x | (x,c) <- M.toList m] ++ lst
parToListPlus _ Tip lst = pure lst
parToListPlus n (Bin _ x c l r) lst = do
  r' <- parEval $ parToListPlus (n `quot` 2) r lst
  res <- parToListPlus (n `quot` 2) l $ replicate c x ++ r'
  rseq r' -- make sure the right side is finished
  pure res

然后我们相应地修改合并程序:

parMerge :: Ord a => Int -> [[a]] -> Eval [a]
parMerge n xs = do
  bags <- parMakeBags xs
  m <- parMergeBags 2 bags
  parToListPlus n m []

我们能否有一些基准来检查这是否实际上通过并行化导致了性能的提高? - Dan Robertson
1
@DanRobertson,我进行了一些非正式的基准测试,似乎它确实有效(在像[1,-1,2,-2,...,2 * 10 ^ 6]这样的列表上,pSort 2pSort 1快得多),但我的笔记本电脑性能中等,不太适合这种事情。Threadscope似乎表明负载相当平衡,但我以前从未使用过,因此无法保证我正确解释了结果。 - dfeuer
1
@DanRobertson,使用伪随机输入至少可以很好地扩展到4个内核。我认为还有更多的调整需要完成(例如,可能最好使用通过“monad-par”可用的更细粒度并行性控制,而不是依赖于内置火花池),但一般的方法似乎是不错的。 - dfeuer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接