我希望使用Haskell语言中的Vector
实现Floyd-Warshall全源最短路径算法,以期获得良好的性能表现。
这个实现非常直接,但是我们使用的是一个二维向量而不是一个三维的 |V|×|V|×|V| 矩阵,因为我们只会读取前一个k
值。
因此,该算法实际上只是一系列步骤,其中传入一个2D向量并生成一个新的2D向量。最终的2D向量包含所有节点(i,j)之间的最短路径。
我的直觉告诉我,在每个步骤之前,确保先评估前一个2D向量非常重要,所以我在fw
函数的prev
参数和严格的foldl'
上使用了BangPatterns
:
{-# Language BangPatterns #-}
import Control.DeepSeq
import Control.Monad (forM_)
import Data.List (foldl')
import qualified Data.Map.Strict as M
import Data.Vector (Vector, (!), (//))
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as V hiding (length, replicate, take)
type Graph = Vector (M.Map Int Double)
type TwoDVector = Vector (Vector Double)
infinity :: Double
infinity = 1/0
-- calculate shortest path between all pairs in the given graph, if there are
-- negative cycles, return Nothing
allPairsShortestPaths :: Graph -> Int -> Maybe TwoDVector
allPairsShortestPaths g v =
let initial = fw g v V.empty 0
results = foldl' (fw g v) initial [1..v]
in if negCycle results
then Nothing
else Just results
where -- check for negative elements along the diagonal
negCycle a = any not $ map (\i -> a ! i ! i >= 0) [0..(V.length a-1)]
-- one step of the Floyd-Warshall algorithm
fw :: Graph -> Int -> TwoDVector -> Int -> TwoDVector
fw g v !prev k = V.create $ do -- ← bang
curr <- V.new v
forM_ [0..(v-1)] $ \i ->
V.write curr i $ V.create $ do
ivec <- V.new v
forM_ [0..(v-1)] $ \j -> do
let d = distance g prev i j k
V.write ivec j d
return ivec
return curr
distance :: Graph -> TwoDVector -> Int -> Int -> Int -> Double
distance g _ i j 0 -- base case; 0 if same vertex, edge weight if neighbours
| i == j = 0.0
| otherwise = M.findWithDefault infinity j (g ! i)
distance _ a i j k = let c1 = a ! i ! j
c2 = (a ! i ! (k-1))+(a ! (k-1) ! j)
in min c1 c2
然而,当使用有47978条边的1000个节点图运行此程序时,情况并不理想。内存使用率非常高,程序运行时间过长。该程序是使用ghc -O2
编译的。
我为了进行性能分析重建了程序,并将迭代次数限制为50次:
results = foldl' (fw g v) initial [1..50]
我随后使用
+RTS -p -hc
和 +RTS -p -hd
运行了该程序:
这很有趣,但我猜它表明程序正在累积大量的thunks。这不好。
好的,经过一些试验,我在 fw
中添加了一个 deepseq
来确保 prev
实际上已被评估:
let d = prev `deepseq` distance g prev i j k
现在情况好多了,我可以顺利地运行程序并且内存使用保持不变。很明显,对于
prev
参数的bang操作是不够的。为了与之前的图形进行比较,这里是添加
deepseq
后50次迭代的内存使用情况:
好了,现在情况有所改善,但我仍有一些问题:
- 这是解决空间泄漏的正确方法吗?我感觉插入一个
deepseq
有点丑陋? - 我在这里使用
Vector
的方式是否惯用/正确?我为每次迭代构建一个全新的向量,并希望垃圾收集器会删除旧的Vector
。 - 是否还有其他方法可以通过这种方法使运行速度更快?
graph.txt
: http://sebsauvage.net/paste/?45147f7caf8c5f29#7tiCiPovPHWRm1XNvrSb/zNl3ujF3xB3yehrxhEdVWw=
这是main
:main = do
ls <- fmap lines $ readFile "graph.txt"
let numVerts = head . map read . words . head $ ls
let edges = map (map read . words) (tail ls)
let g = V.create $ do
g' <- V.new numVerts
forM_ [0..(numVerts-1)] (\idx -> V.write g' idx M.empty)
forM_ edges $ \[f,t,w] -> do
-- subtract one from vertex IDs so we can index directly
curr <- V.read g' (f-1)
V.write g' (f-1) $ M.insert (t-1) (fromIntegral w) curr
return g'
let a = allPairsShortestPaths g numVerts
case a of
Nothing -> putStrLn "Negative cycle detected."
Just a' -> do
putStrLn $ "The shortest, shortest path has length "
++ show ((V.minimum . V.map V.minimum) a')
foldl'
和forM_
计算重写为显式循环吗?(例如在此处的test0
中所做的,尽管使用的是数组而不是向量。并且在此处,用循环代替通常的forM
)。 - Will Nessfoldl'
,但似乎没有什么效果。看到你链接的两个示例都充满了unsafe*
函数,有点让人沮丧 - 我真的希望在不使用这些函数的情况下可以实现合理的性能。 :-) - betaTwoDVector
只是矩阵,对吗?你考虑过使用Repa吗?Simon Marlow在几个不同的上下文中都实现了FW算法作为示例,例如这个链接:http://chimera.labs.oreilly.com/books/1230000000929/ch05.html#sec_par-repa-array-ops - Chad Scherrer