Python - 取消 asyncio 中的任务?

4

我已经写了以下异步池的代码。在__aexit__中,当任务完成后,我会取消工作任务。但是当我运行代码时,工作任务没有被取消,代码一直在运行。这是任务的样子:<Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>。虽然asyncio.wait_for被取消了,但工作任务并没有被取消。

class AsyncPool:
    def __init__(self,coroutine,no_of_workers,timeout):
        self._loop           = asyncio.get_event_loop()
        self._queue          = asyncio.Queue()
        self._no_of_workers  = no_of_workers
        self._coroutine      = coroutine
        self._timeout        = timeout
        self._workers        = None

    async def _worker(self): 
        while True:
            try:
                ret = False
                queue_item           = await self._queue.get()
                ret = True
                result               = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)
            except Exception as e:
                print(e)
            finally:
                if ret:
                    self._queue.task_done()


    async def push_to_queue(self,item):
        self._queue.put_nowait(item)
    
    async def __aenter__(self):
        assert self._workers == None
        self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
        return self
    
    async def __aexit__(self,type,value,traceback):
        await self._queue.join()

        for worker in self._workers:
            worker.cancel()

        await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)

使用Asyncpool:

async def something(item):
    print("got", item)
    await asyncio.sleep(item)
 
async def main():
    async with AsyncPool(something, 5, 2) as pool:
        for i in range(10):
            await pool.push_to_queue(i)
 
asyncio.run(main())

我终端中的输出: 在此输入图片描述

请编辑问题,包括可重现问题的测试代码。当我尝试通过添加一个小的 main() 像这样 来复制它时,我得到了一个 CancelledError。这是因为 gather() 等待取消的任务。在 gather 调用中添加 return_exceptions=True 可以修复此错误,并且不会打印任何警告。 - user4815162342
@user4815162342,抱歉,我用了你的例子编辑了问题。 - user12066563
没有任何警告,任务仍在运行。@user4815162342。我想取消这些任务,但没有成功。 - user12066563
你如何得出任务正在运行的结论?当我按照提示运行代码时,任务会被取消(通过 gather 完成),程序退出,因此任务显然无法继续运行。你能否编辑问题以包含可以复现问题的可运行的代码? - user4815162342
让我们在聊天中继续这个讨论 - user12066563
显示剩余3条评论
3个回答

1
问题在于你的except Exception异常子句也捕获了取消操作并忽略了它。更让人困惑的是,print(e)在出现CancelledError时只会打印一个空行,这就是输出中空行的来源。 (将其更改为print(type(e))可以显示正在发生的情况。)
要纠正这个问题,请将except Exception更改为更具体的内容,例如except asyncio.TimeoutError。在Python 3.8中不需要进行此更改,因为asyncio.CancelledError不再派生自Exception,而是从BaseException派生,因此except Exception无法捕获它。

我正在使用AsyncPool从TMDB拉取数据。但是我遇到了一个问题,即https://stackoverflow.com/questions/63764830/asyncio-gather-stuck-and-how-to-handle-exceptions 如果可能的话,您能否查看一下这个问题?谢谢。 - user12066563
@Madvillainy 我看到了那个问题,但我不知道它出了什么问题。如果是我,我会尝试尽可能多地放置打印语句,以查看程序在哪里卡住了。一旦你明白了程序卡住的位置,就有希望弄清楚为什么会发生这种情况以及如何解决它。 - user4815162342
我会试一下。谢谢。 - user12066563

0

这似乎有效。 event 是一个计数器,当它到期时,它会取消任务。

import asyncio
from datetime import datetime as dt
from datetime import timedelta as td
import random
import time

class Program:
    def __init__(self):
        self.duration_in_seconds = 20
        self.program_start = dt.now()
        self.event_has_expired = False
        self.canceled_success = False

        

    async def on_start(self):
        print("On Start Event Start! Applying Overrides!!!")
        await asyncio.sleep(random.randint(3, 9))


    async def on_end(self):
        print("On End Releasing All Overrides!")
        await asyncio.sleep(random.randint(3, 9))
        

    async def get_sensor_readings(self):
        print("getting sensor readings!!!")
        await asyncio.sleep(random.randint(3, 9))   

 
    async def evauluate_data(self):
        print("checking data!!!")
        await asyncio.sleep(random.randint(3, 9))   


    async def check_time(self):
        if (dt.now() - self.program_start > td(seconds = self.duration_in_seconds)):
            self.event_has_expired = True
            print("Event is DONE!!!")

        else:
            print("Event is not done! ",dt.now() - self.program_start)



    async def main(self):
        # script starts, do only once self.on_start()
        await self.on_start()
        print("On Start Done!")

        while not self.canceled_success:

            readings = asyncio.ensure_future(self.get_sensor_readings())
            analysis = asyncio.ensure_future(self.evauluate_data())
            checker = asyncio.ensure_future(self.check_time())
            
            if not self.event_has_expired:
                await readings   
                await analysis           
                await checker
                
            else:
                # close other tasks before final shutdown
                readings.cancel()
                analysis.cancel()
                checker.cancel()
                self.canceled_success = True
                print("cancelled hit!")


        # script ends, do only once self.on_end() when even is done
        await self.on_end()
        print('Done Deal!')


async def main():
    program = Program()
    await program.main()

0

当您创建并取消asyncio任务时,仍然有需要“回收”的活动任务。因此,您要await worker等待它。但是,一旦您await这样的取消任务,由于它永远不会给出期望的返回值,asyncio.CancelledError将被引发,您需要在某个地方捕获它。

由于这种行为,我认为您不应该gather它们,而应该await每个已取消的任务,因为它们应该立即返回:

async def __aexit__(self,type,value,traceback):
    await self._queue.join()

    for worker in self._workers:
        worker.cancel()
    for worker in self._workers:
        try:
           await worker
        except asyncio.CancelledError:
           print("worker cancelled:", worker)

等待工作进程也不起作用,我得到了同样的东西,即<Task pending coro=<AsyncPool._worker() running at async_pool.py:17> wait_for=<Future cancelled>>如果我在await之前打印它,代码仍在运行并等待工作进程。 - user12066563

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接