运行时错误:在async + apscheduler中的线程中没有当前事件循环。

117

我有一个异步函数,需要每N分钟使用apscheduler运行它。下面是Python代码:

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

但是当我试图运行它时,出现了下一个错误信息。

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

你能帮我一下吗?Python 3.6,APScheduler 3.3.1。

8个回答

205

在你的 def demo_async(urls) 中,尝试替换:

loop = asyncio.get_event_loop()

使用:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

31
不要这样做!这只会导致第二个事件循环的运行。考虑到APScheduler对协程函数的本地支持,这是完全没有意义的。如果您尝试使在一个事件循环中运行的事物与第一个事件循环交互,将会发生糟糕的事情。 - Alex Grönholm
26
有时候你需要第二个事件循环,有时候则不需要。 - Erik Aronesty
20
在单独的线程中运行循环时,我发现这是必要的。 - Chen A.
7
嗨,@AlexGrönholm,你能否解释一下为什么这很糟糕?那会非常有价值。 - Anton Daneyko
4
@AlexGrönholm,你能否详细阐述一下你的第二条评论,并且可以将我引导到任何博客或文档中。我需要在我的新进程中使用get_event_loop,但我遇到了这个错误。上面的答案会解决我的问题。 - Ja8zyjits
显示剩余5条评论

46

还没有提到的重要问题是为什么会发生错误。对于我个人来说,知道错误发生的原因和解决实际问题一样重要。

让我们看一下BaseDefaultEventLoopPolicyget_event_loop的实现:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

如果满足以下所有条件,则只会执行self.set_event_loop(self.new_event_loop())

  • self._local._loop is None - _local._loop未设置
  • not self._local._set_called - 尚未调用set_event_loop
  • isinstance(threading.current_thread(), threading._MainThread) - 当前线程是主线程(在你的情况下不是True)

因此,引发异常,因为未在当前线程中设置循环:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)

1
你明白为什么 isinstance... 条件很重要吗?为什么我们只在 _MainThread 中创建事件循环? - Anton Daneyko
3
因为在Linux上,SIGCHLD只会被发送到主线程,所以只有在主线程中运行的事件循环才能确认已终止的子进程。 - Alex Grönholm
这非常有帮助。因为asyncio.get_event_loop()会抛出RuntimeException("There is no current event loop in thread 'MainThread'.")异常,我的解决方法是在try块中调用它,然后跟着except RuntimeException: loop = asyncio.new_event_loop() - Kevin-Prichard

16

只需将fetch_all直接传递给scheduler.add_job()即可。 asyncio调度程序支持协程函数作为作业目标。

如果目标可调用对象不是协程函数,则会在工作线程中运行(由于历史原因),因此引发异常。


1
你好,感谢回复。您能解释一下为什么调度程序添加作业比添加第二个事件循环更好吗?@radzak在他的回复中提到问题存在的原因是他不在主线程中。 - partizanos
2
第二个事件循环只会增加更多的复杂性,却没有任何好处。 - Alex Grönholm
1
谢谢您的评论,您能详细说明一下添加第二个事件循环的理想情况吗? 如果我理解正确,它会添加第二个进程来处理并行请求,而调度程序则采用多线程解决方案(在Cpython中由于GIL的原因不能并行处理,但可以并发处理)。 - partizanos

15

我曾遇到一个类似的问题,想让我的asyncio模块能够从非asyncio脚本中调用(该脚本在gevent下运行...别问为什么...)。以下代码解决了我的问题,因为它尝试获取当前事件循环,但如果当前线程中没有事件循环,则会创建一个。在python 3.9.11中进行了测试。

try:
    loop = asyncio.get_event_loop()
except RuntimeError as e:
    if str(e).startswith('There is no current event loop in thread'):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        raise

这个答案与得票最高的答案完全相同(只是添加了try-except),该答案受到批评(请参见其评论):https://dev59.com/pFYN5IYBdhLWcg3w4bv1#46750562 - alelom
3
@alelom,只有你批评了这个答案,而且没有任何理由。而且这个答案也是有用的,在某些情况下,你想获取默认事件循环的引用,但你不知道是否有当前正在运行的循环。看起来你不理解这两种情况下的问题或代码目的。 - Paul Cornelius
仅供参考,我从未批评过我上面链接的答案。我还想提醒用户,SO的规则包括彼此友好相处,在撰写不同意的评论时请遵守这些规则。 - alelom
@PaulCornelius 这样做。至少不会有两个循环,AFAICT? - mhvelplund

7

使用asyncio.run()代替直接使用事件循环。它会创建一个新的循环并在完成后关闭。

'run' 的用法如下:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()

1
我正在使用 asyncio.run(),但仍然存在这个问题... - luochen1990
1
在主脚本中使用asyncio.run(),并尝试在另一个类中调用asyncio.get_event_loop()将导致出现此错误。 - HumbleBee

4
自从这个问题一直出现在第一页,我会在这里写下我的问题和我的答案。使用 flask-socketioBleak 时,我遇到了 RuntimeError:没有当前事件循环在线程 'Thread-X' 中。
编辑:好的,我重构了我的文件并创建了一个类。
我在构造函数中初始化了循环,现在一切都正常工作:
class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

使用方法:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()

原始答案:
解决方案很愚蠢。我没有注意到自己在做什么,但我把一些 import 从函数中移出来了,就像这样:
import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        # my code that keep throwing exceptions.

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

所以我认为我需要在我的代码中做出一些改变,然后我使用这段代码创建了一个新的事件循环,就在调用get_event_loop()的那一行之前。
loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()

此时我感到相当高兴,因为我的循环正在运行。
但是没有响应。而且我的代码依赖于超时来返回一些值,这对我的应用程序非常不利。
我花了近两个小时才发现问题在于import,以下是我的(有效的)代码:
def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list  = loop.run_until_complete(run())
    return ble_list

RuntimeError: There is no current event loop in thread 'MainThread'. - Gray Programmerz

3

阅读给定的答案后,我只能通过使用此页面上https://dev59.com/pFYN5IYBdhLWcg3w4bv1#46750562中的提示(尝试替换)来修复我的WebSocket线程。

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
BaseDefaultEventLoopPolicy的文档说明如下:

BaseDefaultEventLoopPolicy是访问事件循环的默认策略实现。在此策略中,每个线程都有自己的事件循环。然而,默认情况下,我们只为主线程自动创建事件循环;其他线程默认情况下没有事件循环。

因此,在使用线程时,需要创建事件循环。
我不得不重新排列我的代码,以便最终的代码。
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # !!! Place code after setting the loop !!!
    server = Server()
    start_server = websockets.serve(server.ws_handler, 'localhost', port)

-1
在我的情况下,这行代码是这样的。
asyncio.get_event_loop().run_until_complete(test())

我用这行代码替换了上面的那行,解决了我的问题。

asyncio.run(test())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接