Python 3并发.futures和每个线程的初始化

4
在Python 3中,是否可以在concurrent.futures.ThreadPoolExecutor上下文中使用Thread的子类,以便它们可以在处理(可能是许多)工作项之前进行单独初始化?
我想使用方便的concurrent.futures API来同步文件和S3对象的代码(每个工作项都是一个要同步的文件,如果相应的S3对象不存在或不同步)。我希望每个工作线程都能先进行一些初始化,例如设置一个boto3.session.Session。然后,该线程池就准备好处理潜在的成千上万的工作项(要同步的文件)。
顺便说一句,如果某个线程因某种原因死亡,是否可以合理地期望自动创建一个新线程并将其添加回线程池中?
(免责声明:我对Java的多线程框架比Python更熟悉。)
1个回答

4

所以,似乎解决我的问题的简单方法是使用threading.local来存储每个线程的“会话”(在下面的模拟中,仅为随机整数)。也许不是最干净的方法,但现在可以用它。这是一个模拟(Python 3.5.1):

import time
import threading
import concurrent.futures
import random
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')

x = [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]

mydata = threading.local()

def do_work(secs):
    if 'session' in mydata.__dict__:
        logging.debug('re-using session "{}"'.format(mydata.session))
    else:
        mydata.session = random.randint(0,1000)
        logging.debug('created new session: "{}"'.format(mydata.session))
    time.sleep(secs)
    logging.debug('slept for {} seconds'.format(secs))
    return secs

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    y = executor.map(do_work, x)

print(list(y))

产生以下输出,显示 “sessions” 确实是每个线程本地的并且被重复使用:
(Thread-1) 29 - created new session: "855"
(Thread-2) 29 - created new session: "58"
(Thread-3) 30 - created new session: "210"
(Thread-1) 129 - slept for 0.1 seconds
(Thread-1) 130 - re-using session "855"
(Thread-2) 130 - slept for 0.1 seconds
(Thread-2) 130 - re-using session "58"
(Thread-3) 230 - slept for 0.2 seconds
(Thread-3) 230 - re-using session "210"
(Thread-3) 331 - slept for 0.1 seconds
(Thread-3) 331 - re-using session "210"
(Thread-3) 331 - slept for 0.0 seconds
(Thread-1) 530 - slept for 0.4 seconds
(Thread-2) 1131 - slept for 1.0 seconds
[0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]

关于日志记录的小提示:为了在IPython笔记本中使用它,需要稍微修改日志设置(因为IPython已经设置了根记录器)。更健壮的日志设置应该是:

IN_IPYNB = 'get_ipython' in vars()

if IN_IPYNB:
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    for h in logger.handlers:
        h.setFormatter(logging.Formatter(
                '(%(threadName)-0s) %(relativeCreated)d - %(message)s'))
else:
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')

2
这不是真的,ThreadPoolExecutor会重用线程(通常在任务数大于线程数时)。在这种情况下,使用相同线程的上下文可能会导致意外行为。 - Lewis Bushman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接