为什么我的并行遍历 Haskell 程序会泄漏内存?

3
请看下面这个Haskell程序(我主要是为了学习而做的):
import qualified Control.Concurrent.MSem as Sem
import System.Environment (getArgs)
import Control.Concurrent (forkIO)
import Control.Monad

-- Traverse with maximum n threads
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> Sem.with sem (forkIO $ action value)

main :: IO ()
main = do
  args <- getArgs
  let nThreads = read . head $ args :: Int
  parallelTraverse nThreads print [(1::Int)..]

当我运行它时,内存很快就升高到几个GB的大小。我尝试了各种组合,以确保丢弃中间计算的结果(即打印操作)。为什么它仍然在泄漏空间?

你没有防止无限数量的线程同时运行。仔细阅读你的程序。 - Reid Barton
1
@ReidBarton:我想我明白了:forkIO 立即返回,使信号量无用。回到设计阶段 :) - static_rtti
1个回答

6

首先,您在以下部分中存在明显的错误:

Sem.with sem (forkIO $ action value)

您正在主线程中在“fork”操作周围调用信号量,而不是在该操作中进行操作。以下是正确实现的方法:
forkIO (Sem.with sem (action value))

即,在分叉线程的上下文中解决信号量问题。

其次,在以下代码中,您正在对无限列表调用parallelTraverse操作:

parallelTraverse nThreads print [(1::Int)..]

这段文字涉及IT技术相关内容。由于使用forkIO操作会导致线程无限分叉,因此很快就会耗尽资源。为了限制工作线程的数量,with模式并不适用于您的情况。相反,您应该使用waitsignal的显式组合,并不要忘记正确处理异常(如果您期望它们)。例如:
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> do
    Sem.wait sem
    forkIO $ finally (action value) (Sem.signal sem)

我的目标是限制并发线程的数量,但现在我明白为什么它不起作用了。无限列表是有意设计的,因为我希望我的程序能够处理无限流的操作。 - static_rtti
非常感谢。我只需要一点额外的帮助来理解我需要在forkIO的两侧等待和发出信号,这就是为什么(如果我理解正确)在这种情况下不能使用with的原因。 - static_rtti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接