Identifying the current thread in concurrent.futures.ThreadPoolExecutor

4
The following code has 5 workers... each worker runs its own worker_task()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}

    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
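        try:
            data = future.result()
        except Exception as exc:
            # a try needs a handler; report which URL failed
            print(f"{url!r} generated an exception: {exc}")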

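But inside each worker_task()... I cannot tell which of the 5 workers (Worker_ID) is currently running it.

If I want to print "worker 3 has finished" inside worker_task()... I can't, because executor.submit does not allow it.

Is there a solution?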


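Here is a StackOverflow link about identifying the thread that runs a function when using Python's concurrent.futures thread pool; I hope it helps. - njari
Thanks, but this does not assign a worker to my URLS list... I would need a done/pending variable for each of the URLS... and that does not even fit into the executor.submit() statement... The link shows how to track thread IDs... but that is useless without tying the task URL to the worker. - Rhys
Any feedback? - Artiom Kozyrev
1 Answer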

6
You can use the threading.current_thread() function to get the name of the worker thread. Here is an example:

from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint

# imagine these are urls
URLS = [i for i in range(100)]


def do_some_work(url, a, b):
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    # .name replaces getName(), which is deprecated since Python 3.10
    r = f"{current_thread().name}||: {url}_{rand_num}\n"
    return r


def show_fut_results(fut: Future):
    """Callback for future shows results or shows error"""
    if not fut.exception():
        print(fut.result())
    else:
        print(f"{current_thread().name}|  Error: {fut.exception()}\n")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        for i in URLS:
            _fut = pool.submit(do_some_work, i, 1, 10)
            _fut.add_done_callback(show_fut_results)
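
To get something closer to the "worker 3 has finished" message from the question, the thread name can be turned into a small worker number. The sketch below is only an illustration under assumptions: it relies on the thread_name_prefix argument of ThreadPoolExecutor, and on the fact that current CPython versions name pool threads "<prefix>_<n>". That suffix format is an implementation detail, not a documented guarantee. The names worker_task, run and the "worker" prefix are made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread

# placeholder inputs standing in for real URLs
URLS = [f"URL-{i}" for i in range(20)]


def worker_task(url):
    """Return the number of the pool thread that handled this url."""
    name = current_thread().name  # e.g. "worker_3" on current CPython
    # Assumption: the name ends in "_<n>"; this is not a documented format.
    worker_id = int(name.rsplit("_", 1)[1])
    return worker_id, url


def run(urls, max_workers=5):
    """Submit every url and collect (worker_id, url) pairs as they finish."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix="worker") as pool:
        futures = {pool.submit(worker_task, u): u for u in urls}
        for fut in as_completed(futures):
            results.append(fut.result())
    return results


if __name__ == "__main__":
    for worker_id, url in run(URLS):
        print(f"worker {worker_id} finished {url}")
```

Because the task returns the worker number together with the URL, the caller can tie each URL to the worker that processed it without passing anything extra to submit().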

If you want finer control over the threads, use the threading module:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging

# imagine these are urls
URLS = [f"URL-{i}" for i in range(100)]

# number of worker threads
WORKER_NUM = 10


def do_some_work(url: str, a: int, b: int) -> str:
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    r = f"{url} = {rand_num}"
    return r


def thread_worker_func(q: Queue, a: int, b: int) -> None:
    """Target function for Worker threads"""
    logging.info("Started working")
    while True:
        try:
            url = q.get()

            # if poison pill - stop worker thread
            if url is None:
                break

            r = do_some_work(url, a, b)
            logging.info(f"Result: {r}")
        except ValueError as ex:
            logging.error(ex)
        except Exception as ex:
            logging.error(f"Unexpected error: {ex}")

    logging.info("Finished working")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    in_q = Queue(50)
    workers = [
        Thread(target=thread_worker_func, args=(in_q, 1, 10, ), name=f"MyWorkerThread-{i+1}")
        for i in range(WORKER_NUM)
    ]
    for w in workers:
        w.start()

    # start distributing tasks
    for _url in URLS:
        in_q.put(_url)

    # send poison pills to worker-threads
    for w in workers:
        in_q.put(None)

    # wait for the worker threads to finish
    logging.info("Main Thread waiting for Worker Threads")
    for w in workers:
        w.join()

    logging.info("Workers joined")
    sleep(10)
    logging.info("App finished")

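I want to pass the thread name thread1 into do_some_work()... I am trying to understand what that name is and how to refer to it. - Rhys
However, I now realize... I could create a list of [1,10] free threads. Then, in the submit() call, randomly pick a free thread and assign it. - Rhys
Your use of random numbers... may be the key to the answer. It would be better to choose among "free numbers", e.g. "thread 1 [busy], thread 2 [busy], thread 3 [selected]"... rather than using the "if rand_num == 5:" statement. Inside do_some_work(), use Thread_Assignment to print "[thread 1 loading textures]". - Rhys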
1
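@Rhys If you want to control thread names, you need to use threading.Thread instead of concurrent.futures.ThreadPoolExecutor. If you look at the source code of ThreadPoolExecutor, you will quickly see that it is a wrapper around threading.Thread: https://github.com/python/cpython/blob/3.9/Lib/concurrent/futures/thread.py If you really need more control, such as control over thread names, go with the threading module. I can provide some examples if you like. - Artiom Kozyrev
Thanks Artiom... if you provide a threading.Thread example I will mark this as correct... otherwise I have working examples... I have already worked with threading.Thread... - Rhys
@Rhys Added an example based on threading.Thread that uses the producer-consumers pattern with the "poison pill" technique. - Artiom Kozyrev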
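
The idea from the comments of giving each worker its own number can also be sketched without leaving ThreadPoolExecutor. This is a hedged illustration, not something from the answer itself: it assumes Python 3.7+, where ThreadPoolExecutor accepts initializer and initargs, and it stores the number in threading.local so each task can read it. The names init_worker, worker_task and run are hypothetical.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor

# per-thread storage: each worker thread sees its own worker_id attribute
_local = threading.local()


def init_worker(ids, lock):
    """Runs once in every new pool thread and hands it the next free number."""
    with lock:
        _local.worker_id = next(ids)


def worker_task(url):
    """Report which numbered worker handled the url."""
    return f"worker {_local.worker_id} finished {url}"


def run(urls, max_workers=5):
    """Process urls in a pool whose threads are numbered 1..max_workers."""
    ids = itertools.count(1)   # fresh counter for this pool
    lock = threading.Lock()    # guards the counter across threads
    with ThreadPoolExecutor(max_workers=max_workers,
                            initializer=init_worker,
                            initargs=(ids, lock)) as pool:
        # map() returns results in the same order as the input urls
        return list(pool.map(worker_task, urls))


if __name__ == "__main__":
    for line in run([f"URL-{i}" for i in range(10)]):
        print(line)
```

The pool still decides which thread picks up which task; the number only identifies the thread after the fact, which is enough for logging messages of the "worker 3 finished ..." kind.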
