Python3.x 运行时错误:事件循环已关闭。

6

我在使用aiohttp/asyncio时遇到了问题。如果我只运行下面这段代码一次,它可以正常工作,但是当我尝试从另一个文件中使用循环来调用run_my_job()时:

main.py
========================================
 count = 0
    batch_count = math.ceil((abc.get_count()/100))
    print("there are {} batches to complete.".format(batch_count))
    while count < batch_count:
        print("starting batch {}...".format(count))
        abc.run_my_job()
        print("batch {} completed...".format(count))
        count += 1


abc.py
===============================
def run_my_job(self):
    self.queue_manager(self.do_stuff(all_the_tasks))

def queue_manager(self, method):
    print('starting event queue')
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    loop.run_until_complete(future)
    loop.close()

async def async_post(self, resource, session, data):
    async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
        resp = await response.read()
    return resp

async def do_stuff(self, data):
    print('queueing tasks')

    tasks = []
    async with aiohttp.ClientSession() as session:
        for row in data:
            task = asyncio.ensure_future(self.async_post('my_api_endpoint', session, row))
            tasks.append(task)
        result = await asyncio.gather(*tasks)
        self.load_results(result)
# goes on to load_results() method that parses json and updates the DB.

I get these errors:

Traceback (most recent call last):
  File "C:/usr/PycharmProjects/api_framework/api_framework.py", line 37, in <module>
starting event queue
    abc.run_my_job()
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 77, in run_eligibility
    self.queue_manager(self.verify_eligibility(json_data))
  File "C:\usr\PycharmProjects\api_framework\api\abc\abc.py", line 187, in queue_manager
    future = asyncio.ensure_future(method)
  File "C:\Python36x64\lib\asyncio\tasks.py", line 512, in ensure_future
    task = loop.create_task(coro_or_future)
  File "C:\Python36x64\lib\asyncio\base_events.py", line 282, in create_task
    self._check_closed()
  File "C:\Python36x64\lib\asyncio\base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
sys:1: RuntimeWarning: coroutine 'Consumer.run_my_job' was never awaited
1个回答

12

看这个函数:

def queue_manager(self, method):
    print('starting event queue')
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(method)
    loop.run_until_complete(future)
    loop.close()

这就是你调度每个任务所需要的内容。在函数末尾,关闭事件循环。因此,第一个任务运行后,您会关闭事件循环。

如果你试图在此之后运行更多任务,显然是在关闭的事件循环上运行它们。(而且你有些任务还继续运行其他任务)。错误信息因此产生:

RuntimeError: Event loop is closed

只需删除loop.close(),问题就会消失。

我不确定这是否足以使您的程序正常工作,因为您没有给出任何可运行的示例。此外,在您的真实代码中,run_my_job 显然是一个协程,但在您在这里发布的代码中并非如此。我在您发布的内容中没有看到其他明显的错误,但我不知道这意味着多少。


1
搞定了。我原本以为需要关闭循环以避免问题。那么我到底需要关闭循环吗? - hyphen
4
你想要闭合循环,但是在程序的最后,当所有任务完成之后,而不是在每个任务完成之后。 - abarnert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接