Python-requests多线程处理问题

5

我正在开发一个HTTP客户端,它可以每秒生成数百个连接,并在每个连接上发送最多10个请求。我使用线程来实现并发。

下面是我的代码:

def generate_req(reqSession):
    requestCounter = 0
    while requestCounter < requestRate:
        try:
            response1 = reqSession.get('http://20.20.1.2/tempurl.html')
            if response1.status_code == 200:
                client_notify('r')
        except(exceptions.ConnectionError, exceptions.HTTPError, exceptions.Timeout) as Err:
            client_notify('F')
            break
        requestCounter += 1

def main():
    for q in range(connectionPerSec):
        s1 = requests.session()
        t1 = threading.Thread(target=generate_req, args=(s1,))
        t1.start()

问题:

  1. 当requestRate = 1时,它无法扩展到超过200个连接/秒。我在同一客户机上运行了其他可用的HTTP客户端,并针对服务器进行测试,测试运行良好,可以扩展。

  2. 当requestRate = 10时,每秒连接数下降到30。 原因:无法每秒创建指定数量的线程。

对于问题#2,客户端机器无法创建足够的请求会话和启动新的线程。一旦将requestRate设置为大于1,就会出现问题。 我怀疑这与requests使用的HTTP连接池有关。

请建议我在这里做错了什么。


1
为什么要重复使用相同的“session”对象?据我所知,您的“connectionPerSec”实际上意味着并行线程的数量,“requestRate”实际上是每个线程上的串行请求数量。您可能还想查看concurrent.futures - Denilson Sá Maia
会话对象仅在生成的线程中重复使用,因此它为所有串行请求使用相同的连接。 - user3040103
看起来 concurrent.futures 中的 ThreadPoolExecutor 做的就是我想做的事情? - user3040103
请注意,在CPython中,线程的并发性是有限制的:一次只能有一个线程执行Python字节码。为了简化内存管理,全局解释器锁定强制执行此限制。 - Roland Smith
1个回答

1
我无法让事情分崩离析,但以下代码具有一些新功能:
1)扩展日志记录,包括每个线程的特定信息。
2)所有线程在结尾处都已加入(join),以确保父进程不会将它们挂起。
3)多线程打印往往交错输出消息,这可能不太方便。此版本使用yield,以便将来版本可以接受消息并清晰打印。

源码

import exceptions, requests, threading, time

requestRate = 1
connectionPerSec = 2


def client_notify(msg):
    return time.time(), threading.current_thread().name, msg

def generate_req(reqSession):
    requestCounter = 0
    while requestCounter < requestRate:
        try:
            response1 = reqSession.get('http://127.0.0.1/')
            if response1.status_code == 200:
                print client_notify('r')
        except (exceptions.ConnectionError, exceptions.HTTPError, exceptions.Timeout):
            print client_notify('F')
            break
        requestCounter += 1

def main():
    for cnum in range(connectionPerSec):
        s1 = requests.session()
        th = threading.Thread(
            target=generate_req, args=(s1,),
            name='thread-{:03d}'.format(cnum),
        )
        th.start()

    for th in threading.enumerate():
        if th != threading.current_thread():
            th.join()


if __name__=='__main__':
    main()

输出

(1407275951.954147, 'thread-000', 'r')
(1407275951.95479, 'thread-001', 'r')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接