Python的多进程管道非常慢 (>100ms)。

4

我正在使用Python 3.x编写一款图像处理程序,需要实时(30 FPS)处理帧并具有低延迟(<60ms)。我有1个父进程读取帧并通过SharedMemory对象将它们发送到多个子进程。子进程执行的计算是CPU绑定的,并且在单个核心上无法以30 FPS运行所有进程。但由于它们彼此独立工作,所以我决定将它们作为单独的进程运行。

目前,我正在使用管道将命令发送到子进程,特别是在每次更新帧时通知它们。通过测量父进程的send()命令和子进程的recv()命令之间的时间,延迟始终大于100ms。我使用了time.time_ns()。

这是一个问题,因为输出信息总是会滞后于>100ms +所有子进程完成处理所需的时间(另外还要加上所有send()函数之间的延迟20-30ms)。

该应用程序旨在用于现场体育赛事转播,因此不能引入如此高的延迟。因此,我有两个问题:

  1. 在Python中,管道真的如此慢吗?还是我的实现有问题。(注意:我已在Intel i5第9代和Apple M1上测试了延迟)

  2. 如果管道确实如此缓慢,我有其他Python选项吗?除了诉诸某种形式的套接字?

谢谢。

编辑:

我在这里添加了我用于测试管道延迟的代码。

import multiprocessing as mp
import time

def proc(child_conn):
    
    child_conn.recv()
    ts = time.time_ns()
    child_conn.send(ts)
    child_conn.close()

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    ts = time.time_ns()
    parent_conn.send("START")
    ts_end = parent_conn.recv()

    print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

1
你有检查过操作系统的原始管道速度吗?请注意,在进程之间发送数据将对其进行pickle和unpickle,这可能实际上是瓶颈。使用更简单的数据可以加快速度。 - MisterMiyagi
等等,你有一个低延迟的应用程序,而你决定使用Python?你在开玩笑吗? - Mad Physicist
@MisterMiyagi,我已经尝试通过管道发送字节,但没有成功。我会研究原始管道,谢谢! - Abhishek Satish
@anon01,我已经添加了我用来测试的代码。 - Abhishek Satish
1
@AbhishekSatish 您添加的测试代码显示了“1.2毫秒”的延迟,请在此处查看 在线示例(向下滚动网页以查看控制台输出)。 - Arty
显示剩余2条评论
2个回答

4

我为您编写了一种可能的解决方案,使用multiprocessing对象 ProcessQueue

我测量了其吞吐速度,并且平均需要150毫微秒来处理几乎什么都不做的一个任务。处理过程只是从任务中获取整数,将其加1并将其发送回去。我认为150微秒的延迟足以让您处理30 FPS。

使用队列而不是管道,因为我认为它更适合多任务处理。而且如果您的时间测量精确,则与管道相比,队列还要快660倍(150微秒与100毫秒延迟相比)。

您可以注意到,处理循环会批量发送任务,这意味着它首先会向所有进程发送多个任务,然后才会收集所有已发送和已处理的任务。这种批处理使得处理更加平滑,与一次只发送1个任务然后收集少量结果相比。

更好的方法是将任务发送到进程中,然后在单独的轻量级线程中异步收集结果。这将防止您在等待最慢的进程完成任务时被阻塞。

通过向进程发送None任务来发出结束和退出信号。

在线试用!

def process(idx, in_q, out_q):
    while True:
        task = in_q.get()
        if task is None:
            break
        out_q.put({'n': task['n'] + 1})

def main():
    import multiprocessing, time

    queue_size = 1 << 16
    procs = []
    for i in range(multiprocessing.cpu_count()):
        in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
        procs.append({
            'in_q': in_q,
            'out_q': out_q,
            'proc': multiprocessing.Process(target = process,
                kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
        })
        procs[-1]['proc'].start()

    num_blocks = 1 << 2
    block = 1 << 10
    assert block <= queue_size

    tb = time.time()
    for k in range(num_blocks):
        # Send tasks
        for i in range(block):
            for j, proc in enumerate(procs):
                proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
        # Receive tasks results
        for i in range(block):
            for proc in procs:
                proc['out_q'].get()
    print('Processing speed:', round((time.time() - tb) /
        (num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
    
    # Send finish signals to processes
    for proc in procs:
        proc['in_q'].put(None)
    # Join processes (wait for exit)
    for proc in procs:
        proc['proc'].join()

if __name__ == '__main__':
    main()

输出:

Processing speed: 150.7 mcs per task

还测试了每次只发送一个任务(而不是一次发送1000个任务)给所有进程并一次接收一个任务的时间。在这种情况下,延迟为 460 mcs (微秒)。因此,可以将其视为使用Queue的最坏情况下的纯延迟为460 mcs(460 mcs包括发送和接收)。
我已经采用了您的示例片段并对其进行了修改,改用Queue而不是Pipe,并获得了 0.1 ms 的延迟。
请注意,我在循环中执行此操作5次,因为第一次或第二次尝试会初始化一些与Queue相关的内容。 在线尝试!
import multiprocessing as mp
import time

def proc(inp_q, out_q):
    for i in range(5):
        e = inp_q.get()
        ts = float(time.time_ns())
        out_q.put(ts)

if __name__ == "__main__":

    inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
    p1 = mp.Process(target=proc, args=(inp_q, out_q))
    p1.start()

    for i in range(5):
        ts = float(time.time_ns())
        inp_q.put("START")
        ts_end = out_q.get()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
    p1.join()

输出:

Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032

同时在循环中运行您的示例会使第二次和其他发送/接收迭代比第一次更快。第一次非常慢,因为资源的惰性初始化。大多数算法都是惰性初始化的,这意味着它们仅在第一次调用时分配所有所需的资源。这是为了防止在根本不使用算法时进行不必要的分配。另一方面,这使得第一次调用变得更加缓慢,因此您需要进行几次空调用来预热惰性算法。试一下在线版本:Python 3.8(预发布)- 在线尝试!
import multiprocessing as mp
import time

def proc(child_conn):
    for i in range(5):
        child_conn.recv()
        ts = time.time_ns()
        child_conn.send(ts)

if __name__ == "__main__":

    parent_conn, child_conn = mp.Pipe()
    p1 = mp.Process(target=proc, args=(child_conn,))
    p1.start()

    for i in range(5):
        ts = time.time_ns()
        parent_conn.send("START")
        ts_end = parent_conn.recv()

        print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")

输出:

Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021

1
嘿,感谢你的回答!然而,这里的问题是你的延迟被你正在处理的大量块所隐藏。你真正的延迟应该是发送和接收第一个块结果所需的时间。在我的情况下,每33毫秒才到达一帧,因此延迟始终相同,即管道从父进程发送消息到子进程所需的时间加上处理时间。 - Abhishek Satish
@AbhishekSatish 我刚刚把block设为1,现在处理速度是460微秒。在这种情况下,我一次向所有进程发送1帧,并且每次等待1帧。因此,如果只发送1帧,则460微秒是真实速度。因此,使用Queue发送和接收的延迟为460微秒。 - Arty
该死。我现在在M1上使用Mac OS Monterey,延迟为30毫秒。但是在i5的Windows机器上(Windows是目标操作系统),延迟高达160毫秒。这是操作系统相关问题吗? - Abhishek Satish
@AbhishekSatish 看看我的更新答案,我添加了第二部分的答案,将你的代码改为使用队列。它显示 0.1 毫秒。我的第二部分代码在这里,看一下。 - Arty
1
@AbhishekSatish 修改了你的示例,使其在循环中执行多个send/recv操作,第二个send显示“0.05毫秒”,请在此处查看代码。您必须发送多次,因为第一次发送会执行一些与套接字相关的重型初始化操作。 - Arty
显示剩余6条评论

4
下面的程序通过管道发送一个简单的对象1百万次,然后测量总共经过的时间(秒)和平均发送时间(毫秒)。我在一台相当老旧的Windows桌面电脑上运行,其CPU型号为Intel(R) Core(TM) i7-4790 @ 3.60 GHz。
from multiprocessing import Pipe, Process
import time

class Message:
    def __init__(self, text):
        self.text = text

N = 1_000_000

def worker(recv_connection):
    for _ in range(N):
        msg = recv_connection.recv()

def main():
    recv_connection, send_connection = Pipe(duplex=False)
    p = Process(target=worker, args=(recv_connection,))
    p.start()
    msg = Message('dummy')
    start_time = time.time_ns()
    for _ in range(N):
        send_connection.send(msg)
    p.join()
    elapsed = time.time_ns() - start_time
    print(f'Total elapsed time: {elapsed / 1_000_000_000} seconds')
    print(f'Average send time: {elapsed / (1_000_000 * N)}ms.')

if __name__ == '__main__':
    main()

输出:

Total elapsed time: 10.7369966 seconds
Average send time: 0.0107369966ms.
这比你现在实现的速度快了10,000倍(100毫秒),所以我只能得出结论,你发送到管道中的对象复杂度很高。 更新 你确实想使用多进程,但我建议使用一个多进程池,具体来说是与imap方法一起使用的multiprocessing.pool.Pool实例。这将允许您拥有一个生成器函数,它会产生下一个要处理的帧,并将其提交到池中进行处理,并在可用时将处理后的帧返回到主进程中,并按照提交的帧的顺序返回。以下概述了基本思想:
from multiprocessing import Pool, cpu_count
import time

def process_frame(frame):
    # return processed frame
    time.sleep(.1)
    return frame.upper()

def generate_frames_for_processing():
    for i in range(100):
        time.sleep(.033)
        yield f'msg{i}'

def main():
    # Leave a processor for the main process:
    pool = Pool(cpu_count() - 1)
    start_time = time.time()
    # get processed results as they are returned in order of being processed:
    for processed_frame in pool.imap(process_frame, generate_frames_for_processing()):
        # Do something with returned processed frame
        # These will be in the same order as the frames are submitted
        ...
        print(processed_frame)
    pool.close()
    pool.join()
    print('Elapsed:', time.time() - start_time)

if __name__ == '__main__':
    main()

打印:

MSG0
MSG1
MSG2
...
MSG97
MSG98
MSG99
Elapsed: 3.467884302139282

你可以在 imap 调用时指定 chunksize 参数,但你可能不希望这样做。有关详细信息,请参阅文档。

嗨!感谢您的回答!然而,在您的解决方案中,由于一次处理了大批量的数据,因此延迟被隐藏了起来。在我的情况下,我每33毫秒只接收一帧,因此批处理的可能性非常小。我的单个发送需要具有低延迟。 - Abhishek Satish
这与“批处理”无关。您的 generate_frames_for_processing 将是完全不同的实现,每 33 毫秒发出一个 yield。有一个底层的 Queue 实现了 imap 调用。每次生成器执行 yield,另一个任务就会排队到池中,并调用 process_frame,主进程将获得另一个已处理的帧。(更多...) - Booboo
Arty发布的解决方案无法控制输出队列中项目的顺序,因此每个帧都必须标记一个数字,并且已处理帧的输出队列也要标记数字。在所有帧都被接收之前,您不能对已处理的帧进行任何操作。可能您需要使用帧编号构建字典,或者如果帧大小固定,则可以将它们作为固定偏移量写入文件。但是整个已处理帧必须在内存中。 - Booboo
我已更新演示程序,使其更加明确。 - Booboo
非常感谢!我认为你的方法是一个很好的选择,如果我通过管道/进程发送帧并需要它们返回。然而,由于我正在使用共享内存对象,然后通过管道发送命令以接收某些字节的信息,所以我将无法在这部分使用此解决方案。不过,我一定会借鉴你的示例来改进我的池利用方式!非常感谢你抽出时间给出如此详细的答案! - Abhishek Satish

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接