Multiprocessing - Pipe vs Queue

200

What are the fundamental differences between queues and pipes in Python's multiprocessing package?

In which scenarios should you choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?

4 Answers

374

Short summary

As of CY2023, the techniques described in this answer are quite dated. These days, you can use pebble, mpire, or concurrent.futures.ProcessPoolExecutor()...

No matter which Python concurrency tool you use, the answer below to the original question is still valid.

A ProcessPoolExecutor() does not require a Pipe() or Queue() for task/result communication.
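
For illustration, here is a minimal sketch (not part of the original answer; standard library only) of how ProcessPoolExecutor() ships tasks and results for you, with no explicit Pipe() or Queue():

# Hedged sketch: ProcessPoolExecutor handles the task/result plumbing itself,
# so the caller never touches a Pipe() or Queue() directly.
from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(square, range(10)))   # results come back in order
    print(results)   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]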

Original answer

A Pipe() can only have two endpoints.
A Queue() can have multiple producers and consumers.
When to use them
If you need more than two points to communicate, use a Queue() (a small multi-producer sketch follows this list).
If you need absolute performance, a Pipe() is much faster because a Queue() is built on top of a Pipe().
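
As a quick illustration of the multi-producer case above, here is a hedged sketch (not part of the original answer; process counts and names are arbitrary) of several producers feeding one consumer through a single Queue(), something a two-endpoint Pipe() cannot do:

# Hedged sketch: three producer processes share one Queue() with a single consumer.
from multiprocessing import Process, Queue

def producer(q, name, n):
    for ii in range(n):
        q.put((name, ii))
    q.put('DONE')                      # one sentinel per producer

def consumer(q, num_producers):
    done = 0
    while done < num_producers:        # stop once every producer has signalled
        if q.get() == 'DONE':
            done += 1

if __name__ == '__main__':
    q = Queue()
    producers = [Process(target=producer, args=(q, 'p{0}'.format(i), 1000)) for i in range(3)]
    reader = Process(target=consumer, args=(q, len(producers)))
    reader.start()
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    reader.join()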
Performance benchmarking
Suppose you want to spawn two processes and send messages between them as fast as possible. These are the timing results of similar tests using Pipe() and Queue()...
FYI, I threw in results for SimpleQueue() and JoinableQueue() as a bonus.
  • JoinableQueue() counts tasks whenever queue.task_done() is called (it doesn't even know about the specific task, it just counts the unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each is at the bottom of this answer...

# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2

$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$  python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds



# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0

(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds


# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

To summarize:
Under Python 2.7, a `Pipe()` is about 300% faster than a `Queue()`. Don't even think about a `JoinableQueue()` unless you really need its benefits.
Under Python 3.x, `Pipe()` still has roughly a 20% edge over `Queue()`, but the gap between `Pipe()` and `Queue()` is not as dramatic as it was in Python 2.7. The various `Queue()` implementations are within about 15% of one another. Also, my tests use integer data; some people commented that they found performance differences between data types when using multiprocessing.
The bottom line for Python 3.x: your mileage may vary... consider testing with your own data types (i.e. integers / strings / objects) to draw conclusions for the platforms and use cases you care about.
I should also mention that my Python 3.x performance tests were inconsistent and varied somewhat. I ran multiple tests over several minutes to get the best results for each case. I suspect these differences have something to do with running my Python 3 tests under VMWare / virtualization; however, the virtualization diagnosis is speculation.
*** Response to a comment about the testing technique ***
In the comments, @JJC said:
Quote: a fairer comparison would be running N workers, each communicating with the main thread via a point-to-point pipe, compared with the performance of running N workers all pulling from a single point-to-multipoint queue.

Originally, this answer only considered the performance of one worker and one producer; that is the baseline use case for Pipe(). Your comment requires adding different tests for multiple worker processes. While that is a valid observation for common Queue() use cases, it would easily blow up the test matrix along an entirely new axis (i.e. adding tests with varying numbers of worker processes).

Bonus material 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary under many conditions, but infrequently fails with certain inputs.

Normally we get clues about a failure when the entire Python process crashes; however, you don't get unsolicited crash tracebacks printed to the console if the multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue about what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

Now, when you stumble across a crash, you see something like this:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

Source code:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in range(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

"""
multi_simplequeue.py
"""

from multiprocessing import Process, SimpleQueue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))

3
In summary, a Pipe() is roughly three times as fast as a Queue(). - James Brady
16
Nice! Good answer, and glad you provided benchmarks! I only have two minor quibbles: (1) "orders of magnitude faster" is a bit of an overstatement; the difference is 3x, which is about a third of one order of magnitude. Just saying. ;) And (2) a fairer comparison would be running N workers, each communicating with the main thread via a point-to-point pipe, compared against the performance of running N workers all pulling from a single point-to-multipoint queue. - JJC
3
Regarding your "bonus material"... Yeah. If you're subclassing Process, put the bulk of the 'run' method inside a try block. It's also a useful way to log exceptions. To replicate the normal exception output: sys.stderr.write(''.join(traceback.format_exception(*(sys.exc_info())))). - travc
2
@Mike, just wanted to say you're awesome. This answer helped me a lot. - Will
12
In response to your quibble, I'd like to point out that 3x is roughly half an order of magnitude, not a third -- sqrt(10) =~ 3. - jab

14
An additional feature of Queue() worth noting is its feeder thread. The documentation notes: "When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe." An unlimited number of items (or up to maxsize) can be inserted into a Queue() without any call to queue.put() blocking. This lets you store multiple items in a Queue() until your program is ready to process them.

A Pipe(), on the other hand, has finite storage for items that have been sent to one connection but not yet received from the other. Once that storage is used up, calls to connection.send() block until there is room to write the entire item. This stalls the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems you can prevent connection.send() from blocking with the os.set_blocking() function. However, this causes problems if you try to send a single item that does not fit in the pipe's buffer. Recent versions of Linux let you increase the size of that buffer, but the maximum allowed size varies with system configuration. You should therefore never rely on a Pipe() to buffer data: a call to connection.send() may block until data is read from the pipe somewhere else.

In short, when you need to buffer data, a queue is a better choice than a pipe, even if you only need to communicate between two points.
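
For illustration, here is a hedged sketch (not from the original answer) of the buffering behaviour described above: the feeder thread lets every put() return immediately even though no consumer exists yet.

# Hedged sketch: 100,000 put() calls complete without blocking because the
# queue's feeder thread drains an internal buffer into the pipe in the background.
from multiprocessing import Process, Queue

def reader_proc(q):
    while True:
        if q.get() == 'DONE':
            break

if __name__ == '__main__':
    q = Queue()
    for ii in range(100000):           # buffered; none of these calls block
        q.put(ii)
    q.put('DONE')
    reader = Process(target=reader_proc, args=(q,))
    reader.start()                     # the consumer starts only after everything was queued
    reader.join()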

The section you linked to mentions a "feeder thread", but the documentation for the put method still declares it a blocking or failing method: "If optional args block is true (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time." Are you sure about your answer? - Anab
1
I am confident in my answer. The put method will block if the maxsize parameter is specified in the Queue's constructor, but that is due to the number of items in the queue, not the size of individual items. - akindofyoga
Thanks for clarifying, I had missed that part. - Anab

6

If, like me, you are wondering whether to use a multiprocessing construct (Pipe or Queue) in your threaded programs for performance, I have adapted Mike Pennington's scripts to compare them against queue.Queue and queue.SimpleQueue:

Sending 10000 numbers to mp.Pipe() took 65.051 ms
Sending 10000 numbers to mp.Queue() took 78.977 ms
Sending 10000 numbers to queue.Queue() took 14.781 ms
Sending 10000 numbers to queue.SimpleQueue() took 0.939 ms
Sending 100000 numbers to mp.Pipe() took 449.564 ms
Sending 100000 numbers to mp.Queue() took 811.938 ms
Sending 100000 numbers to queue.Queue() took 149.387 ms
Sending 100000 numbers to queue.SimpleQueue() took 9.264 ms
Sending 1000000 numbers to mp.Pipe() took 4660.451 ms
Sending 1000000 numbers to mp.Queue() took 8499.743 ms
Sending 1000000 numbers to queue.Queue() took 1490.062 ms
Sending 1000000 numbers to queue.SimpleQueue() took 91.238 ms
Sending 10000000 numbers to mp.Pipe() took 45095.935 ms
Sending 10000000 numbers to mp.Queue() took 84829.042 ms
Sending 10000000 numbers to queue.Queue() took 15179.356 ms
Sending 10000000 numbers to queue.SimpleQueue() took 917.562 ms

Unsurprisingly, the queue package yields much better results if all you have are threads. That said, I was surprised by how well queue.SimpleQueue performs.


"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing

def reader_pipe(p_out: mp_connection.Connection) -> None:
    while True:
        msg = p_out.recv()
        if msg=='DONE':
            break

def reader_queue(p_queue: "queue.Queue[typing.Union[str, int]]") -> None:
    while True:
        msg = p_queue.get()
        if msg=='DONE':
            break

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6, 10**7]:
    # first: mp.pipe
        p_mppipe_out, p_mppipe_in = mp.Pipe()
        reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mppipe_in.send(ii)
        p_mppipe_in.send('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to mp.Pipe() took {(time.time() - _start)*1e3:.3f} ms")

    # second: mp.Queue
        p_mpqueue  = mp.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_mpqueue.put(ii)
        p_mpqueue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to mp.Queue() took {(time.time() - _start)*1e3:.3f} ms")

    # third: queue.Queue
        p_queue = queue.Queue()
        reader_p = td.Thread(target=reader_queue, args=((p_queue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_queue.put(ii)
        p_queue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to queue.Queue() took {(time.time() - _start)*1e3:.3f} ms")

    # fourth: queue.SimpleQueue
        p_squeue = queue.SimpleQueue()
        reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
        reader_p.start()
        _start = time.time()
        for ii in range(0, count):
            p_squeue.put(ii)
        p_squeue.put('DONE')
        reader_p.join()
        print(f"Sending {count} numbers to queue.SimpleQueue() took {(time.time() - _start)*1e3:.3f} ms")

Could you test mp.queues.SimpleQueue? Is it faster than mp.Queue? I'm trying to optimize the performance of a multiprocessing program I wrote, and I can only use what is inside mp... - pepoluan
Sorry, I meant multiprocessing.SimpleQueue - pepoluan

0
When executing subprocesses with concurrent.futures.ProcessPoolExecutor in Python, you cannot pass a multiprocessing.Queue as an argument. If you do, you get an error like this:
RuntimeError: Queue objects should only be shared between processes through inheritance

In this case, a workaround is to create a queue with multiprocessing.Manager and pass it to the process as an argument (a minimal sketch follows below). However, I found this type of queue to be much slower than the standard multiprocessing.Queue. I did not find any benchmarks for this type of queue, so I ran some myself. I modified Mike Pennington's test code to benchmark this Manager-backed queue.
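
For reference, a hedged sketch (not part of the original answer; the worker function is illustrative) of that workaround: a Manager-backed queue is a picklable proxy, so it can be passed to ProcessPoolExecutor workers where a plain multiprocessing.Queue would raise the RuntimeError quoted above.

# Hedged sketch: passing a manager.Queue() proxy into a ProcessPoolExecutor worker.
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

def worker(q):
    q.put('hello from worker')          # the proxy forwards this to the manager process

if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue()             # proxy object; safe to pass as an argument
        with ProcessPoolExecutor(max_workers=2) as pool:
            pool.submit(worker, q).result()
        print(q.get())                  # -> 'hello from worker'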
Here are the test results. First, I reran the standard queue test as a reference:
Sending 10000 numbers to Queue() took 0.12702512741088867 seconds
Sending 100000 numbers to Queue() took 0.9972114562988281 seconds
Sending 1000000 numbers to Queue() took 9.9016695022583 seconds

Sending 10000 numbers to manager.Queue() took 1.0181043148040771 seconds
Sending 100000 numbers to manager.Queue() took 10.438829898834229 seconds
Sending 1000000 numbers to manager.Queue() took 102.3624701499939 seconds

The results show that a queue created by multiprocessing.Manager is roughly 10x slower than the standard multiprocessing.Queue. That is a very large difference. Do not use this type of queue if you care about performance.
Source code:
"""
manager_multi_queue.py
"""

from multiprocessing import Process, Queue, Manager
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    manager = Manager()
    pqueue = manager.Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to manager.Queue() took {1} seconds".format(count, (time.time() - _start)))

New update:

In my application, multiple processes write data to the queue simultaneously while a single process consumes the results. It turns out that in this scenario the performance differences between these queues are very large. When multiple processes write to a standard multiprocessing.Queue at the same time, it easily gets overwhelmed and read performance drops dramatically. There are faster alternatives in this case.

Here I compare the read performance of three types of queues while 5 processes write to them continuously:

  1. multiprocessing.Queue
  2. multiprocessing.Manager.Queue
  3. A custom queue implemented with multiprocessing.Pipe plus locks so it can be used safely by multiple processes.

Results chart (image linked in the original answer).

The results show very large performance differences between the three queues. The fastest is the Pipe-based queue, followed by the Manager-created queue, and the slowest is the standard multiprocessing.Queue. If you care about a queue's read performance while data is being written to it, a Pipe-based or Manager-backed queue is the better choice.

Here is the source code for the new test that generates the chart:

from __future__ import annotations

"""
queue_comparison_plots.py
"""

import asyncio
import random
from dataclasses import dataclass
from itertools import groupby
from multiprocessing import Process, Queue, Manager
import time
from matplotlib import pyplot as plt
import multiprocessing as mp

class PipeQueue():
    pipe_in: mp.connection.Connection
    pipe_out: mp.connection.Connection

    def __init__(self):
        self.pipe_out, self.pipe_in = mp.Pipe(duplex=False)
        self.write_lock = mp.Lock()
        self.read_lock = mp.Lock()

    def get(self):
        with self.read_lock:
            return self.pipe_out.recv()

    def put(self, val):
        with self.write_lock:
            self.pipe_in.send(val)

@dataclass
class Result():
    queue_type: str
    data_size_bytes: int
    num_writer_processes: int
    num_reader_processes: int
    msg_read_rate: float

class PerfTracker():
    def __init__(self):
        self.running = mp.Event()
        self.count = mp.Value("i")
        self.start_time: float | None = None
        self.end_time: float | None = None

    @property
    def rate(self) -> float:
        return (self.count.value)/(self.end_time-self.start_time)

    def update(self):
        if self.running.is_set():
            with self.count.get_lock():
                self.count.value += 1

    def start(self):
        with self.count.get_lock():
            self.count.value = 0
        self.running.set()
        self.start_time = time.time()

    def end(self):
        self.running.clear()
        self.end_time = time.time()


def reader_proc(queue, perf_tracker, num_threads = 1):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(reader_proc_async(queue, perf_tracker, num_threads))


async def reader_proc_async(queue, perf_tracker, num_threads = 1):
    async def thread(queue, perf_tracker):
        while True:
            msg = queue.get()
            perf_tracker.update()

    futures = []
    for i in range(num_threads):
        futures.append(thread(queue, perf_tracker))

    await asyncio.gather(*futures)


def writer_proc(queue, data_size_bytes: int):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(writer_proc_async(queue, data_size_bytes))


async def writer_proc_async(queue, data_size_bytes: int):
    val = random.randbytes(data_size_bytes)

    while True:
        queue.put(val)


async def main():
    num_reader_procs = 1
    num_reader_threads = 1
    num_writer_procs = 5
    test_time = 5

    results = []
    for queue_type in ["Pipe + locks", "Queue using Manager", "Queue"]:
        for data_size_bytes_order_of_magnitude in range(8):
            data_size_bytes = 10 ** data_size_bytes_order_of_magnitude
            perf_tracker = PerfTracker()

            if queue_type == "Queue using Manager":
                manager = Manager()
                pqueue = manager.Queue()
            elif queue_type == "Pipe + locks":
                pqueue = PipeQueue()
            elif queue_type == "Queue":
                pqueue = Queue()
            else:
                raise NotImplementedError()

            reader_ps = []
            for i in range(num_reader_procs):
                reader_p = Process(target=reader_proc, args=(pqueue, perf_tracker, num_reader_threads))
                reader_ps.append(reader_p)


            writer_ps = []
            for i in range(num_writer_procs):
                writer_p = Process(target=writer_proc, args=(pqueue, data_size_bytes))
                writer_ps.append(writer_p)

            for writer_p in writer_ps:
                writer_p.start()

            for reader_p in reader_ps:
                reader_p.start()

            await asyncio.sleep(1)
            print("start")
            perf_tracker.start()
            await asyncio.sleep(test_time)
            perf_tracker.end()
            print(f"Finished. {queue_type} | {data_size_bytes} |  {perf_tracker.rate} msg/sec")

            results.append(
                Result(
                    queue_type = queue_type,
                    data_size_bytes = data_size_bytes,
                    num_writer_processes = num_writer_procs,
                    num_reader_processes = num_reader_procs,
                    msg_read_rate = perf_tracker.rate,
                )
            )
            for writer_p in writer_ps:
                writer_p.kill()

            for reader_p in reader_ps:
                reader_p.kill()

    print(results)

    fig, ax = plt.subplots()

    count = 0
    for queue_type, result_iterator in groupby(results, key=lambda result: result.queue_type):
        grouped_results = list(result_iterator)
        x_coords = [x.data_size_bytes for x in grouped_results]
        y_coords = [x.msg_read_rate for x in grouped_results]
        ax.plot(x_coords, y_coords, label=f"{queue_type}")
        count += 1

    ax.set_title(f"Queue read performance comparison while writing continuously", fontsize=11)
    ax.legend(loc='upper right', fontsize=10)
    ax.set_yscale("log")
    ax.set_xscale("log")
    ax.set_xlabel("Message size (bytes)")
    ax.set_ylabel("Message throughput (messages/second)")
    plt.show()

if __name__=='__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
