Python - 动态缩小线程池 / 停止一个线程

3
我正在编写一个小型的多线程http文件下载器,希望能够在代码遇到错误时缩小可用线程。
这些错误将特定于返回的http错误,其中Web服务器不允许更多连接。
例如,如果我设置了一个5个线程的池,每个线程都试图打开自己的连接并下载文件的一部分。服务器可能只允许2个连接,并且会返回503错误。我想检测到这一点并关闭一个线程,最终将池的大小限制为服务器允许的仅2个线程。
我可以让一个线程停止自己吗?
self.Thread_stop()足够吗?
我还需要join()吗?
这是我的工作类,它执行下载操作,从队列中获取要处理的数据,一旦下载完成,将结果倒入resultQ以便主线程保存到文件中。
在这里,我想检测http 503并从可用池中停止/删除线程 - 当然,还要将失败的块重新添加到队列中,以便剩余的线程将其处理。
class Downloader(threading.Thread):
    def __init__(self, queue, resultQ, file_name):
        threading.Thread.__init__(self)
        self.workQ = queue
        self.resultQ = resultQ
        self.file_name = file_name

    def run(self):
        while True:
            block_num, url, start, length = self.workQ.get()
            print 'Starting Queue #: %s' % block_num
            print start
            print length

            #Download the file
            self.download_file(url, start, length)

            #Tell queue that this task is done
            print 'Queue #: %s finished' % block_num
            self.workQ.task_done()


    def download_file(self, url, start, length):        

        request = urllib2.Request(url, None, headers)
        if length == 0:
            return None
        request.add_header('Range', 'bytes=%d-%d' % (start, start + length))

        while 1:
            try:
                data = urllib2.urlopen(request)
            except urllib2.URLError, u:
                print "Connection did not start with", u
            else:
                break

        chunk = ''
        block_size = 1024
        remaining_blocks = length

        while remaining_blocks > 0:

            if remaining_blocks >= block_size:
                fetch_size = block_size
            else:
                fetch_size = int(remaining_blocks)

            try:
                data_block = data.read(fetch_size)
                if len(data_block) == 0:
                    print "Connection: [TESTING]: 0 sized block" + \
                        " fetched."
                if len(data_block) != fetch_size:
                    print "Connection: len(data_block) != length" + \
                        ", but continuing anyway."
                    self.run()
                    return

            except socket.timeout, s:
                print "Connection timed out with", s
                self.run()
                return

            remaining_blocks -= fetch_size
            chunk += data_block

        resultQ.put([start, chunk])

以下是我初始化线程池的地方,稍后我将项目放入队列中。

# create a thread pool and give them a queue
for i in range(num_threads):
    t = Downloader(workQ, resultQ, file_name)
    t.setDaemon(True)
    t.start()
3个回答

2
我能使一个线程停止吗?
不要使用self._Thread__stop()。退出线程的run()方法就足够了(你可以检查标志或从队列中读取特殊值来知道何时退出)。
在这里,我想检测http 503并从可用池中停止/杀死/删除一个线程 - 当然还需要将失败的块重新添加到队列中,以便剩余的线程将其处理。
您可以通过分离职责来简化代码:
1. download_file() 不应该在无限循环中尝试重新连接。如果出现错误,则让调用download_file()的代码在必要时重新提交它。 2. 关于并发连接数的控制可以封装在Semaphore对象中。此情况下,线程数可能与并发连接数不同。
import concurrent.futures # on Python 2.x: pip install futures 
from threading import BoundedSemaphore

def download_file(args):
    nconcurrent.acquire(timeout=args['timeout']) # block if too many connections
    # ...
    nconcurrent.release() #NOTE: don't release it on exception,
                          #      allow the caller to handle it

# you can put it into a dictionary: server -> semaphore instead of the global
nconcurrent = BoundedSemaphore(5) # start with at most 5 concurrent connections
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(download_file, args), args)
                           for args in generate_initial_download_tasks())

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            try: 
                result = future.result()
            except Exception as e:
                print('%r generated an exception: %s' % (args, e))
                if getattr(e, 'code') != 503:
                   # don't decrease number of concurrent connections
                   nconcurrent.release() 
                # resubmit
                args['timeout'] *= 2                    
                future_to_args[executor.submit(download_file, args)] = args
            else: # successfully downloaded `args`
                print('f%r returned %r' % (args, result))

请查看 ThreadPoolExecutor() 示例

谢谢,我需要仔细阅读一下这个。 我最后得出了和你说的相同的结论,只需退出线程的run()函数,它就会停止尝试从队列中获取数据。我喜欢你提出的建议,谢谢! - MikeM

1
你应该使用线程池来控制线程的生命周期: 当线程存在时,您可以向处理线程池的主线程发送消息,然后更改线程池的大小,并将新请求或失败请求推迟到一个堆栈中,您将清空该堆栈。
tedelanay关于您给线程提供的守护进程状态是完全正确的。没有必要将它们设置为守护进程。
基本上,您可以简化代码,可以按照以下方式进行操作:
import threadpool

def process_tasks():
    pool = threadpool.ThreadPool(4)

    requests = threadpool.makeRequests(download_file, arguments)

    for req in requests:
        pool.putRequest(req) 

    #wait for them to finish (or you could go and do something else)
    pool.wait()

if __name__ == '__main__': 
    process_tasks()

其中arguments取决于您的策略。您可以将一个队列作为参数传递给线程,然后清空队列。或者您可以在process_tasks中处理队列,在池满时阻塞,并在线程完成但队列不为空时打开新线程。这完全取决于您的需求和下载器的上下文。

资源:


非常好的信息,谢谢!我没有看到你如何使用线程池来调整大小.. 我一定是忽略了一些明显的东西? - MikeM

0
一个Thread对象通过从run方法返回来终止线程,而不是调用stop方法。如果将线程设置为守护模式,则无需加入(join),否则主线程需要这样做。通常,线程使用resultq来报告自己正在退出,主线程使用该信息来进行加入(join)操作。这有助于进程的有序终止。如果Python仍在处理多个线程,系统退出时可能会出现奇怪的错误,最好避免这种情况。

但是正如你所看到的,只要从workQ中有项目可获取,线程就会持续运行。如果一个线程遇到503错误,我希望将可用线程数减少1个,让剩余的线程处理workQ中剩下的内容。 - MikeM

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接