现在开始异步任务,稍后等待。

15

一位C#程序员试图学习一些Python。我正在尝试在运行CPU密集计算的同时让一个IO绑定的异步方法在后台默默运转。在C#中,我通常会设置可等待对象,然后启动CPU密集代码,然后等待IO任务完成,最后组合结果。

以下是我在C#中的做法:

static async Task DoStuff() {
    var ioBoundTask = DoIoBoundWorkAsync();
    int cpuBoundResult = DoCpuIntensizeCalc();
    int ioBoundResult = await ioBoundTask.ConfigureAwait(false);

    Console.WriteLine($"The result is {cpuBoundResult + ioBoundResult}");
}

static async Task<int> DoIoBoundWorkAsync() {
    Console.WriteLine("Make API call...");
    await Task.Delay(2500).ConfigureAwait(false); // non-blocking async call
    Console.WriteLine("Data back.");
    return 1;
}

static int DoCpuIntensizeCalc() {
    Console.WriteLine("Do smart calc...");
    Thread.Sleep(2000);  // blocking call. e.g. a spinning loop
    Console.WriteLine("Calc finished.");
    return 2;
}

以下是相应的Python代码

import time
import asyncio

async def do_stuff():
    ioBoundTask = do_iobound_work_async()
    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await ioBoundTask
    print(f"The result is {cpuBoundResult + ioBoundResult}")

async def do_iobound_work_async(): 
    print("Make API call...")
    await asyncio.sleep(2.5)  # non-blocking async call
    print("Data back.")
    return 1

def do_cpu_intensive_calc():
    print("Do smart calc...")
    time.sleep(2)  # blocking call. e.g. a spinning loop
    print("Calc finished.")
    return 2

await do_stuff()

请注意,CPU密集型任务由阻塞式的睡眠表示,无法等待完成,而IO绑定型任务则由可等待的非阻塞式睡眠表示。

在C#中,这需要2.5秒才能运行,在Python中需要4.5秒。区别在于C#直接运行异步方法,而Python只有在遇到"await"关键字时才开始运行该方法。如下所示的输出证实了这一点。如何实现所需结果?如果可能,请提供适用于Jupyter Notebook的代码。

--- C# ---
Make API call...
Do smart calc...
Calc finished.
Data back.
The result is 3
--- Python ---
Do smart calc...
Calc finished.
Make API call...
Data back.
The result is 3

更新1

受knh190答案的启发,似乎我可以使用asyncio.create_task(...)实现大部分功能。这将实现所需的结果(2.5秒):首先,异步代码开始运行;接着,阻塞的CPU代码同步运行;第三步等待异步代码执行完毕;最后将结果组合在一起。为了让异步调用真正开始运行,我不得不添加await asyncio.sleep(0),这感觉像一个可怕的hack。我们能否在不这样做的情况下启动任务?肯定有更好的方法...

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())
    await asyncio.sleep(0)  #   <~~~~~~~~~ This hacky line sets the task running

    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await task

    print(f"The result is {cpuBoundResult + ioBoundResult}")
5个回答

5

经过进一步研究,似乎这是可能的,但与 C# 中那样简单还有些不同。对于 do_stuff() 的代码如下:

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
    await asyncio.sleep(0)                               # return control to loop so task can start
    cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
    ioBoundResult = await task                           # at last, we can await our async code

    print(f"The result is {cpuBoundResult + ioBoundResult}")

与C#相比,两个区别在于:
  1. 必须使用asyncio.create_task(...)将任务添加到正在运行的事件循环中
  2. 使用await asyncio.sleep(0)暂时将控制权返还给事件循环,以便它可以开始执行任务。
完整的代码示例如下:
import time
import asyncio

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
    await asyncio.sleep(0)                               # return control to loop so task can start
    cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
    ioBoundResult = await task                           # at last, we can await our async code

    print(f"The result is {cpuBoundResult + ioBoundResult}")

async def do_iobound_work_async(): 
    print("Make API call...")
    await asyncio.sleep(2.5)  # non-blocking async call. Hence the use of asyncio
    print("Data back.")
    return 1

def do_cpu_intensive_calc():
    print("Do smart calc...")
    time.sleep(2)  # long blocking code that cannot be awaited. e.g. a spinning loop
    print("Calc finished.")
    return 2

await do_stuff()

我不是很喜欢必须记住要添加额外的await asyncio.sleep(0)才能启动任务。可能有一个可等待的函数,像begin_task(...)自动开始运行任务,这样它就可以在以后被等待。例如下面这样:

async def begin_task(coro):
    """Awaitable function that adds a coroutine to the event loop and sets it running."""
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)
    return task

async def do_stuff():
    io_task = await begin_task(do_iobound_work_async())
    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await io_task
    print(f"The result is {cpuBoundResult + ioBoundResult}")

4

很遗憾,目前还没有可靠的方法来实现这个功能。当我为Stackless Python's Stacklesslib编写“async”帮助程序时,我就考虑过C#的行为。因为当我第一次接触到那里的"async"关键字时,它是一种在执行阻塞时进行深度优先执行的绝妙方式,在这种情况下,执行将继续在最后调用"async"函数的位置。

这样做之所以聪明,是因为程序可以安排阻塞事务(例如HTTP请求)尽可能早地开始,从而减少延迟。当您启动一个方法并知道它会阻塞等待回复时,您希望它立即开始执行以提高程序的响应速度,然后在您真正需要时等待结果。

上述示例是有效的,但在有多个级别的调用时会失败。async.sleep(0)不保证任何内容:

import asyncio


async def startit(thing):
    t = asyncio.create_task(thing)
    # what we really need to do here is:
    # Insert t into runnable queue, just before asyncio.current_task(), and switch to it.
    # Only, it is not possible since event loops are just about scheduling callbacks
    await asyncio.sleep(0)
    return t


async def fa():
    print('fa start')
    gb = await startit(fb())
    # send off a hTTP request and wait for it
    print ('fa doing blocking thing')
    await asyncio.sleep(0.1)
    print ('fa waiting for gb')
    await gb
    print ('fa stopping')
    return 'a'

async def fb():
    print('fb start')
    # send off another http request and wait for it
    await asyncio.sleep(0.1)
    print('fb stop')
    return 'b'


async def main():
    
    print('main start')
    ga = await startit(fa())
    print("main waiting for a")
    await ga
    print('main done')

asyncio.run(main())

这将输出:

main start
fa start
main waiting for a
fb start
fa doing blocking thing
fb stop
fa waiting for gb
fa stopping
main done

而你想要输出的结果是:

main start
fa start
fb start
fa doing blocking thing
main waiting for a
fb stop
fa waiting for gb
fa stopping
main done

基本上,当fb阻止时,控制权会全部转移到main(),而不是像C#的深度优先模型中那样跳到调用级别fa

无栈Python,在命中使用了@stacklessio.async修饰函数时,会执行以下操作:

  1. 创建一个tasklet future。
  2. 将其插入可运行队列中“在”当前正在运行的tasklet之前。
  3. 切换到它。

当新的tasklet被阻塞时,调度程序将切换到“下一个”tasklet,这将是之前正在运行的tasklet。

不幸的是,在Python的"asyncio"框架中,这不容易实现,因为那里的调度是基于回调而不是任务。


2
我认为你的测试非常易于理解。在 Python 中,awaitasync 的前身是生成器(在 Python 2 中)。Python 只会创建协程,但不会在你明确调用它之前启动它。
因此,如果你想像 C# 一样立即触发协程,你需要将await行移到前面。
async def do_stuff():
    ioBoundTask = do_iobound_work_async() # created a coroutine
    ioBoundResult = await ioBoundTask     # start the coroutine
    cpuBoundResult = do_cpu_intensive_calc()
    print(f"The result is {cpuBoundResult + ioBoundResult}")

"最初的回答"的翻译相当于:

这等同于:

def do_stuff():
    # create a generator based coroutine
    # cannot mix syntax of asyncio
    ioBoundTask = do_iobound_work_async()
    ioBoundResult = yield from ioBoundTask
    # whatever

请参考这篇文章:在实践中,Python 3.3中的新“yield from”语法主要用于什么?


我注意到你的C#和Python并不完全等价。只有Python中的asyncio.Task是并发的:


async def do_cpu_intensive_calc():
    print("Do smart calc...")
    await asyncio.sleep(2)
    print("Calc finished.")
    return 2

# 2.5s
async def do_stuff():
    task1 = asyncio.create_task(do_iobound_work_async())
    task2 = asyncio.create_task(do_cpu_intensive_calc())

    ioBoundResult = await task1
    cpuBoundResult = await task2
    print(f"The result is {cpuBoundResult + ioBoundResult}")

现在执行时间应该是相同的。最初的回答。

https://dev59.com/nVoV5IYBdhLWcg3wXNzj 表明应该使用 asyncio.ensure_future - Dan D.
1
@DanD。它明确表示“create_task”是有利的。而且官方文档也使用了“create_task”。 - knh190
感谢您的回复,但这不是我想做的 - 抱歉。Python和C#都在IO代码中具有非阻塞睡眠和CPU密集调用中的阻塞睡眠。在任何一种语言中,CPU任务都无法等待。睡眠的目的是表示类似于激进嵌套循环的东西。如果可以提高清晰度,请使用max(i%3 for i in range(10000000))之类的内容替换time.sleep(2) - Big AL
@BigAL 阻塞调用自然不能与异步调用一起使用,就我所知。这就是为什么我们除了出名的“ requests”(阻塞库)之外还有异步库[aiohttp](https://github.com/aio-libs/aiohttp) 。 [这篇帖子](https://dev59.com/WVgR5IYBdhLWcg3wp-q5)建议您可以使用executor调用阻塞函数,但是如果您的代码库很复杂,则并发性不能保证。 - knh190

1
现在可以使用“asynkit”库实现这一点。

pip install asynkit

async def foo():
   r = asynkit.eager(get_remote_data(url))
   s = await do_other_stuff()
   return compute(s, await r)

"eager"将启动您的异步方法。如果它立即完成,请返回一个“Future”,否则创建一个“Task”。直接返回给调用者。


0
在我看来,Python处理异步/等待的方式与C#有很大不同。
在Python中,您可以将“任务”视为事件循环中的处理单元,这意味着当调用“asyncio.create_task()”时,主线程会将您的协程包装为一个任务并将其放入事件循环中。但是我们一次只有一个线程,Python不会立即处理事件循环中的任务只有当主线程遇到“await”关键字时,事件循环中的任务才会被执行,它将查看事件循环中的任务并执行该任务。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接