我为您编写了一种可能的解决方案,使用multiprocessing对象 Process和Queue。
我测量了其吞吐速度,并且平均需要150毫微秒
来处理几乎什么都不做的一个任务。处理过程只是从任务中获取整数,将其加1并将其发送回去。我认为150微秒的延迟足以让您处理30 FPS。
使用队列而不是管道,因为我认为它更适合多任务处理。而且如果您的时间测量精确,则与管道相比,队列还要快660倍
(150微秒与100毫秒延迟相比)。
您可以注意到,处理循环会批量发送任务,这意味着它首先会向所有进程发送多个任务,然后才会收集所有已发送和已处理的任务。这种批处理使得处理更加平滑,与一次只发送1个任务然后收集少量结果相比。
更好的方法是将任务发送到进程中,然后在单独的轻量级线程中异步收集结果。这将防止您在等待最慢的进程完成任务时被阻塞。
通过向进程发送None
任务来发出结束和退出信号。
在线试用!
def process(idx, in_q, out_q):
while True:
task = in_q.get()
if task is None:
break
out_q.put({'n': task['n'] + 1})
def main():
import multiprocessing, time
queue_size = 1 << 16
procs = []
for i in range(multiprocessing.cpu_count()):
in_q, out_q = [multiprocessing.Queue(queue_size) for j in range(2)]
procs.append({
'in_q': in_q,
'out_q': out_q,
'proc': multiprocessing.Process(target = process,
kwargs = dict(idx = i, in_q = in_q, out_q = out_q)),
})
procs[-1]['proc'].start()
num_blocks = 1 << 2
block = 1 << 10
assert block <= queue_size
tb = time.time()
for k in range(num_blocks):
for i in range(block):
for j, proc in enumerate(procs):
proc['in_q'].put({'n': k * block * len(procs) + i * len(procs) + j})
for i in range(block):
for proc in procs:
proc['out_q'].get()
print('Processing speed:', round((time.time() - tb) /
(num_blocks * block * len(procs)) * 1_000_000, 1), 'mcs per task')
for proc in procs:
proc['in_q'].put(None)
for proc in procs:
proc['proc'].join()
if __name__ == '__main__':
main()
输出:
Processing speed: 150.7 mcs per task
还测试了每次只发送一个任务(而不是一次发送1000个任务)给所有进程并一次接收一个任务的时间。在这种情况下,延迟为
460 mcs
(微秒)。因此,可以将其视为使用Queue的最坏情况下的纯延迟为460 mcs(460 mcs包括发送和接收)。
我已经采用了您的示例片段并对其进行了修改,改用Queue而不是Pipe,并获得了
0.1 ms
的延迟。
请注意,我在循环中执行此操作5次,因为第一次或第二次尝试会初始化一些与Queue相关的内容。
在线尝试!
import multiprocessing as mp
import time
def proc(inp_q, out_q):
for i in range(5):
e = inp_q.get()
ts = float(time.time_ns())
out_q.put(ts)
if __name__ == "__main__":
inp_q, out_q = [mp.Queue(1 << 10) for i in range(2)]
p1 = mp.Process(target=proc, args=(inp_q, out_q))
p1.start()
for i in range(5):
ts = float(time.time_ns())
inp_q.put("START")
ts_end = out_q.get()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
p1.join()
输出:
Time taken in ms: 2.181632
Time taken in ms: 0.14336
Time taken in ms: 0.09856
Time taken in ms: 0.156928
Time taken in ms: 0.108032
同时在循环中运行您的示例会使第二次和其他发送/接收迭代比第一次更快。第一次非常慢,因为资源的惰性初始化。大多数算法都是惰性初始化的,这意味着它们仅在第一次调用时分配所有所需的资源。这是为了防止在根本不使用算法时进行不必要的分配。另一方面,这使得第一次调用变得更加缓慢,因此您需要进行几次空调用来预热惰性算法。试一下在线版本:Python 3.8(预发布)- 在线尝试!
import multiprocessing as mp
import time
def proc(child_conn):
for i in range(5):
child_conn.recv()
ts = time.time_ns()
child_conn.send(ts)
if __name__ == "__main__":
parent_conn, child_conn = mp.Pipe()
p1 = mp.Process(target=proc, args=(child_conn,))
p1.start()
for i in range(5):
ts = time.time_ns()
parent_conn.send("START")
ts_end = parent_conn.recv()
print(f"Time taken in ms: {(ts_end - ts)/(10**6)}")
输出:
Time taken in ms: 2.693857
Time taken in ms: 0.072593
Time taken in ms: 0.038733
Time taken in ms: 0.039086
Time taken in ms: 0.037021