可靠地等待多个异步操作?

3
我的代码需要启动多个线程并跟踪哪些线程已经完成,哪些仍在运行。我打算使用waitAny或者waitAnyCatch,但是在文档中发现以下内容,让我有些困惑:

如果有多个 Async 完成或已经完成,则返回的值对应于列表中第一个已完成的 Async。

如果这真的是这种情况,那么如何可靠地跟踪正在运行或已退出的线程呢?
以下是我简化后的代码:
chan <- newChan
currentThreadsRef <- newIORef []

-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
  jobArgs <- readChan chan
  jobAsync <- async $ runJob jobArgs
  atomicallyModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())

-- wait for jobs to be finished, and remove the thread refernece
-- from currentThreadsRef 
waitForAllJobs currentJobsRef = do
  (readIORef currentJobsRef) >>= \case
    [] -> logDebug "All jobs exited"
    currentJobs -> do
      (exitedJob, jobResult) <- waitAnyCatch currentJobs
      atomicallyModifyIORef currentJobsRef $ \x -> (filter (/= exitedjob) x, ())
      logDebug $ "Job completed with result=" <> show result
      waitForAllJobs currentJobsRef

PS:虽然从我上面简化的代码中可能看不出来,但我不能仅仅在输入数据上使用mapConcurrently是有原因的。实际上,async-pool似乎很适合我的用例,但即使这样也存在与waitAny相同的问题。


1
我不理解这个问题。waitAny 返回已完成的其中一个(你还想它做什么?),然后你将其从列表中删除。然后你进行递归。只要在运行 waitForAllJobs 时没有其他人在 currentJobsRef 上写入,我看不出为什么这应该是不可靠的。 (注意:我对 async 不是很熟悉) - chi
PS:虽然从我上方的简化代码中可能不明显,但我不能仅仅在输入数据上使用mapConcurrently有原因。你能解释一下这个原因是什么吗? - Joseph Sible-Reinstate Monica
是的,你说得对。如果传递给waitAny的任何Async已经退出,它会立即报告,下一次迭代将一个接一个地获取其他内容。 - Saurabh Nanda
@JosephSible-ReinstateMonica 有一个生产者正在生产昂贵的作业,其中生产和运行作业都很昂贵。我们不能一次性生产所有作业,也不能同时并发地执行它们。使用标准的 async api 可能唯一可行的解决方案是批量生产作业,并在小批次上运行 mapConcurrently。然而,即使如此,这也是低效的,因为它需要等待批处理中的每个作业完成后才开始下一批。 - Saurabh Nanda
1个回答

4

以下是一个程序,它启动了1000个异步操作,每个操作都设置在一秒钟内终止,并在循环中等待它们全部完成。使用ghc -O2 -threaded编译并使用+RTS -N运行,它大约需要1.5秒的时间运行,没有任何异步操作会丢失:

import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set

main :: IO ()
main = do
  let n = 1000 :: Int
  asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
  let loop :: Set.Set (Async Int) -> IO ()
      loop asyncs | null asyncs = return ()
                  | otherwise = do
                      (a, _i) <- waitAny (Set.toList asyncs)
                      loop (Set.delete a asyncs)
  loop (Set.fromList asyncs0)

如评论所述,文档涉及到的是在提供的异步列表中,第一个完成的异步将被“返回”,但如果有多个异步完成,则其他异步并不会被“遗忘”。您只需要从列表中删除已返回的异步并重新轮询,最终您将获得所有异步。

因此,使用waitAny等待多个异步不应该有任何问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接