The main thing keeping you from the performance you want is the send_pkts() method. It doesn't just send the packets, it also crafts them:
def send_pkts(ip):
    while True:
        time.sleep(randint(0,3))
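Sending a packet is almost certainly an I/O-bound task, while crafting a packet is almost certainly a CPU-bound task. This method needs to be split into two tasks:
- craft a packet
- send a packet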
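I wrote a basic socket server and a client app that crafts packets and sends them to the server. The idea is to have a separate process that crafts the packets and puts them on a queue. A pool of threads shares that queue with the packet-crafting process. The threads take packets off the queue and send them to the server. They also put the server's responses on another shared queue, but that is only for my own testing and isn't relevant to what you're trying to do. The threads exit when they get a None (poison pill) from the queue.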
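As a rough illustration of the poison-pill shutdown described above, here is a minimal sketch that uses threads only, so it stays short and self-contained. The names make_packets, consume_packets and run_demo are made up for this example, and the "send" step is just a stand-in that records the packet length; the real client.py below uses a separate process as the producer and real sockets.

```python
import os
import queue
import threading


def make_packets(out_q, num_packets, packet_size, num_consumers):
    """Producer: craft packets, then enqueue one None per consumer."""
    for _ in range(num_packets):
        out_q.put(os.urandom(packet_size))
    for _ in range(num_consumers):
        out_q.put(None)  # poison pill: tells exactly one consumer to exit


def consume_packets(in_q, sent):
    """Consumer: stand-in for sending; stops when it pulls a poison pill."""
    while True:
        packet = in_q.get()
        if packet is None:
            return
        sent.append(len(packet))  # list.append is thread-safe in CPython


def run_demo(num_packets=20, packet_size=16, num_consumers=4):
    """Wire one producer to several consumers and return what was 'sent'."""
    q = queue.Queue()
    sent = []
    consumers = [
        threading.Thread(target=consume_packets, args=(q, sent))
        for _ in range(num_consumers)
    ]
    for t in consumers:
        t.start()
    producer = threading.Thread(
        target=make_packets, args=(q, num_packets, packet_size, num_consumers)
    )
    producer.start()
    producer.join()
    for t in consumers:
        t.join()
    return sent


if __name__ == "__main__":
    print(len(run_demo()), "packets handled")
```

Because one pill is queued per consumer, every thread sees exactly one None and all of them exit once the real packets are drained.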
server.py:
import argparse
import socketserver
import time


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="bind to host")
    parser.add_argument("--port", type=int, help="bind to port")
    parser.add_argument("--packet-size", type=int, help="size of packets")
    args = parser.parse_args()

    HOST, PORT = args.host, args.port

    class MyTCPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            time.sleep(1.5)
            data = self.request.recv(args.packet_size)
            self.request.sendall(data.upper())

    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()
client.py:
import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread


def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler("client.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger


class PacketMaker(mp.Process):
    def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
        mp.Process.__init__(self)
        self.result_queue = result_queue
        self.max_packets = max_packets
        self.packet_size = packet_size
        self.num_poison_pills = num_poison_pills
        self.num_packets_made = 0
        self.logger = logger

    def run(self):
        while True:
            if self.num_packets_made >= self.max_packets:
                for _ in range(self.num_poison_pills):
                    self.result_queue.put(None, timeout=1)
                self.logger.debug('PacketMaker exiting')
                return

            self.result_queue.put(os.urandom(self.packet_size), timeout=1)
            self.num_packets_made += 1


class PacketSender(Thread):
    def __init__(self, task_queue, result_queue, addr, packet_size, logger):
        Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.server_addr = addr
        self.packet_size = packet_size
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(addr)
        self.logger = logger

    def run(self):
        while True:
            packet = self.task_queue.get(timeout=1)
            if packet is None:
                self.logger.debug("PacketSender exiting")
                return

            try:
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            except socket.error:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect(self.server_addr)
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)

            self.result_queue.put(response)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-packets', type=int, help='number of packets to send')
    parser.add_argument('--packet-size', type=int, help='packet size in bytes')
    parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
    parser.add_argument('--host', type=str, help='name of host packets will be sent to')
    parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
    args = parser.parse_args()

    logger = get_logger()
    logger.info(f"starting script with args {args}")

    packets_to_send = mp.Queue(args.num_packets + args.num_threads)
    packets_received = q.Queue(args.num_packets)
    producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
    senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
               for _ in range(args.num_threads)]

    start_time = time.time()
    logger.info("starting workers")
    for worker in senders + producers:
        worker.start()

    for worker in senders:
        worker.join()
    logger.info("workers finished")
    end_time = time.time()

    print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")
run.sh:
#!/usr/bin/env bash

for i in "$@"
do
case $i in
    -s=*|--packet-size=*)
    packet_size="${i#*=}"
    shift
    ;;
    -n=*|--num-packets=*)
    num_packets="${i#*=}"
    shift
    ;;
    -t=*|--num-threads=*)
    num_threads="${i#*=}"
    shift
    ;;
    -h=*|--host=*)
    host="${i#*=}"
    shift
    ;;
    -p=*|--port=*)
    port="${i#*=}"
    shift
    ;;
    *)
    ;;
esac
done

python3 server.py --host="${host}" \
                  --port="${port}" \
                  --packet-size="${packet_size}" &
server_pid=$!

python3 client.py --packet-size="${packet_size}" \
                  --num-packets="${num_packets}" \
                  --num-threads="${num_threads}" \
                  --host="${host}" \
                  --port="${port}"

kill "${server_pid}"
$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999
1500 packets received in 4.70330023765564 seconds

$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999
1500 packets received in 1.5025699138641357 seconds
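You can verify this result by changing the log level in client.py to DEBUG. Note that the script takes much longer than 4.7 seconds to finish. There is quite a bit of teardown to do when using 300 threads, but the log makes it clear that the threads are done processing at 4.7 seconds.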
Take all performance results with a grain of salt. I have no idea what system you're running this on. Here are my relevant system stats:
2 Xeon X5550 @2.67GHz
24MB DDR3 @1333MHz
Debian 10
Python 3.7.3
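I'll address what you tried:
1. Simple single thread: because of the randint(0, 3) delay, this will take about 1.5 x num_packets seconds on average, since the delay averages 1.5 seconds per packet.
2. Multithreading: the GIL is likely the bottleneck, but probably because of the craft packet part rather than the send packet part.
3. Multiprocessing: each process needs at least one file descriptor, so you are probably exceeding the user or system limit, but this could work if you change the appropriate settings.
4. Multiprocessing + multithreading: this fails for the same reason as #2; crafting the packets is probably CPU bound.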
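To check whether the file-descriptor limit mentioned for the multiprocessing attempt is the problem, something like the following sketch could help on a Unix-like system. The helper names fd_limits and fits_within_limit are invented for this example. The fds_per_worker and reserve defaults are guesses, not measured values: a real worker may hold more than one descriptor, and the reserve is a rough allowance for stdio, log files and the like.

```python
import resource  # Unix-only standard library module


def fd_limits():
    """Return the (soft, hard) limit on open file descriptors for this process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def fits_within_limit(num_workers, fds_per_worker=1, reserve=16):
    """Rough check: would num_workers workers fit under the soft limit?

    fds_per_worker and reserve are assumptions for illustration only.
    """
    soft, _ = fd_limits()
    if soft == resource.RLIM_INFINITY:
        return True
    return num_workers * fds_per_worker + reserve <= soft


if __name__ == "__main__":
    soft, hard = fd_limits()
    print(f"soft limit: {soft}, hard limit: {hard}")
    print("1500 workers fit:", fits_within_limit(1500))
```

In a shell, ulimit -n reports the same soft limit, and the soft limit can be raised up to the hard limit without extra privileges.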
The rule of thumb is: I/O bound - use threads, CPU bound - use processes.
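A small sketch of that rule of thumb using concurrent.futures from the standard library. The functions fake_send, fake_craft and timed_map are hypothetical stand-ins written for this example: time.sleep plays the role of waiting on the network, and a sum of squares plays the role of crafting a packet. Timings will vary by machine, so none are quoted here.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def fake_send(delay):
    """I/O-bound stand-in: the thread just waits, so the GIL is released."""
    time.sleep(delay)
    return delay


def fake_craft(n):
    """CPU-bound stand-in: pure Python arithmetic that holds the GIL."""
    return sum(i * i for i in range(n))


def timed_map(executor_cls, func, args, workers):
    """Run func over args in the given executor type; return (results, seconds)."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as pool:
        results = list(pool.map(func, args))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    # Threads overlap the waits of I/O-bound tasks.
    _, io_time = timed_map(ThreadPoolExecutor, fake_send, [0.2] * 8, 8)
    # Processes run CPU-bound tasks on separate cores.
    _, cpu_time = timed_map(ProcessPoolExecutor, fake_craft, [200_000] * 4, 4)
    print(f"I/O-bound in threads: {io_time:.2f}s")
    print(f"CPU-bound in processes: {cpu_time:.2f}s")
```

Swapping the executor classes in the two calls is an easy way to see the difference on your own system.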
... the select module for multiplexing? That way you wouldn't even need threads. - sethmlarson
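For reference, the multiplexing idea raised in that comment might look roughly like the sketch below. It uses the standard selectors module (a higher-level wrapper around select) and socket.socketpair() in place of real server connections, so it runs without a server. The function name multiplexed_receive is invented for this example, and it assumes small, non-empty payloads that arrive in a single recv.

```python
import selectors
import socket


def multiplexed_receive(payloads):
    """Read one reply per connection from a single thread.

    Each socketpair stands in for a client/server connection; the
    "server" side writes its reply up front, and one selector loop
    collects the replies as they become readable.
    """
    sel = selectors.DefaultSelector()
    remotes = []
    for index, payload in enumerate(payloads):
        local, remote = socket.socketpair()
        local.setblocking(False)
        sel.register(local, selectors.EVENT_READ, data=index)
        remote.sendall(payload)  # the reply is now waiting to be read
        remotes.append(remote)

    replies = {}
    while len(replies) < len(payloads):
        for key, _ in sel.select(timeout=1):
            replies[key.data] = key.fileobj.recv(4096)
            sel.unregister(key.fileobj)
            key.fileobj.close()

    for remote in remotes:
        remote.close()
    sel.close()
    return [replies[i] for i in range(len(payloads))]


if __name__ == "__main__":
    print(multiplexed_receive([b"one", b"two", b"three"]))
```

This only covers the I/O side; crafting the packets would still be CPU-bound work that benefits from a separate process.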