在不同的事件循环中同时执行两个任务

3
我正在使用Docker SDK,并尝试在一段时间后超时的赛跑任务与等待Docker容器完成的另一个任务之间进行竞争。实际上,我想知道给定的容器是否在我设置的超时时间内完成。
我有以下代码来实现它(改编自这篇文章):
container =  # ... create container with Docker SDK
timeout =  # ... some int
killed = None

# our tasks
async def __timeout():
  await asyncio.sleep(timeout)
  return True
async def __run():
  container.wait()
  return False

# loop and runner
wait_loop = asyncio.new_event_loop()
done, pending = wait_loop.run_until_complete(
  asyncio.wait({__run(), __timeout()}, return_when=asyncio.FIRST_COMPLETED)
)

# result extraction
for task in done:
  if killed is None:
    killed = task.result()
    # ... do something with result

# clean up
for task in pending:
  task.cancel()
  with contextlib.suppress(asyncio.CancelledError):
    wait_loop.run_until_complete(task)
wait_loop.close()

不幸的是,我一直收到以下错误:

  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 347, in wait
    return (yield from _wait(fs, timeout, return_when, loop))
  File "/usr/lib/python3.5/asyncio/tasks.py", line 430, in _wait
    yield from waiter
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<wait() running at /usr/lib/python3.5/asyncio/tasks.py:347> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]> got Future <Future> pending> attached to a different loop

看来我无法与等待任务竞争,因为它属于不同的循环。有没有什么方法可以解决这个错误,以便我可以确定哪个任务先完成?


你是否已经开启了 Debug Mode 以便查看正在进行的操作? - amanb
我没有看到调试模式,只有一个分析器。那对解决像这样的asyncio问题有帮助吗? - nmagerko
文档提到了调试模式。用法:asyncio.run(main(), debug=True)。这很有帮助,尽管分析也很有用。 - amanb
你为什么要创建一个新的事件循环?你是否真的在任何时候都使用两个循环?(你可能不应该这样做。)如果你需要创建新的事件循环,那么尽可能早地创建它,并使用asyncio.set_event_loop(wait_loop)让asyncio意识到它。 - user4815162342
最终,我创建了一个新的事件循环,因为我认为我需要它。我正在尝试将其作为模块的一部分,因此我无法确定使用我的模块的用户是否也会创建事件循环。 - nmagerko
1个回答

2
问题很简单,每个线程中都有一个默认的循环。这是通过asyncio.set_event_loop(loop)设置的。然后你可以通过loop = asyncio.get_event_loop()获取此循环。
所以问题在于,大多数情况下,一些软件包默认使用asyncio.get_event_loop()来获取当前正在运行的循环。以aiohttp为例:
class aiohttp.ClientSession(*, connector=None, loop=None, cookies=None, headers=None, skip_auto_headers=None, auth=None, json_serialize=json.dumps, version=aiohttp.HttpVersion11, cookie_jar=None, read_timeout=None, conn_timeout=None, timeout=sentinel, raise_for_status=False, connector_owner=True, auto_decompress=True, requote_redirect_url=False, trust_env=False, trace_configs=None)

正如您所看到的,它接受一个“loop”参数来指定运行循环。但是,您也可以将其留空以使用默认的“asyncio.get_event_loop()”。
您的问题在于,您正在新创建的循环中启动协程。但是您无法确认所有内部操作是否也正在使用此新创建的循环。因为它们可能会使用“asyncio.get_event_loop()”,从而附加到当前线程中的另一个循环,而这个循环是默认循环。
就我所知,您并不真正需要创建一个新的循环,而是让用户这样做。就像上面的例子一样,您接受一个“loop”参数,如果它是“None”,则使用默认值。
或者您需要仔细检查代码,以确保每个可能的协程都在使用您创建的循环。

有趣的是,使用默认循环确实完全符合我的需求。我想我以为我需要一个新的来分离事物,但事实并非如此。 - nmagerko
@nmagerko 我觉得你可能误解了它的工作方式。你是不是认为你可以并发地运行两个不同的循环?实际上并不是这样的 :( 同一时间只能有一个循环在运行。因此,关于“分离事物”的概念是不可能的。 - Sraw
1
@Sraw 从技术上讲,可以在不同的线程中同时运行单独的循环,但是它们无法进行通信(使用asyncio同步原语),更重要的是,几乎没有尝试这样做的理由。 - user4815162342
1
@user4815162342 您肯定是正确的。但实际上我是指在同一个线程中。我们绝对可以使用线程同时运行多个任务。 - Sraw

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接