如何在不阻塞主线程的情况下“发送并忘记”一个任务?

3
我心目中的想法是创建一个通用的BackgroundTask类,可以在Web服务器或独立脚本中使用,以调度不需要阻塞的任务。
我不想在这里使用任何任务队列(如celery、rabbitmq等),因为我所考虑的任务太小而且运行速度很快。只是想尽可能将它们完成并脱离掉。这是否是异步方法?将它们扔到另一个进程中?
我想出的第一个解决方案是可行的:
# Need ParamSpec to get correct type hints in BackgroundTask init
P = ParamSpec("P")


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
    """

    background_tasks = set()
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                task = asyncio.create_task(self.func(*self.args, **self.kwargs))
                self.background_tasks.add(task)
                print(len(self.background_tasks))
                task.add_done_callback(self.background_tasks.discard)

        # TODO: Create sync task (this will follow a similar pattern)


async def create_background_task(func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
    b = BackgroundTask(func, *args, **kwargs)
    await b()


# Usage:
async def sleep(t):
    time.sleep(t)

await create_background_task(sleep, 5)


我认为这样做可能错过了重点。如果我将此代码与其他异步代码一起运行,那么是的,我会获得性能优势,因为阻塞操作不再阻塞主线程。
我在考虑,也许需要像单独的进程来处理这些后台任务,而完全不会阻塞主线程(上述异步代码仍将在主线程上运行)。
是否有意义创建一个处理后台作业的单独线程?像简单的作业队列,但非常轻量且不需要额外的基础设施?
还是创建类似上面的解决方案更合适?
我看到Starlette做了类似的事情(https://github.com/encode/starlette/blob/decc5279335f105837987505e3e477463a996f3e/starlette/background.py#L15),但他们在返回响应后等待后台任务。
这使得他们的解决方案依赖于Web服务器设计(即在响应后执行操作是可以的)。我想知道是否可以构建更通用的东西,使您可以在脚本或Web服务器中运行后台任务,而不会牺牲性能。
我不太熟悉异步/并发功能,因此不知道如何比较这些解决方案。看起来像是一个有趣的问题!
以下是我尝试在另一个进程上执行任务时想出的内容:

class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
        BackgroundTask(es.transport.close) <- Probably most common use in our codebase
    """

    background_tasks = set()
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                loop = asyncio.get_running_loop()
                with self.executor as pool:
                    result = await loop.run_in_executor(
                        pool, functools.partial(self.func, *self.args, **self.kwargs))

5个回答

4

你的问题非常抽象,我会试着给所有问题都提供一些通用的答案。

如何“fire and forget”一个任务,而不阻塞主线程?

这取决于你所说的“forget”的含义。

  • 如果你运行后不打算访问该任务,可以在并行进程中运行它。
  • 如果主应用程序需要访问后台任务,则应具有事件驱动架构。在这种情况下,先前称为任务的东西将是服务或微服务。

我不想在这里使用任何任务队列(celery、rabbitmq等),因为我想到的任务太小、太快了,只想尽可能的把它们完成。这样做是异步方法吗?将它们扔到另一个进程中?

如果包含循环或其他 CPU-bound 操作,则使用子进程是正确的。如果任务进行请求(async)、读文件、记录到 stdout 或其他 I/O bound 操作,则使用协程或线程是正确的。

是否有意义拥有一个处理后台作业的单独线程?就像一个简单的作业队列,但非常轻量级并且不需要额外的基础设施?

我们不能仅使用线程,因为它可能会被使用 CPU-bound 操作的另一个任务阻塞。相反,我们可以运行后台进程,并使用管道、队列和事件在进程之间通信。不幸的是,我们不能在进程之间提供复杂的对象,但是我们可以提供基本数据结构来处理在后台运行的任务的状态更改。

关于 StarletteBackgroundTask

Starlette 是一个轻量级的 ASGI 框架/工具包,非常适合在 Python 中构建异步 Web 服务。(README 描述)

它基于并发性。因此,即使这不是所有类型任务的通用解决方案。 注意:并发性与并行性不同。

我想知道是否可以构建一个更通用的东西,在脚本或 Web 服务器中运行后台任务,而不牺牲性能。

上述解决方案建议使用后台进程。但它将取决于应用程序设计,因为必须执行一些操作(发出事件、将指示器添加到队列等),这些操作需要用于通信和同步运行进程(任务)。没有通用工具,但有情况依赖的解决方案。

情况1-任务是异步函数

假设我们有一个 request 函数,它应该在不阻塞其他任务的工作的情况下调用 API。还有一个 sleep 函数,它不应阻止任何事情。

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            try:
                return await response.json()
            except aiohttp.ContentTypeError:
                return await response.read()


async def sleep(t):
    await asyncio.sleep(t)


async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))

    ...  # here we can do even CPU-bound operations

    result1 = await background_task_1

    ...  # use the 'result1', etc.

    await background_task_2


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

在这种情况下,我们使用asyncio.create_task来并发地运行协程(就像在后台一样)。当然,我们可以在子进程中运行它,但没有必要这样做,因为这样会消耗更多的资源而不提高性能。
情况二 - 任务是同步函数(I/O绑定)
与第一种情况不同,这里的函数是同步的,但不是CPU密集型(I/O绑定)。这给了我们一个能力,可以在线程中运行它们或将它们异步化(使用asyncio.to_thread),并且可以并发地运行。
import time
import asyncio
import requests


def asynchronous(func):
    """
    This decorator converts a synchronous function to an asynchronous
    
    Usage:
        @asynchronous
        def sleep(t):
            time.sleep(t)
            
        async def main():
            await sleep(5)
    """
    
    async def wrapper(*args, **kwargs):
        await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


@asynchronous
def request(url):
    with requests.Session() as session:
        response = session.get(url)
        try:
            return response.json()
        except requests.JSONDecodeError:
            return response.text


@asynchronous
def sleep(t):
    time.sleep(t)

    
async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))
    ...

这里我们使用了一个装饰器将同步(I/O绑定)函数转换为异步函数,并像第一种情况一样使用它们。

情境3 - 任务是同步函数(CPU绑定)

为了在后台并行运行CPU绑定的任务,我们必须使用多进程。为了确保任务完成,我们使用 join 方法。

import time
import multiprocessing


def task():
    for i in range(10):
        time.sleep(0.3)


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    background_task.join()  # wait until the background task is done

    ...  # do stuff that depends on the background task


if __name__ == "__main__":
    main()

假设主应用程序依赖于后台任务的某些部分。在这种情况下,我们需要一种事件驱动的设计,因为无法多次调用 join event
import multiprocessing

event = multiprocessing.Event()


def task():
    ...  # synchronous operations

    event.set()  # notify the main function that the first part of the task is done

    ...  # synchronous operations

    event.set()  # notify the main function that the second part of the task is also done

    ...  # synchronous operations


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    event.wait()  # wait until the first part of the background task is done

    ...  # do stuff that depends on the first part of the background task

    event.wait()  # wait until the second part of the background task is done

    ...  # do stuff that depends on the second part of the background task

    background_task.join()  # wait until the background task is finally done

    ...  # do stuff that depends on the whole background task


if __name__ == "__main__":
    main()

当我们处理的进程超过两个时(这时无法知道事件从哪个进程中发出),正如您已经注意到的,我们只能提供二进制信息。因此,我们使用管道队列管理器在处理进程间提供非二进制信息。


1
非常感谢您提供如此详细的答案!它证实了我认为正确但不确定的许多事情。我希望我也能再次奖励这个问题,因为我先看到了另一个答案! - Rami Awar

3

我会回答你所问的问题,但我要先说,由于缺乏理解,你可能在问错问题。

在Python标准库中,subprocess可以启动行为类似“发射和忘记”的独立进程。以下是一些示例:

import os, subprocess
subprocess.Popen(['mkdir', 'foo'])
os.popen('touch answer_is_$((1 + 2))')

提供一些具体的例子来说明你想要的这些“小而快速的非阻塞任务”,并附带你希望它们运行的环境会更好。你缺乏一些明显的理解,因为你的某些陈述是相互冲突的。例如,asynciothreading根本不像“fire and forget”那样运作。
此外,没有一个好的方法可以“在任何上下文中后台运行”,因为不同上下文之间的差异很重要,“最好的方式”取决于许多因素。

是的,我想我正在尝试想出一个神奇的适用于所有情况的解决方案。我认为异步任务、将任务放在其他线程和进程上以及单独的作业队列都是类似问题的不同解决方案,但上下文是关键。 - Rami Awar
只是为了澄清一件事,如果我启动一个异步任务与在单独的进程中运行它会发生什么?异步任务将在主线程上执行,对吧?而单独的进程任务将独立执行,但显然会占用更多资源。异步基本上是优化在主线程上运行任务(不并行)对吧? - Rami Awar

2
你所提到的解决方案可以实现你所要求的“fire and forget”,但我听到的隐藏问题是什么是高效/常见的方法。
像计算机科学中的许多问题一样,答案是:这取决于情况。我会尝试解释一下。基本上有两种方法:在单独的线程或进程中执行。
使用线程,你可以获得共享内存访问,并且它们在资源使用方面更轻量级。它还可以比进程上下文切换更好地执行上下文切换。
使用进程,你可以利用更多的CPU,但你失去了共享内存,并且(取决于你运行的进程数量与核心数),你可能会遇到更多的上下文切换。(例如,如果你运行具有2个CPU和8个进程的容器,它们都会争夺CPU时间,可能会发生更多的上下文切换)。
举一个具体的例子,让我们考虑两种情况。如果你的应用程序需要进行CPU密集型任务(如加密、压缩等),你可能会利用更多的CPU获得更好的性能。
另一方面,如果您的任务由于I/O(例如等待网络、从磁盘读取等)而被阻塞,使用线程可能比使用不同进程更快。
另一个需要注意的问题是“任务管理器和任务执行器之间的通信”。如果两者需要通信,则使用线程的性能会更好,因为可以共享内存访问。
总之,最好的方法是进行性能测试,因为它确实取决于您需要完成的工作。我的建议是先尝试较简单的解决方案(线程),获取性能结果的基准,然后尝试使用进程。在多进程环境中运行程序被认为比多线程更复杂。
我的2分钱意见是,在处理性能问题时,您必须有一个基准并与其进行比较,否则您就是瞎猜,浪费时间调整应用程序的错误部分。

1
为了实现“忘记”,您需要将任务绑定到一个实例存在,以便它将根据您已经使用的内容进行init/del,您添加的示例没有任何问题,但是当它太短以至于只适用于特定需求时,它更多的是“迎合那个具体的需求”,而不是通用用例。您可以扩展该示例以使异步执行更容易处理,例如我在示例中采用了“可变”变量,self在实例生命周期内不会改变(但是内容可以更改而不改变self的地址),因此您既可以在存在期间“忘记”任务,又可以通过将self作为参数来提供不同的值。
这个异步实现非常简单,它由一个列表组成,其中添加了所需的任务,并返回“句柄”以获取返回或终止它,每次线程弹出一个调用时,它将被删除或重新添加,这是为了保持FIFO列表,已执行的调用必须被添加为新的调用,而不是旧的调用。
import threading as th
import time

class CTask(object):

    once  = 1
    loop  = 8

    def __init__(self, foo, args, kwargs, type):
        self.__kwargs = kwargs
        self.__args = args
        self.__foo = foo
        self.__ret = None

        self.task_type = type
        self.terminated = False

    def pool_call(self):
        self.__ret = self.__foo(
            *self.__args,
            **self.__kwargs
        )
        if self.task_type == self.once:
            self.terminated = True

    def __call__(self):
        return self.__ret

    def terminate(self):
        self.terminated = True

class Worker_Pool:

    def push_task(self, type=CTask.once):
        def func_wraper(func):
            def varg_wraper(*a, **b):
                ntask = CTask(func, a, b, type)
                self.__lock.acquire(True, -1)
                self.__ref.append(ntask)
                self.__lock.release()
                return ntask
            return varg_wraper
        return func_wraper

    def __init__(self, n_work):
        self.__lock = th.Lock()
        self.__pool = []
        self.__ref = []
        self.alive = True

        for _ in range(n_work):
            self.__pool.append(
                th.Thread(target=self.__run)
            )
        for w in self.__pool:
            w.start()

    def __run(self):
        while self.alive:
            wtask = None
            if self.__lock.acquire(True, 0.1):
                if len(self.__ref) > 0:
                    wtask = self.__ref.pop(0)
                self.__lock.release()

            if wtask is None or wtask.terminated:
                continue

            wtask.pool_call()

            if wtask.task_type == CTask.loop:
                self.__lock.acquire(True, -1)
                self.__ref.append(wtask)
                self.__lock.release()

    def terminate(self):
        self.alive = False
        for w in self.__pool:
            w.join()

bg = Worker_Pool(3)

class Some_App:

    @bg.push_task(CTask.loop)
    def do_task(self):
        print(f'[{self.id}] -> {self.num}')
        time.sleep(0.5)

    @bg.push_task()
    def do_mult(self):
        self.num *= self.mul

    def __init__(self, id, imul):
        self.mul = imul
        self.num = 1
        self.id = id

        self.bg_task = self.do_task()

    def __del__(self):
        # you should make sure that the the 
        # function is no longer in execution
        # before continuing this function
        self.bg_task.terminate()

if __name__ == '__main__':
    inst_a = Some_App(1, 2)
    task_a = inst_a.do_task()
    inst_b = Some_App(5, 6)
    task_b = inst_b.do_task()

    print('')
    inst_a.do_mult()
    inst_b.do_mult()
    time.sleep(0.6)

    print('')
    inst_a.do_mult()
    inst_b.do_mult()
    time.sleep(1.2)

    bg.terminate()

其他答案使用多进程/子进程,对于轻量级任务,我会将其描述为自掘坟墓。子进程使用文本进行通信,而进程使用套接字+pickles,因此不是最佳选择。


1
您可以尝试这样做:

import multiprocessing


class MPPool:
    def __init__(self, num=multiprocessing.cpu_count() - 1):
        self.pool = multiprocessing.Pool(num)

    def __call__(self, f, *args, **kwargs):
        self.pool.apply_async(f, args=args, kwds=kwargs)


def run_and_forget(f, *args, **kwargs):
    if "pool" not in run_and_forget.__dict__:
        run_and_forget.pool = MPPool()

    run_and_forget.pool(f, *args, **kwargs)


if __name__ == '__main__':
    import time

    def test(n):
        time.sleep(n)
        print(f"done {n}")

    for i in range(20):
        run_and_forget(test, i)
        print(f"passed {i}")

    time.sleep(50)
    print("end")

函数run_and_forget可以在任何地方使用(在单个进程内),因为成员pool是类似静态的,因此在第一次调用时定义。

这还没有完全测试,但我提供了一些快速测试代码来查看它的工作原理。首先想到的是,在退出之前清理多进程池会更明智。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接