异步IO:仅当所有其他任务都在等待时才运行任务

10

我目前正在使用asyncio.wait执行一些无限的任务

当所有其他任务处于await状态时,我需要运行一个特殊的函数

import asyncio 

async def special_function():
    while True:
        # does some work, 
        # Passes control back to controller to run main_tasks
        # if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    tasks = [task() for task in main_tasks]

    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(special_function())

    await asyncio.wait(tasks)

asyncio.get_event_loop().run_until_complete(handler())
我如何才能使 special_function 只在所有 main_tasks 都在 await 时运行?
修改: 我的意思是“所有的main_tasks都没有准备好继续执行,例如处于asyncio.sleep(100)或I/O绑定状态,仍在等待数据。”
因此,在任务处于这种状态时,main_tasks无法继续执行,事件循环会运行special_function,而不是每次事件循环迭代。
用例: main_tasks正在使用来自Web套接字的新数据更新数据结构。 special_function会在收到来自该进程的更新信号时将数据传输到另一个进程(具有共享变量和数据结构的multiprocessing)。 它需要在传输时是最新的数据,不能有待处理的主任务更新。 这就是为什么我只想在没有可处理的新数据的主任务时运行special_function

special_function 的语法不正确,而且在 asyncio.sleep(0) 前面也缺少了一个 await。一旦你修复了这个问题,它就会在事件循环的每次迭代中运行“做一些工作”的部分。由于 asyncio 是单线程的,所以当它运行时,所有其他任务都在等待某些东西。你不需要做任何特殊的安排来实现这一点,这就是 asyncio 的工作原理 - 如果一个任务没有等待任何东西,那么它就意味着它正在运行(或已经完成)。 - user4815162342
@user4815162342 感谢您发现错误!已经修复。我还在上面的描述中添加了一些内容,以更好地解释目标,我不想在事件循环的每次迭代中运行该函数,只有当所有主任务都在等待并且当前时刻无法继续时才运行。谢谢! - Zak Stucke
我的建议是:这似乎是一个并发问题,应该使用线程来解决,而不是asyncio。 - Damián Montenegro
让我更好地解释一下我的观点,您可以使用优先锁(https://dev59.com/BlkS5IYBdhLWcg3w8am-),这样主要任务就具有优先级,因此只有在其他人没有等待锁时,特殊功能才会运行。 - Damián Montenegro
5个回答

7

我试图为“任务未准备好运行”的条件编写一个测试。我认为asyncio没有暴露调度程序的详细信息。开发人员明确表示,他们希望保持更改asyncio内部结构而不破坏向后兼容性的自由。

asyncio.Task中有这样一条注释(注意:_step()将任务协程运行到下一个await):

# An important invariant maintained while a Task not done:
#   
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.

当然,那个内部变量不在API中。

你可以通过阅读repr(task)的输出来获取对_fut_waiter的有限访问权限,但是这种格式似乎也不可靠,因此我不会依赖这样的东西:

PENDINGMSG = 'wait_for=<Future pending '

if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()

无论如何,我认为你试图追求过于完美。你想知道其他任务中是否有新数据。如果数据在asyncio缓冲区、内核缓冲区或网络卡接收缓冲区中怎么办?…… 如果下一毫秒有新数据到达,你永远也不会知道。

我的建议是:将所有更新写入单个队列。仅检查该队列作为更新的唯一来源。如果队列为空,则发布上一个状态。


看了一眼,这似乎是有效的,但正如你自己所说,依赖repr并不理想。我会用它来继续寻找更具体的解决方案!谢谢!我也明白你的观点,数据可能也存在于其他缓冲区位置,我的目标可能过于完美,你说得对! - Zak Stucke
这个答案的最后一段可能是使用asyncio解决此类问题的预期方式。 - user4815162342

2
这是我的建议:
  1. 我不会使用你的特殊函数。
  2. 每个数据更新都需要一个单独的生成ID(4字节整数),我只会将ID放入共享内存中。
两个进程是独立运行的,我假设。
  1. 订阅者将生成ID保留为本地变量。当它注意到共享内存中的生成ID已更改时,它就从文件中读取新数据。
  2. 数据存储在tmpfs(/tmp)上,因此在内存中。如果适合的话,您可以创建自己的tmpfs。速度很快。
这是为什么:
  • 为了确保订阅者不会获取到未完成的共享内存数据,必须使用信号量进行保护。这很麻烦。
  • 通过使用文件,您可以携带可变大小的数据。这可能不适用于您。在使用共享内存时要解决的难题之一是具有足够的空间但不浪费空间。使用文件可以解决此问题。
  • 通过使用4字节int生成ID,更新ID是原子的。这是一个巨大的优势。
因此,当您的任务接收到新数据时,打开文件,写入数据,并在关闭文件描述符后将生成ID写入共享内存。在更新生成ID之前,您可以安全地删除文件。如果订阅者已经打开了文件,则会完成读取文件,如果它尝试打开文件,则无法打开,因此它必须等待下一代。如果机器崩溃,/tmp就消失了,因此您不需要担心清理文件。您甚至可以编写一个新任务,其独立工作是删除/tmp中旧的生成ID文件。

虽然这并没有直接解决我的问题,但这是另一种非常有趣的方法,我一定会深入研究!我确实有静态大小共享数据的问题,所以这将解决这个问题。谢谢! - Zak Stucke
我在工作中使用共享内存有很多经验,而在共享内存中携带有效载荷是需要避免的。一旦使用信号量来保证内容的正确性,作为IPC管理信号量并不容易。有时,它会被锁定,你最终不得不重新启动机器来清理它。确保这个方法工作的唯一途径是拥有多个有效载荷的副本,以便在写入时不访问新的有效载荷。如果不是更多,它会增加复杂性和共享内存大小至少2倍。 - Naoyuki Tai

1
当事件循环运行某个任务时,该任务将一直被执行,直到将控制权返回给事件循环。任务通常只有一个原因要将控制权返回给事件循环:任务面临阻塞操作(因此“无法继续”)。
这意味着“每次事件循环迭代”通常等于“所有main_tasks都处于await状态”。您已经拥有的代码将(大多数情况下)按照您想要的方式工作。您唯一需要做的是使special_function()成为任务。

有些情况下,任务在遇到“真正”的阻塞调用之前已经将控制权返回给事件循环,通常会出现 await asyncio.sleep(0) 这样的代码(就像您在 special_function 中所做的一样)。这意味着任务希望确保在继续执行之前调用所有其他任务:您可能希望尊重这一点。


嗨,Mikhail,感谢您的回答!不幸的是,这并没有解决问题。如果任务被阻塞并返回到控制器,但在事件循环到达特殊函数之前已准备好继续执行,则我需要它不运行特殊函数,而是继续执行“main_tasks”。我已更新我的问题,并提供了此功能的用例。 - Zak Stucke

0

将输入和输出请求都推送到一个优先队列中,其中输入优先于输出。然后正常地从队列中处理任务,它将始终在任何输出请求之前满足所有未完成的输入请求。

因此,您的主循环将类似于以下内容:

  • InputListener1(将接收到的每个InputTask1排队,优先级为0)
  • InputListener2(将接收到的每个InputTask2排队,优先级为0)
  • InputListener3(将接收到的每个InputTask3排队,优先级为0)
  • OutputListener(将接收到的每个OutputTask排队,优先级为1)
  • QueueWorker(从队列中处理下一个任务)

这可能意味着您需要将所有现有任务的逻辑拆分为单独的套接字侦听器和实际任务处理,但这并不一定是坏事。


这对我的特定情况不起作用,因为即使InputListener准备好向队列添加另一个任务,它仍然可以在asyncio事件循环中更接近QueueWorker而运行! - Zak Stucke
然后,您必须将所有内容放入一个超级任务中,其中包含特殊功能,该功能在任何子任务实际执行操作时都会中断。但是,无法保证在发出输出之前未请求更新,这只是时间上的不同而已。 - Nick Hilt

0

为什么不使用信号量呢?

async def do_stuff(semaphore):
    async with semaphore:
      await getting_stuff_done()

async def wait_til_everyone_is_busy(semaphore):
    while not semaphore.locked():
      await asyncio.sleep(1)
    do_other_stuff()

为了更好地阐述我的观点,举个简单的例子:
import asyncio
import time

async def process(semaphore, i):
    while True:
        print(f"{i} I'm gonna await")
        await asyncio.sleep(1)
        async with semaphore:
            print(f'{i} sleeping')
            await asyncio.sleep(3)
        print(f'{i} done sleeping')
        print(f"{i} I'm gonna await again")
        await asyncio.sleep(1)

async def other_process(semaphore):
    while True:
        while not semaphore.locked():
            print("Everyone is awaiting... but I'm not startingr")
            await asyncio.sleep(1)
        print("Everyone is busy, let's do this!")
        time.sleep(5)
        print('5 seconds are up, let everyone else play again')
        await asyncio.sleep(1)

semaphore = asyncio.Semaphore(10)
dataset = [i for i in range(10)]
loop = asyncio.new_event_loop()
tasks = [loop.create_task(process(semaphore, i)) for i in dataset]
tasks.append(loop.create_task(other_process(semaphore)))
loop.run_until_complete(asyncio.wait(tasks))


我们创建了10个任务,使用“process”函数,并且有一个使用“other_process”的任务。执行“other_process”的任务只能在所有其他任务都持有信号量时运行,由于Asyncio上下文切换的工作方式,只有在其他任务处于等待状态时才会执行“other_process”函数,直到“other_process”达到自己的“await”。
$ python3 tmp
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!
5 seconds are up, let everyone else play again
0 done sleeping
0 I'm gonna await again
1 done sleeping
1 I'm gonna await again
2 done sleeping
2 I'm gonna await again
3 done sleeping
3 I'm gonna await again
4 done sleeping
4 I'm gonna await again
5 done sleeping
5 I'm gonna await again
6 done sleeping
6 I'm gonna await again
7 done sleeping
7 I'm gonna await again
8 done sleeping
8 I'm gonna await again
9 done sleeping
9 I'm gonna await again
Everyone is awaiting... but I'm not startingr
0 I'm gonna await
1 I'm gonna await
2 I'm gonna await
3 I'm gonna await
4 I'm gonna await
5 I'm gonna await
6 I'm gonna await
7 I'm gonna await
8 I'm gonna await
9 I'm gonna await
Everyone is awaiting... but I'm not startingr
0 sleeping
1 sleeping
2 sleeping
3 sleeping
4 sleeping
5 sleeping
6 sleeping
7 sleeping
8 sleeping
9 sleeping
Everyone is busy, let's do this!

嗨,杰森,感谢你的答案!经过一些信号量研究,它们似乎主要目的是防止过度使用资源,这与我的问题有些不同。由于我的函数在同一个事件循环中工作,我可以相信所有函数都将并发运行(不是同时而是一个接一个地运行)。我的主要问题是仅在所有函数处于“等待”状态但尚未准备好继续时才运行special_task,也就是说,它们仍然需要等待更长时间才能继续(例如,在长时间的asyncio.sleep()或I/O绑定中,这是我的情况)。 - Zak Stucke
当然,信号量的“主要”目的是如此,但它确实为您的问题提供了解决方案。 基本上,只有在所有上下文处于等待状态时,信号量才会被“锁定”,因此,线程只会在所有上下文都处于“等待”状态时运行。 当然,您需要设置信号量,以确保具有与工作线程一样数量的锁(除了正在等待其他所有人忙碌的那个线程)。 - Jason Shaffner
我认为你忽略了我问题的一个关键部分。我不仅需要它们处于await状态,这是由于asyncio事件循环的工作方式自动完成的。我需要所有的主任务都无法继续,因为等待的任务尚未完成,例如asyncio.sleep(100)还没有到时间或外部数据尚未接收。当函数开始await时,sephamore将立即锁定,并在函数从await中移开时解锁,而不是在函数准备继续时解锁(即,睡眠期结束/数据接收等)。 - Zak Stucke
是的,它们都处于等待状态,但是,取决于asyncio在事件循环中的位置,它可能会运行您的other_process,即使您的一个process工作程序已经到达了asyncio.sleep()的末尾并且可以在运行other_process之前继续,但由于other_process在asyncio事件循环的迭代中更接近,因此它不会被运行,这就是我想解决的问题!在您的答案中,您正在解决一个略有不同的问题。这有意义吗? - Zak Stucke
哈,我正在重新阅读我的答案,现在我明白你的意思了。我回答中的第一句话有误,我的意思是:在你知道有长时间等待或有意义的情况下使用信号量,而不是在每个上下文切换时都使用。我认为没有办法看到一个可等待对象是否“可以继续”,除非你再次运行该特定上下文... - Jason Shaffner
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接