运行时错误:超时上下文管理器应在任务内使用。

36

背景:我正在托管一个Flask服务器和一个Discord客户端

Flask服务器只需要将客户端的消息传递到Discord,并将Discord的消息传递到客户端。

当我调用 loop.run_until_complete(sendMsg(request)) 时,我遇到了错误。

我已经尝试在 sendMsg 中使用 wait_forwait_for loop.run_until_complete()

我已经到处查找,但没有找到任何内容,所以任何帮助都将不胜感激。

代码:

import discord
import json
import os
import asyncio
from flask import Flask, request, render_template
from async_timeout import timeout
from threading import Thread
from time import sleep

client = discord.Client()
messages = []
app = Flask(__name__)

def startClient():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    client.run('token')


#
# Discord Events
#
@client.event
async def on_ready():
    print('Discord Client Ready')

@client.event
async def on_message(message):
    global messages
    message.append(message)


#
# Flask Stuff
#
async def sendMsg(request):
    await client.send_message(discord.Object('channel id'), request.form['message'])


@app.route("/chat/", methods=['GET', 'POST'])
def chatPage():
    global messages

    if request.method == 'GET':
        return render_template('main.html')

    elif request.method == 'POST':
        loop = asyncio.new_event_loop()
        loop.run_until_complete(sendMsg(request))
        return ''

@app.route("/chat/get", methods=['GET'])
def chatGet():
    return json.dumps(messages[int(request.args['lastMessageId']):])


# Start everything
os.environ["WERKZEUG_RUN_MAIN"] = 'true'
print('Starting discord.py client')
Thread(target=startClient).start()
print('Starting flask')
app.run(host='0.0.0.0', debug=True)

跟踪:

Traceback (most recent call last):
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 2309, in __call__
    return self.wsgi_app(environ, start_response)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 2295, in wsgi_app
    response = self.handle_exception(e)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 1741, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/_compat.py", line 35, in reraise
    raise value
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 2292, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 1815, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 1718, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/_compat.py", line 35, in reraise
    raise value
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 1813, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/mnt/c/Users/SuperKooks/Documents/Coding/HTML/kindle-discord/app.py", line 51, in chatPage
    loop.run_until_complete(sendMsg(request))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/mnt/c/Users/SuperKooks/Documents/Coding/HTML/kindle-discord/app.py", line 39, in sendMsg
    await client.send_message(discord.Object('382416348007104513'), request.form['message'])
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/discord/client.py", line 1152, in send_message
    data = yield from self.http.send_message(channel_id, content, guild_id=guild_id, tts=tts, embed=embed)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/discord/http.py", line 137, in request
    r = yield from self.session.request(method, url, **kwargs)
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/aiohttp/client.py", line 555, in __iter__
    resp = yield from self._coro
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/aiohttp/client.py", line 197, in _request
    with Timeout(timeout, loop=self._loop):
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/async_timeout/__init__.py", line 39, in __enter__
    return self._do_enter()
  File "/home/SuperKooks/.local/lib/python3.5/site-packages/async_timeout/__init__.py", line 76, in _do_enter
    raise RuntimeError('Timeout context manager should be used '
RuntimeError: Timeout context manager should be used inside a task
5个回答

38

aiohttp.ClientSession() 需要在协程中调用。 请将您的 Client() 初始化程序移动到任何一个 async def 函数中。


4
这正是我的问题,虽然我不确定它是否回答了上面的问题。 - Alex Lowe
1
那么每次调用 async def 函数时,Client() 对象都会被创建吗? - alper
是的,每次调用它时都会创建。如果这代表与服务器的单个“会话”,那么没问题。但是,如果您想继续使用它,请考虑将其放在某个地方。或者更好的方法是重复使用 cookies! - kolypto

12

这个问题看起来可能是由以下原因引起的:

elif request.method == 'POST':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(sendMsg(request))

这将创建一个新的事件循环并在其中运行sendMsg(request)。然而,sendMsg调用了client对象上的方法,在其自己的事件循环中运行。为了实现这一点,您需要:

  • 公开在startClient中创建的事件循环,例如公开一个名为client_loop的全局变量;
  • 用调用asyncio.run_coroutine_threadsafe的方式替换loop = asyncio.new_event_loop(); loop.run_until_complete(sendMsg(request)),以将协程提交到在其他线程中已经运行的事件循环中。

提交代码如下所示:

elif request.method == 'POST':
    # submit the coroutine to the event loop thread
    send_fut = asyncio.run_coroutine_threadsafe(sendMsg(request), client_loop)
    # wait for the coroutine to finish
    send_fut.result()

我现在正在使用aiohttp,但是我仍然遇到错误。我尝试了这段代码,但是每当我触发新的代码时,它会挂起进程。 - SuperKooks
@SuperKooks 你尝试过调试吗?sendMsg 协程是否开始执行?是否执行完成?如果它挂起了,具体是在哪里挂起的?提供一个能够重现问题的最小示例也会有所帮助。 - user4815162342

7
我通过将所有调用 asyncio.run 的代码替换为下面的 asyncio_run 来解决了这个问题。对我来说,它解决了这两个错误:
  • RuntimeError: Timeout context manager should be used inside a task
  • RuntimeError: This event loop is already running
pip install nest-asyncio

import asyncio
import nest_asyncio

def asyncio_run(future, as_task=True):
    """
    A better implementation of `asyncio.run`.

    :param future: A future or task or call of an async method.
    :param as_task: Forces the future to be scheduled as task (needed for e.g. aiohttp).
    """

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:  # no event loop running:
        loop = asyncio.new_event_loop()
        return loop.run_until_complete(_to_task(future, as_task, loop))
    else:
        nest_asyncio.apply(loop)
        return asyncio.run(_to_task(future, as_task, loop))


def _to_task(future, as_task, loop):
    if not as_task or isinstance(future, asyncio.Task):
        return future
    return loop.create_task(future)

该目标的次要目标是将asyncio.run视为来自JS世界的promise.resolve或.NET世界的Task.Wait


如果nest_asyncio是一个外部库,您应该包括pip安装命令和导入。此外,请指定Task是什么以及应该从哪里导入它。 - Abraham Murciano Benzadon
@AbrahamMurcianoBenzadon。这个任务来自.NET世界。它是作为类比而不是直接应用/导入的东西。 - JBSnorro
1
我在Python代码中的意思是isinstance(future, Task) - Abraham Murciano Benzadon
1
除了asyncio.get_running_loop()在Python 3.6和asyncio 3.4.3中不可用之外,其他都很好。但幸运的是,将其替换为asyncio.get_event_loop()也可以正常工作。 - Doluk

6

我有一种生产可靠的技术可以防止这两个错误:

  • RuntimeError: Timeout context manager should be used inside a task
  • RuntimeError: This event loop is already running

思路是使用一个专用的后台线程来运行事件循环,就像在传统的 UI 设置中一样,但这里不是用于 UI 消息,而是用于 API 消息(即请求)。我们只需在模块导入时启动该线程,或者在任何地方都可以。此外,我们仅调用 asyncio_run 而不是 asyncio.run ,以及 asyncio_gather 而不是 asyncio.gather

你可以参考这个:

import asyncio
import threading
from typing import Awaitable, TypeVar
T = TypeVar("T")

def _start_background_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

_LOOP = asyncio.new_event_loop()
_LOOP_THREAD = threading.Thread(
    target=_start_background_loop, args=(_LOOP,), daemon=True
)
_LOOP_THREAD.start()

def asyncio_run(coro: Awaitable[T], timeout=30) -> T:
    """
    Runs the coroutine in an event loop running on a background thread,
    and blocks the current thread until it returns a result.
    This plays well with gevent, since it can yield on the Future result call.

    :param coro: A coroutine, typically an async method
    :param timeout: How many seconds we should wait for a result before raising an error
    """
    return asyncio.run_coroutine_threadsafe(coro, _LOOP).result(timeout=timeout)


def asyncio_gather(*futures, return_exceptions=False) -> list:
    """
    A version of asyncio.gather that runs on the internal event loop
    """
    async def gather():
        return await asyncio.gather(*futures, return_exceptions=return_exceptions)

    return asyncio.run_coroutine_threadsafe(gather(), loop=_LOOP).result()

你可能有这个更更新的版本吗?显然,在Python 3.10中,asyncio.gatherloop参数已被弃用。我对asyncio的工作原理只有非常基本的了解,但是你在这里的代码对我非常有效。 - Ethan Posner
@EthanPosner 如果我们升级到 3.10,我会进行更新,但在此之前请见谅。即便它被标记为废弃,它仍然很可能可以正常工作。 - JBSnorro
请您发布完整的示例代码吗? - Serhii Kushchenko

2

这个问题有一个非常简单的解决方案,原来discord.py客户端有自己的线程在运行,所以你只需要将它添加到第二个答案提到的协程中即可。因此不需要任何额外的循环。

  1. 移除启动客户端时的事件循环
def startClient():
    # no loops here
    client.run('token')
  1. 为Discord的循环添加内置变量,在/chat/路由中:
elif request.method == 'POST':
    # using discord client's global
    msg = asyncio.run_coroutine_threadsafe(sendMsg(request), client.loop)
    msg.result()

就是这样,希望这能帮到你。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接