Asynchronous background tasks in Python using the asyncio library


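I want to run a heavy computation task in the background without blocking IO. The problem is that my main function does not depend on this heavy task and needs to return a value before, or while, the heavy computation runs. Here is an example: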

import asyncio
from typing import Dict, List

def main(args):
    transformed_data_list: List[Dict] = translate_request_to_object(args)
    status = insert_data_into_db(transformed_data_list)
    if status:
        # running background task
        asyncio.run(process_background_task(transformed_data_list))

    # Here, I want to return a success response as soon as data inserted into db
    return "data insert into db"

async def process_background_task(transformed_data_list: List[Dict]):
    for data in transformed_data_list:
        asyncio.create_task(heavy_computation_task(data))

However, the code above does not return a response until process_background_task has finished.

1 Answer


How to run a task in the background

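asyncio.run is a blocking function: it starts an event loop and runs it until the coroutine passed to it completes. If you want to start process_background_task in the background, use asyncio.create_task and make main an async function. Then run asyncio.run(main(...)).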

import asyncio
from typing import Dict, List

async def main(args):
    transformed_data_list: List[Dict] = translate_request_to_object(args)
    status = insert_data_into_db(transformed_data_list)
    if status:
        # running background task: schedule it without awaiting it
        asyncio.create_task(process_background_task(transformed_data_list))

    # Here, I want to return a success response as soon as data inserted into db
    return "data insert into db"


async def process_background_task(transformed_data_list: List[Dict]):
    for data in transformed_data_list:
        asyncio.create_task(heavy_computation_task(data))

# Start the event loop and run main() until it returns.
# Note: asyncio.run() cancels any tasks still pending when main() returns,
# so the background work only completes if the loop keeps running (e.g. in a server).
asyncio.run(main(...))
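A minimal runnable sketch of the same pattern, assuming a long-lived event loop such as a web server's. The names `fake_heavy_task`, `handle_request` and `server` are hypothetical stand-ins for the functions above, and `asyncio.sleep` only simulates awaitable work. The sketch also keeps a strong reference to each task, because the asyncio documentation notes that the event loop holds only weak references to tasks created with `create_task`.

```python
import asyncio

# Strong references to fire-and-forget tasks (the loop itself only keeps weak ones).
background_tasks: set = set()
events: list = []

async def fake_heavy_task(item: int) -> None:
    # Hypothetical stand-in for process_background_task; sleep simulates awaitable work.
    await asyncio.sleep(0.01)
    events.append(f"processed {item}")

async def handle_request(item: int) -> str:
    # Schedule the work, remember the task, and return without awaiting it.
    task = asyncio.create_task(fake_heavy_task(item))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    events.append("response sent")
    return "data insert into db"

async def server() -> None:
    # Hypothetical long-lived entry point standing in for a web server's loop.
    response = await handle_request(1)
    events.append(f"got: {response}")
    # A real server keeps running; here we wait so asyncio.run() does not cancel the task.
    await asyncio.gather(*background_tasks)

asyncio.run(server())
print(events)
```

The response is produced before the background work finishes; the task only completes because the loop is still alive afterwards.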

How to run heavy tasks without blocking the event loop

If heavy_computation_task would block the event loop, you need to run it in a ProcessPoolExecutor via loop.run_in_executor.

Here is the example from the documentation:

import asyncio
import concurrent.futures

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    # Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

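# The __main__ guard is required because ProcessPoolExecutor uses multiprocessing.
if __name__ == '__main__':
    asyncio.run(main())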
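To see what "block the event loop" means in practice, here is a sketch that counts how often a trivial ticker coroutine gets a turn while the CPU-bound work runs. `count_ticks`, `ticker`, `direct` and `offloaded` are hypothetical helpers. The exact tick count in the offloaded case depends on timing, but it is always at least 1, while a direct call leaves it at 0.

```python
import asyncio
import concurrent.futures
import contextlib

def cpu_bound() -> int:
    # Smaller version of the documentation's CPU-bound work.
    return sum(i * i for i in range(10 ** 6))

async def count_ticks(during) -> int:
    """Await during() and count how many times a ticker coroutine ran meanwhile."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)  # hand control back to the event loop

    ticker_task = asyncio.create_task(ticker())
    await during()
    observed = ticks
    ticker_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await ticker_task
    return observed

async def main() -> tuple:
    loop = asyncio.get_running_loop()

    async def direct() -> None:
        cpu_bound()  # runs on the event-loop thread, so the ticker never gets a turn

    with concurrent.futures.ProcessPoolExecutor() as pool:
        async def offloaded() -> None:
            # The loop stays free while a worker process computes.
            await loop.run_in_executor(pool, cpu_bound)

        blocked = await count_ticks(direct)
        responsive = await count_ticks(offloaded)
    return blocked, responsive

if __name__ == "__main__":
    blocked, responsive = asyncio.run(main())
    print("ticks while blocked:", blocked, "| ticks while offloaded:", responsive)
```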

In your case:
import asyncio
import concurrent.futures
from typing import Dict, List

async def main(args):
    transformed_data_list: List[Dict] = translate_request_to_object(args)
    status = insert_data_into_db(transformed_data_list)
    if status:
        # running background task
        asyncio.create_task(process_background_task(transformed_data_list))

    # Here, I want to return a success response as soon as data inserted into db
    return "data insert into db"


async def process_background_task(transformed_data_list: List[Dict]):
    loop = asyncio.get_running_loop()

    # Create the pool once and reuse it for every item.
    # heavy_computation_task must be a plain (non-async), picklable top-level function.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for data in transformed_data_list:
            await loop.run_in_executor(pool, heavy_computation_task, data)

# Start the event loop and run main() until it returns.
# Note: asyncio.run() cancels any tasks still pending when main() returns.
if __name__ == "__main__":
    asyncio.run(main(...))
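A self-contained sketch of the code above with the placeholders filled in, assuming `heavy_computation_task` is a plain CPU-bound function (its body here, and the `handle` helper, are hypothetical). Two choices are assumptions of this sketch rather than part of the answer: one pool is created up front and passed in, and all items are submitted at once with `asyncio.gather` so they run in parallel instead of one after another. The final `gather` over `background_tasks` stands in for a server that keeps its loop running; without it, `asyncio.run()` would cancel the still-pending task when `main()` returns.

```python
import asyncio
import concurrent.futures
from typing import Dict, List

def heavy_computation_task(data: Dict) -> int:
    # Hypothetical stand-in: a plain, top-level function so it can be pickled.
    return sum(i * i for i in range(data["n"]))

async def process_background_task(pool, transformed_data_list: List[Dict]) -> List[int]:
    loop = asyncio.get_running_loop()
    # Submit every item at once; the worker processes run them in parallel.
    futures = [loop.run_in_executor(pool, heavy_computation_task, data)
               for data in transformed_data_list]
    return await asyncio.gather(*futures)

background_tasks: set = set()

async def handle(pool, transformed_data_list: List[Dict]) -> str:
    # Hypothetical request handler: schedule the work and respond immediately.
    task = asyncio.create_task(process_background_task(pool, transformed_data_list))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "data insert into db"

async def main():
    with concurrent.futures.ProcessPoolExecutor() as pool:
        response = await handle(pool, [{"n": 10}, {"n": 100}])
        # Wait for the background work so asyncio.run() does not cancel it.
        results = await asyncio.gather(*background_tasks)
    return response, results

if __name__ == "__main__":
    response, results = asyncio.run(main())
    print(response, results)
```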

Could you explain why you suggest ProcessPoolExecutor rather than ThreadPoolExecutor? Also, wherever I call asyncio.run(main(...)) from, won't it block the parent function again? - MathProblem
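Because of the GIL, the threads of a ThreadPoolExecutor can execute Python code only one at a time, so it is suitable for IO-bound operations but not for CPU-bound ones. ProcessPoolExecutor runs tasks in separate processes, so it can run multiple tasks truly in parallel. - Oleg Utkin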
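The difference behind this comment is where the work runs. In the default (GIL-enabled) CPython build, ThreadPoolExecutor threads live inside the calling process and take turns executing Python bytecode, whereas ProcessPoolExecutor workers are separate processes, each with its own interpreter. A small sketch that makes this visible by reporting the process ID of each worker (`worker_pid` is a hypothetical helper):

```python
import concurrent.futures
import os

def worker_pid(_: int) -> int:
    # Report which OS process executed this call.
    return os.getpid()

if __name__ == "__main__":
    parent_pid = os.getpid()

    # Thread workers share the parent process, its interpreter and its GIL.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as tpool:
        thread_pids = set(tpool.map(worker_pid, range(8)))

    # Process workers are separate interpreters, each with its own GIL.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as ppool:
        process_pids = set(ppool.map(worker_pid, range(8)))

    print("threads run in the parent process:", thread_pids == {parent_pid})
    print("processes run elsewhere:", parent_pid not in process_pids)
```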
Yes, asyncio.run(main(...)) blocks the function it is called from. - Oleg Utkin
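If the caller must stay synchronous, one common alternative (not part of the answer above) is to run a single event loop forever in a background thread and hand coroutines to it with `asyncio.run_coroutine_threadsafe`, which returns a `concurrent.futures.Future` immediately instead of blocking. In this sketch, `background_loop`, the sleeping body of `process_background_task` and returning the future from `main` are assumptions for illustration; CPU-heavy work inside that loop would still need `run_in_executor`.

```python
import asyncio
import threading
from typing import Dict, List

# One event loop running forever in a daemon thread (hypothetical setup for this sketch).
background_loop = asyncio.new_event_loop()
threading.Thread(target=background_loop.run_forever, daemon=True).start()

async def process_background_task(transformed_data_list: List[Dict]) -> int:
    # Hypothetical stand-in: asyncio.sleep simulates awaitable work.
    await asyncio.sleep(0.05)
    return len(transformed_data_list)

def main(transformed_data_list: List[Dict]):
    # Plain synchronous function: schedules the coroutine and returns at once.
    future = asyncio.run_coroutine_threadsafe(
        process_background_task(transformed_data_list), background_loop
    )
    return "data insert into db", future

response, future = main([{"id": 1}, {"id": 2}])
print(response)                  # available before the background work finishes
print(future.result(timeout=5))  # optional: block later to collect the result -> 2
```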

Page content provided by Stack Overflow.