What are the fundamental differences between queues and pipes in Python's multiprocessing package? In what scenarios should you choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?
As of CY2023, the techniques described in this answer are quite dated. These days, use pebble, mpire, or concurrent.futures.ProcessPoolExecutor() instead...

No matter which Python concurrency tool you use, the answer below to the original question is still valid. A ProcessPoolExecutor() does not need a Pipe() or Queue() to communicate tasks/results.
A Pipe() can only have two endpoints. A Queue() can have multiple producers and consumers.

When to use them: if you need more than two points to communicate, use a Queue(). If you need absolute performance, a Pipe() is much faster, because Queue() is built on top of Pipe().

Performance benchmarking: below are the timing results of similar drag-race tests for Pipe() and Queue(), plus results for SimpleQueue() and JoinableQueue().

JoinableQueue() accounts for tasks when queue.task_done() is called (it does not even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each is at the bottom of this answer...
# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2
$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds
# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0
(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds
# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpenning@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
Originally, this answer only measured performance with one worker and one producer; that is the base use-case for Pipe(). Your comment would require adding different tests for multiple worker processes. While that is a valid observation for common Queue() use-cases, it would expand the test matrix along an entirely new dimension (i.e. adding tests with varying numbers of worker processes).
Bonus material 2
Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dict under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire python process crashes; however, you don't get unsolicited crash tracebacks printed to the console when a multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():
import traceback

def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except Exception:
        print("FATAL: reader({0}) exited while multiprocessing".format(args))
        traceback.print_exc()
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
Source code:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in range(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_simplequeue.py
"""
from multiprocessing import Process, SimpleQueue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
An additional feature of Queue() that is worth noting is the feeder thread. This section of the documentation notes "When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe." An infinite number of (or maxsize) items can be inserted into Queue() without any calls to queue.put() blocking. This allows you to store multiple items in a Queue(), until your program is ready to process them.

A Pipe(), on the other hand, has finite storage for items that have been sent to one connection but not yet received from the other connection. After this storage is used up, calls to connection.send() block until there is room to write the entire item. This stalls the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems, you can prevent connection.send() calls from blocking with the os.set_blocking() function. However, this causes problems if you try to send a single item that does not fit in the pipe's file. Recent versions of Linux allow you to increase the size of a file, but the maximum size allowed varies with system configuration. You should therefore never rely on Pipe() to buffer data. Calls to connection.send could block until data gets read from the pipe somewhere else.

The documentation for the put method still states that it is a blocking (or failing) method: "If optional args block is true (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time." Are you sure about your answer? - Anab

The put method will block if you specify maxsize in the Queue's constructor. But that is due to the number of items in the queue, not the size of an individual item. - akindofyoga

If, like me, you are wondering whether to use a multiprocessing construct (Pipe or Queue) in your threaded program for better performance, I have adapted Mike Pennington's script to compare against queue.Queue and queue.SimpleQueue:
Sending 10000 numbers to mp.Pipe() took 65.051 ms
Sending 10000 numbers to mp.Queue() took 78.977 ms
Sending 10000 numbers to queue.Queue() took 14.781 ms
Sending 10000 numbers to queue.SimpleQueue() took 0.939 ms
Sending 100000 numbers to mp.Pipe() took 449.564 ms
Sending 100000 numbers to mp.Queue() took 811.938 ms
Sending 100000 numbers to queue.Queue() took 149.387 ms
Sending 100000 numbers to queue.SimpleQueue() took 9.264 ms
Sending 1000000 numbers to mp.Pipe() took 4660.451 ms
Sending 1000000 numbers to mp.Queue() took 8499.743 ms
Sending 1000000 numbers to queue.Queue() took 1490.062 ms
Sending 1000000 numbers to queue.SimpleQueue() took 91.238 ms
Sending 10000000 numbers to mp.Pipe() took 45095.935 ms
Sending 10000000 numbers to mp.Queue() took 84829.042 ms
Sending 10000000 numbers to queue.Queue() took 15179.356 ms
Sending 10000000 numbers to queue.SimpleQueue() took 917.562 ms
Unsurprisingly, using the queue package yields much better results if all you have are threads. Even so, I was surprised by how performant queue.SimpleQueue is.
"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing
def reader_pipe(p_out: mp_connection.Connection) -> None:
while True:
msg = p_out.recv()
if msg=='DONE':
break
def reader_queue(p_queue: "queue.Queue[typing.Union[str, int]]") -> None:
while True:
msg = p_queue.get()
if msg=='DONE':
break
if __name__=='__main__':
for count in [10**4, 10**5, 10**6, 10**7]:
# first: mp.pipe
p_mppipe_out, p_mppipe_in = mp.Pipe()
reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mppipe_in.send(ii)
p_mppipe_in.send('DONE')
reader_p.join()
print(f"Sending {count} numbers to mp.Pipe() took {(time.time() - _start)*1e3:.3f} ms")
# second: mp.Queue
p_mpqueue = mp.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mpqueue.put(ii)
p_mpqueue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to mp.Queue() took {(time.time() - _start)*1e3:.3f} ms")
# third: queue.Queue
p_queue = queue.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_queue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_queue.put(ii)
p_queue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to queue.Queue() took {(time.time() - _start)*1e3:.3f} ms")
# fourth: queue.SimpleQueue
p_squeue = queue.SimpleQueue()
reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_squeue.put(ii)
p_squeue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to queue.SimpleQueue() took {(time.time() - _start)*1e3:.3f} ms")
What about mp.queues.SimpleQueue? Is it faster than mp.Queue? I am trying to optimize the performance of a multiprocessing program I created, and I can only use things inside mp... - pepoluan

multiprocessing.SimpleQueue. - pepoluan

RuntimeError: Queue objects should only be shared between processes through inheritance
Sending 10000 numbers to Queue() took 0.12702512741088867 seconds
Sending 100000 numbers to Queue() took 0.9972114562988281 seconds
Sending 1000000 numbers to Queue() took 9.9016695022583 seconds
Sending 10000 numbers to manager.Queue() took 1.0181043148040771 seconds
Sending 100000 numbers to manager.Queue() took 10.438829898834229 seconds
Sending 1000000 numbers to manager.Queue() took 102.3624701499939 seconds
"""
manager_multi_queue.py
"""
from multiprocessing import Process, Queue, Manager
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
manager = Manager()
pqueue = manager.Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to manager.Queue() took {1} seconds".format(count, (time.time() - _start)))
New update:

In my application, multiple processes write results to a queue simultaneously while a single process consumes them. It turns out that in this scenario the performance differences between these queues are enormous. When several processes write to a standard multiprocessing.Queue at the same time, it is easily overwhelmed and read performance drops sharply. There are much faster alternatives for this case.

Here I compare the read performance of three types of queue, each tested while 5 processes write to it continuously (the comparison plot is produced by the script below).

The results show large performance differences between the three. The fastest is the queue built on Pipes, followed by the queue created with a Manager, and the slowest is the standard multiprocessing.Queue. If you care about a queue's read performance while it is being written to, you are best off with a pipe-based or manager queue.

Source code for the new test with its plots:
"""
queue_comparison_plots.py
"""
from __future__ import annotations

import asyncio
import random
from dataclasses import dataclass
from itertools import groupby
from multiprocessing import Process, Queue, Manager
import time
from matplotlib import pyplot as plt
import multiprocessing as mp

class PipeQueue():
    pipe_in: mp.connection.Connection
    pipe_out: mp.connection.Connection

    def __init__(self):
        self.pipe_out, self.pipe_in = mp.Pipe(duplex=False)
        self.write_lock = mp.Lock()
        self.read_lock = mp.Lock()

    def get(self):
        with self.read_lock:
            return self.pipe_out.recv()

    def put(self, val):
        with self.write_lock:
            self.pipe_in.send(val)

@dataclass
class Result():
    queue_type: str
    data_size_bytes: int
    num_writer_processes: int
    num_reader_processes: int
    msg_read_rate: float

class PerfTracker():
    def __init__(self):
        self.running = mp.Event()
        self.count = mp.Value("i")
        self.start_time: float | None = None
        self.end_time: float | None = None

    @property
    def rate(self) -> float:
        return (self.count.value) / (self.end_time - self.start_time)

    def update(self):
        if self.running.is_set():
            with self.count.get_lock():
                self.count.value += 1

    def start(self):
        with self.count.get_lock():
            self.count.value = 0
        self.running.set()
        self.start_time = time.time()

    def end(self):
        self.running.clear()
        self.end_time = time.time()

def reader_proc(queue, perf_tracker, num_threads=1):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(reader_proc_async(queue, perf_tracker, num_threads))

async def reader_proc_async(queue, perf_tracker, num_threads=1):
    async def thread(queue, perf_tracker):
        while True:
            msg = queue.get()
            perf_tracker.update()

    futures = []
    for i in range(num_threads):
        futures.append(thread(queue, perf_tracker))
    await asyncio.gather(*futures)

def writer_proc(queue, data_size_bytes: int):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(writer_proc_async(queue, data_size_bytes))

async def writer_proc_async(queue, data_size_bytes: int):
    val = random.randbytes(data_size_bytes)
    while True:
        queue.put(val)

async def main():
    num_reader_procs = 1
    num_reader_threads = 1
    num_writer_procs = 5
    test_time = 5
    results = []

    for queue_type in ["Pipe + locks", "Queue using Manager", "Queue"]:
        for data_size_bytes_order_of_magnitude in range(8):
            data_size_bytes = 10 ** data_size_bytes_order_of_magnitude
            perf_tracker = PerfTracker()

            if queue_type == "Queue using Manager":
                manager = Manager()
                pqueue = manager.Queue()
            elif queue_type == "Pipe + locks":
                pqueue = PipeQueue()
            elif queue_type == "Queue":
                pqueue = Queue()
            else:
                raise NotImplementedError()

            reader_ps = []
            for i in range(num_reader_procs):
                reader_p = Process(target=reader_proc, args=(pqueue, perf_tracker, num_reader_threads))
                reader_ps.append(reader_p)

            writer_ps = []
            for i in range(num_writer_procs):
                writer_p = Process(target=writer_proc, args=(pqueue, data_size_bytes))
                writer_ps.append(writer_p)

            for writer_p in writer_ps:
                writer_p.start()
            for reader_p in reader_ps:
                reader_p.start()

            await asyncio.sleep(1)
            print("start")
            perf_tracker.start()
            await asyncio.sleep(test_time)
            perf_tracker.end()
            print(f"Finished. {queue_type} | {data_size_bytes} | {perf_tracker.rate} msg/sec")

            results.append(
                Result(
                    queue_type=queue_type,
                    data_size_bytes=data_size_bytes,
                    num_writer_processes=num_writer_procs,
                    num_reader_processes=num_reader_procs,
                    msg_read_rate=perf_tracker.rate,
                )
            )

            for writer_p in writer_ps:
                writer_p.kill()
            for reader_p in reader_ps:
                reader_p.kill()

    print(results)

    fig, ax = plt.subplots()
    count = 0
    for queue_type, result_iterator in groupby(results, key=lambda result: result.queue_type):
        grouped_results = list(result_iterator)
        x_coords = [x.data_size_bytes for x in grouped_results]
        y_coords = [x.msg_read_rate for x in grouped_results]
        ax.plot(x_coords, y_coords, label=f"{queue_type}")
        count += 1

    ax.set_title("Queue read performance comparison while writing continuously", fontsize=11)
    ax.legend(loc='upper right', fontsize=10)
    ax.set_yscale("log")
    ax.set_xscale("log")
    ax.set_xlabel("Message size (bytes)")
    ax.set_ylabel("Message throughput (messages/second)")
    plt.show()

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())