我的代码需要启动多个线程并跟踪哪些线程已经完成,哪些仍在运行。我打算使用
以下是我简化后的代码:
waitAny
或者waitAnyCatch
,但是在文档中发现以下内容,让我有些困惑:
如果这真的是这种情况,那么如何可靠地跟踪正在运行或已退出的线程呢?如果有多个 Async 完成或已经完成,则返回的值对应于列表中第一个已完成的 Async。
以下是我简化后的代码:
chan <- newChan
currentThreadsRef <- newIORef []
-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
jobArgs <- readChan chan
jobAsync <- async $ runJob jobArgs
atomicallyModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())
-- wait for jobs to be finished, and remove the thread refernece
-- from currentThreadsRef
waitForAllJobs currentJobsRef = do
(readIORef currentJobsRef) >>= \case
[] -> logDebug "All jobs exited"
currentJobs -> do
(exitedJob, jobResult) <- waitAnyCatch currentJobs
atomicallyModifyIORef currentJobsRef $ \x -> (filter (/= exitedjob) x, ())
logDebug $ "Job completed with result=" <> show result
waitForAllJobs currentJobsRef
PS:虽然从我上面简化的代码中可能看不出来,但我不能仅仅在输入数据上使用mapConcurrently
是有原因的。实际上,async-pool
似乎很适合我的用例,但即使这样也存在与waitAny
相同的问题。
waitAny
返回已完成的其中一个(你还想它做什么?),然后你将其从列表中删除。然后你进行递归。只要在运行waitForAllJobs
时没有其他人在currentJobsRef
上写入,我看不出为什么这应该是不可靠的。 (注意:我对async
不是很熟悉) - chimapConcurrently
有原因。你能解释一下这个原因是什么吗? - Joseph Sible-Reinstate MonicawaitAny
的任何Async
已经退出,它会立即报告,下一次迭代将一个接一个地获取其他内容。 - Saurabh Nandaasync
api 可能唯一可行的解决方案是批量生产作业,并在小批次上运行mapConcurrently
。然而,即使如此,这也是低效的,因为它需要等待批处理中的每个作业完成后才开始下一批。 - Saurabh Nanda