Python性能-最佳并行方法

24

我正在编写一个Python脚本,需要在不到5秒的时间内并行发送1500多个数据包。

简而言之,我所需要的是:

def send_pkts(ip):
    #craft packet
    while True:
        #send packet
        time.sleep(randint(0,3))

for x in list[:1500]:
    send_pkts(x)
    time.sleep(randint(1,5))

我已经尝试了简单的单线程,多线程,多进程和多进程+多线程形式,并且遇到了以下问题:

  1. 简单的单线程: "for 延迟"似乎影响了"5 秒"的依赖性。
  2. 多线程: 由于Python GIL限制,我认为我无法实现所需的功能。
  3. 多进程: 这似乎是最好的方法。然而,由于进程数量过多,我运行脚本的 VM 会冻结(当然,有1500个进程在运行)。因此变得不切实际。
  4. 多进程+多线程: 在这种方法中,我创建了较少的进程,每个进程都调用一些线程(假设:10个进程每个调用150个线程)。很明显,VM冻结的速度不如第3种方法那么快,但我能够达到的最大“并发包发送”是约800. GIL限制?VM限制? 在这次尝试中,我还尝试使用了进程池,但结果相似。

有更好的方法可以完成这项任务吗?

[1] 编辑1:

 def send_pkt(x):
     #craft pkt
     while True:
         #send pkt
         gevent.sleep(0)

 gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])

[2] 编辑 2 (gevent monkey-patching):

from gevent import monkey; monkey.patch_all()

jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]

然而,我遇到了以下错误:"ValueError: filedescriptor out of range in select()"。因此,我检查了我的系统ulimit(软限制和硬限制都是最大值:65536)。

接着,我发现这与Linux上的select()限制有关(最多1024个文件描述符)。请参见:http://man7.org/linux/man-pages/man2/select.2.html(BUGS部分)-为了克服这一问题,我应该使用poll()http://man7.org/linux/man-pages/man2/poll.2.html)代替。但是使用poll()时,我返回到相同的限制:因为轮询是一种“阻塞方法”。

敬礼,


1
你尝试过使用select模块来进行多路复用吗?这样你甚至不需要使用线程。 - sethmlarson
关于“然而我能达到的最高并发数据包发送量大约是800”,那之后发生了什么? - l'L'l
1
没有确凿的证据,我不认为选项2(多线程)存在任何实际问题。它甚至可能是最快的。但这完全取决于大部分时间实际上花在哪里。 - zvone
@l'L'l 所以它没有起作用是因为我需要“同时”发送1500个数据包。 - pascoal
为什么不尝试使用。您需要在5秒内发送1500个数据包,这个时间是否考虑了数据包的制作?期望的数据包大小是多少?您的网络能够处理这样的负载吗? - Adonis
显示剩余9条评论
5个回答

2
在Python中使用并行技术时,一个好的方法是使用ThreadPoolExecutor或ProcessPoolExecutor,可以从https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures这里获取更多信息,我个人的经验是它们都能很好地工作。
以下是一个可供您调整使用的threadedPoolExecutor示例。
import concurrent.futures
import urllib.request
import time

IPs= ['168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204']

def send_pkt(x):
  status = 'Failed'
  while True:
    #send pkt
    time.sleep(10)
    status = 'Successful'
    break
  return status

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
    for future in concurrent.futures.as_completed(future_to_ip):
        ip = future_to_ip[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (ip, exc))
        else:
            print('%r send %s' % (url, data))

1

第三个选项的结果是:"由于进程数量过多,我运行脚本的虚拟机会冻结(当然,有1500个进程在运行)",这可能需要进一步调查。到目前为止,根据收集到的信息,我认为很难确定这是多进程方法的缺陷还是虚拟机的限制。

一个相当简单和直接的方法是运行一个扩展实验:不是让所有发送都来自单个进程或全部来自同一个进程,而是尝试中间值。测试将工作量分配给两个进程、4个进程、8个进程等所需的时间。

在执行此操作时,还可以使用Windows上的或Linux上的等工具记录不同并行性选择是否导致不同类型的瓶颈,例如抢占CPU缓存、耗尽虚拟机内存或其他原因。最简单的方法是尝试一下。

根据以往处理这类问题的经验和一般规则,我预计在多处理进程数量小于或等于可用CPU核心数(无论是在VM本身还是在hypervisor上)时会获得最佳性能。但是这仅适用于问题受限于CPU的情况;如果在数据包发送期间出现阻塞,通过与其他阻塞操作交错使用可以更好地利用CPU时间,因此可能需要更多的#cpu进程来提高性能。再次强调,只有进行一些分析和/或扩展实验后,我们才能知道具体情况。


1
如果瓶颈是基于http的(“发送数据包”),那么GIL实际上不应该是一个太大的问题。
如果Python中也有计算发生,那么GIL可能会成为障碍,如你所说,基于进程的并行性将更可取。
您不需要每个任务一个进程!这似乎是您思考中的疏忽。使用Python的Pool类,您可以轻松创建一组工作进程,它们将从队列接收任务。

import multiprocessing


def send_pkts(ip):
   ...


number_of_workers = 8

with multiprocessing.Pool(number_of_workers) as pool:
    pool.map(send_pkts, list[:1500])


你现在运行的是 number_of_workers + 1 个进程(包括工作进程和原始进程),并且 N 个工作进程正在同时运行 send_pkts 函数。

1
您是正确的,Python 是单线程的,但是您想要的任务(发送网络数据包)被认为是 I/O 绑定操作,因此是多线程的好选择。只要您在编写代码时考虑异步,主线程在传输数据包时并不会忙碌。
请参阅 Python 文档中有关异步 TCP 网络的内容 - https://docs.python.org/zh-cn/3/library/asyncio-protocol.html#tcp-echo-client

1
您实现所需性能的主要问题在于send_pkts()方法。它不仅发送数据包,还会构造数据包:
def send_pkts(ip):
#craft packet
while True:
    #send packet
    time.sleep(randint(0,3))

发送数据包几乎肯定是一个I/O绑定的任务,而制作数据包几乎肯定是一个CPU绑定的任务。这个方法需要分成两个任务:
  1. 制作数据包
  2. 发送数据包
我编写了一个基本的套接字服务器和客户端应用程序,用于制作并发送数据包到服务器。想法是有一个单独的进程来制作数据包并将它们放入队列中。有一个线程池与制作数据包的进程共享队列。这些线程从队列中取出数据包并将其发送到服务器。他们还将服务器的响应放入另一个共享队列,但这仅用于我的测试,与您正在尝试做的事情无关。当线程从队列中收到None毒丸)时退出。

server.py:

import argparse
import socketserver
import time


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="bind to host")
    parser.add_argument("--port", type=int, help="bind to port")
    parser.add_argument("--packet-size", type=int, help="size of packets")
    args = parser.parse_args()
    HOST, PORT = args.host, args.port

    class MyTCPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            time.sleep(1.5)
            data = self.request.recv(args.packet_size)
            self.request.sendall(data.upper())

    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()

client.py:

import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread


def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler("client.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger


class PacketMaker(mp.Process):
    def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
        mp.Process.__init__(self)
        self.result_queue = result_queue
        self.max_packets = max_packets
        self.packet_size = packet_size
        self.num_poison_pills = num_poison_pills
        self.num_packets_made = 0
        self.logger = logger

    def run(self):
        while True:
            if self.num_packets_made >= self.max_packets:
                for _ in range(self.num_poison_pills):
                    self.result_queue.put(None, timeout=1)
                self.logger.debug('PacketMaker exiting')
                return
            self.result_queue.put(os.urandom(self.packet_size), timeout=1)
            self.num_packets_made += 1


class PacketSender(Thread):
    def __init__(self, task_queue, result_queue, addr, packet_size, logger):
        Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.server_addr = addr
        self.packet_size = packet_size
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(addr)
        self.logger = logger

    def run(self):
        while True:
            packet = self.task_queue.get(timeout=1)
            if packet is None:
                self.logger.debug("PacketSender exiting")
                return
            try:
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            except socket.error:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect(self.server_addr)
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            self.result_queue.put(response)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-packets', type=int, help='number of packets to send')
    parser.add_argument('--packet-size', type=int, help='packet size in bytes')
    parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
    parser.add_argument('--host', type=str, help='name of host packets will be sent to')
    parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
    args = parser.parse_args()

    logger = get_logger()
    logger.info(f"starting script with args {args}")
    
    packets_to_send = mp.Queue(args.num_packets + args.num_threads)
    packets_received = q.Queue(args.num_packets)
    producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
    senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
               for _ in range(args.num_threads)]
    start_time = time.time()
    logger.info("starting workers")
    for worker in senders + producers:
        worker.start()
    for worker in senders:
        worker.join()
    logger.info("workers finished")
    end_time = time.time()
    print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")

run.sh:

#!/usr/bin/env bash

for i in "$@"
do
case $i in
    -s=*|--packet-size=*)
    packet_size="${i#*=}"
    shift 
    ;;
    -n=*|--num-packets=*)
    num_packets="${i#*=}"
    shift 
    ;;
    -t=*|--num-threads=*)
    num_threads="${i#*=}"
    shift 
    ;;
    -h=*|--host=*)
    host="${i#*=}"
    shift 
    ;;
    -p=*|--port=*)
    port="${i#*=}"
    shift 
    ;;
    *)
    ;;
esac
done

python3 server.py --host="${host}" \
                  --port="${port}" \
                  --packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" \
                  --num-packets="${num_packets}" \
                  --num-threads="${num_threads}" \
                  --host="${host}" \
                  --port="${port}"
kill "${server_pid}"

$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999

在 4.70330023765564 秒内接收到了 1500 个数据包

$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999

在 1.5025699138641357 秒内接收到了 1500 个数据包

更改 client.py 中的日志级别为 DEBUG 可以验证此结果。请注意,运行脚本所需的时间远远超过了 4.7 秒。使用 300 个线程时需要进行相当多的拆卸工作,但日志清楚地表明线程在 4.7 秒时已完成处理。

请谨慎对待所有性能结果。我不知道您正在运行此脚本的系统是什么。以下是我的相关系统统计信息: 2 Xeon X5550 @2.67GHz 24MB DDR3 @1333MHz Debian 10 Python 3.7.3


我将解决您尝试的问题:
  1. 简单单线程:由于randint(0, 3)延迟,几乎肯定需要至少1.5倍的num_packets秒
  2. 多线程:GIL很可能是瓶颈,但可能是因为craft packet部分而不是send packet
  3. 多进程:每个进程至少需要一个文件描述符,因此您可能超过了用户或系统限制,但如果您更改适当的设置,则可以实现
  4. 多进程+多线程:与2号相同,制作数据包可能是CPU绑定的原因导致失败

经验法则是:I/O绑定-使用线程,CPU绑定-使用进程


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接