在asyncio中批量处理任务

3

我有一个生成任务(IO绑定的任务)的函数:

最初的回答:

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)

我正在尝试编写一个异步消费者,它将同时处理最多10个任务,当一个任务完成时,它会接受新任务。 我不确定是否应该使用信号量或是否有任何类型的asycio池执行程序?我开始用线程写伪代码:

我正在尝试编写一个在asyncio中每次处理最多10个任务的消费者,并在完成一个任务后立即开始处理下一个任务。我不确定是否应该使用信号量还是使用任何类型的asyncio池执行程序?我已经用线程开始写伪代码:

def run(self)
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()

请问有人能帮忙吗?我不知道在哪里使用async/await关键字。

最初的回答:


顺便问一下,最多同时运行10个任务的要求是从哪里来的? - Dima Tisnek
现有一个问题,其中包含了一个可能适用于此处的解决方案:https://dev59.com/21YM5IYBdhLWcg3wsxzy#48486557 - Dima Tisnek
3个回答

6

使用 asyncio.Sepmaphore 实现简单的任务限制

async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))

这种解决方案的问题在于任务是贪婪地从生成器中获取的。例如,如果生成器从大型数据库中读取,程序可能会耗尽内存。
除此之外,它是惯用的和良好的行为。
一种解决方案是使用异步生成器协议按需拉取新任务:
async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)

这可能被认为是次优的,因为它直到有10个任务可用才开始执行任务。
以下是使用工作者模式的简洁而神奇的解决方案:
async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])

它依赖于一种有点违反直觉的特性,即能够在同一个异步生成器上拥有多个异步迭代器,在这种情况下,每个生成的项只被一个迭代器看到。
我的直觉告诉我,这些解决方案都无法在cancellation方面正常工作。

1
我喜欢这个答案的教学方法,但最后一段代码可以更简单。由于你有固定数量的工作线程,可以去掉信号量。没有信号量的情况下,工作线程可以使用普通的async for循环。 - user4815162342
谢谢,已编辑。还添加了一个解释为什么那个方法有效 :) - Dima Tisnek
请注意,ensure_future 在3.7中被软弃用,推荐使用更友好的 asyncio.create_task - Mike 'Pomax' Kamermans

1

异步不等于线程。例如,如果您有文件IO绑定的任务,则可以使用aiofiles异步写入它们

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()

然后将任务替换为您的任务。如果您只想同时运行10个任务,请等待每10个任务后使用asyncio.gather。
import asyncio

async def task(x):
  await asyncio.sleep(0.5)
  print( x, "is done" )

async def run(loop):
  futs = []
  for x in range(50):
    futs.append( task(x) )

  await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

如果您无法异步编写任务并且需要使用线程,这是使用asyncio的ThreadPoolExecutor的基本示例。请注意,仅在max_workers = 5时才会同时运行5个任务。
import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking(x):
  time.sleep(1)
  print( x, "is done" )

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(15):
    future = loop.run_in_executor(executor, blocking, x)
    futs.append( future )

  await asyncio.sleep(4)
  res = await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

谢谢回复。问题是我不想按照10个进行,而是在任一任务完成后开始下一个任务。所以一开始我启动了10个任务,当一个任务完成时,我会将下一个任务添加到任务池中。这样,始终会有10个任务同时进行处理。 - Alicja Głowacka
但是你的解决方案中存在的问题是,当你还没有获取到信号量时就获取新任务,这在我的情况下会导致问题。 - Alicja Głowacka

1
正如Dima Tismek所指出的那样,使用信号量来限制并发存在耗尽task_generator太急切的风险,因为在获取任务和将它们提交到事件循环之间没有反向压力。另一个答案也探讨了更好的选择,即不是在生成器产生项后立即生成任务,而是创建一定数量的工作线程以同时使用生成器。
代码可以改进的两个方面:
  • 当任务数量一开始就固定时,不需要使用信号量;
  • 处理生成任务和节流任务的取消。
以下是解决这两个问题的实现:
async def throttle(task_generator, max_tasks):
    it = task_generator.__aiter__()
    cancelled = False
    async def worker():
        async for task in it:
            try:
                await task
            except asyncio.CancelledError:
                # If a generated task is canceled, let its worker
                # proceed with other tasks - except if it's the
                # outer coroutine that is cancelling us.
                if cancelled:
                    raise
            # other exceptions are propagated to the caller
    worker_tasks = [asyncio.create_task(worker())
                    for i in range(max_tasks)]
    try:
        await asyncio.gather(*worker_tasks)
    except:
        # In case of exception in one worker, or in case we're
        # being cancelled, cancel all workers and propagate the
        # exception.
        cancelled = True
        for t in worker_tasks:
            t.cancel()
        raise

一份简单的测试案例:

async def mock_task(num):
    print('running', num)
    await asyncio.sleep(random.uniform(1, 5))
    print('done', num)

async def mock_gen():
    tnum = 0
    while True:
        await asyncio.sleep(.1 * random.random())
        print('generating', tnum)
        yield asyncio.create_task(mock_task(tnum))
        tnum += 1

if __name__ == '__main__':
    asyncio.run(throttle(mock_gen(), 3))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接