如何避免在未来不同时间启动(非常短暂)的操作时同时启动数百个线程?

5
我使用这种方法在不同的时间将少于一千个do_it调用启动了几十次:
import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds

每个do_it调用的运行时间非常快(远小于0.1毫秒)且不阻塞。我想避免为这么简单的任务创建数百个新线程
如何只使用一个附加线程来处理所有do_it调用?
有没有一种使用Python的简单方法,不需要第三方库,只使用标准库?

所以你想确保不要用太多[并行]线程来超载你的系统?这可能与你有关:https://dev59.com/PGEi5IYBdhLWcg3w2fYm - Adam Smooch
@AdamSmooch,不,我不想使用多进程。相反,我想使用一个专用于所有这些“do_it”调用的单个线程。这是可能的,因为每个调用都是非阻塞的,几乎是瞬间完成的。 - Basj
2
所以你要寻找的模式是单个“工作线程”,你的代码将发布“作业”到“工作队列”中,而不是生成不同的线程。 - Adam Smooch
@AdamSmooch 是的,在这种情况下,你怎么能以简单的方式做到这一点呢?每个“job”都应该是可取消的,并且应该在特定时间发生,而不仅仅是一个接一个地发生。 - Basj
在Python中,我发现concurrent.futures库非常简单且非常有用。将max_workers值设置为1,您将拥有一个线程处理所有异步do_it调用。https://docs.python.org/3/library/concurrent.futures.html - Gui LeFlea
@Basj,你的问题中是否有任何当前答案未涉及的内容?如果有,请澄清一下。 - Will Da Silva
5个回答

6
据我理解,您希望有一个单一的工作线程可以处理提交的任务,不按照它们被提交的顺序,而是按照某种优先顺序。这似乎是线程安全的 queue.PriorityQueue 的工作。
from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()

这段代码假设您想永远运行。如果不是这样,您可以在thread_worker中的q.get中添加超时,并在抛出queue.Empty异常时返回,因为超时已过期。这样,您将能够在处理完所有作业并且超时已过期后加入队列/线程。

如果您想等待到将来的某个特定时间再运行任务,则会变得更加复杂。以下方法通过在线程工作器线程中休眠,直到指定的时间到达,来扩展上述方法,但请注意time.sleep只有您的操作系统允许它精确到哪里

from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()

为了解决这个问题,我们只需使用一个额外的线程并在该线程中休眠,这样在工作线程休眠时可能会有新的优先级更高的任务进来。在这种情况下,工作线程将在完成当前任务后处理新的高优先级任务。上述代码假定这种情况不会发生,根据问题描述,这似乎是合理的。如果可能会发生这种情况,您可以修改休眠代码以重复轮询队列前面的任务是否到期。这种轮询方法的缺点是它会更加占用CPU。
另外,如果您能够保证任务提交给工作者之后相对顺序不会改变,那么您可以用普通的 queue.Queue 替换优先级队列,以简化代码。
这些“do_it”任务可以通过从队列中删除来取消。
上述代码使用以下模拟定义进行测试:
def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)

一种不使用额外线程的替代方法是使用asyncio,正如smcjones所指出的那样。以下是一种使用asyncio的方法,通过使用loop.call_later在将来特定时间调用do_it:
import asyncio


def do_it(x):
    print(x)


async def wait_for_something_else():
    await asyncio.sleep(5)


async def main():
    loop = asyncio.get_event_loop()
    while True:
        for i in range(20):
            loop.call_later(i * 0.010, do_it, i)
        await wait_for_something_else()

asyncio.run(main())

这些do_it任务可以使用loop.call_later返回的句柄进行取消。
然而,采用这种方法需要将程序全部切换为使用asyncio,或在单独的线程中运行asyncio事件循环。

2

听起来你想要某些东西是非阻塞和异步的,但也是单进程和单线程的(一个线程专门用于do_it)。

如果是这种情况,特别是涉及到任何网络操作,只要你的主线程没有在进行严重的I/O操作,那么最好使用asyncio

它被设计用来处理非阻塞操作,并允许你在不等待响应的情况下进行所有请求。

示例:

import asyncio


def main():
    while True:
        tasks = []
        for i in range(20):
            tasks.append(asyncio.create_task(do_it(i)))  
        await wait_for_something_else()
        for task in tasks:
            await task

asyncio.run(main())

考虑到阻塞 I/O 的时间(以秒为单位)——你可能比单独生成线程来执行其他操作更多地浪费时间来管理线程。


问题指定了他们想要在未来的不同时间进行“do_it的调用”,但这种方法会在它们周围时立即调用它们,可能比预期的更早运行它们。 - Will Da Silva

0

正如您所说,在您的代码中,每个包含20个do_it调用的系列都是在wait_for_something_else完成后开始的,我建议在每次while循环迭代中调用join方法:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds
    for t in timers[-20:]:
        t.join()

每次使用 threading.Timer 都会创建一个线程,但问题所述的目标是避免为 do_it 任务启动许多线程。具体而言,它问道:“我如何只使用一个额外的线程来处理所有 do_it 调用?” - Will Da Silva

0

我在 Python 的线程方面没有太多经验,所以请多包涵。concurrent.futures 库是 Python3 的一部分,使用起来非常简单。我为您提供了一个示例,让您看到它有多么直接。

Concurrent.futures 使用仅一个线程执行 do_it() 并发:

import concurrent.futures
import time

def do_it(iteration):
  time.sleep(0.1)
  print('do it counter', iteration)
 
def wait_for_something_else():
    time.sleep(1)
    print('waiting for something else')

def single_thread():
  with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    futures = (executor.submit(do_it, i) for i in range(20))
    for future in concurrent.futures.as_completed(futures):
        future.result()
 
def do_asap():
  wait_for_something_else()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(single_thread), executor.submit(do_asap)] 
    for future in concurrent.futures.as_completed(futures):
        future.result()

上面的代码使用max_workers=1线程在单个线程中执行do_it()。在第13行,使用选项max_workers=1do_it()限制为单个线程,以将工作限制为仅一个线程。
在第22行,两种方法都被提交到concurrent.futures线程池执行器中。从第21-24行的代码使得两种方法可以在线程池中运行,并且do_it在单个非阻塞线程上运行。 concurrent.futures文档描述了如何控制线程数。当未指定max_workers时,分配给两个进程的总线程数为max_workers = min(32, os.cpu_count() + 4)

当您调用concurrent.futures.as_completed时,这将阻塞主线程,因此在所有do_it任务完成之前,wait_for_something_else不会运行。 - Will Da Silva
我的理解是,问题的作者想要在所有do_it调用按照设定时间后在后台执行时运行wait_for_something_elseconcurrent.futures.as_complete会阻塞直到所有do_it调用都完成,因此当do_it调用时,wait_for_something_else将无法运行。 - Will Da Silva
1
那是关于差异的很好的解释,@WillDaSilva。现在我明白了concurrent.futures使得在do_it进程运行时可以控制分配给它多少线程。concurrent.futures不会使第一种方法和第二种方法在独立的并发线程中运行。 - Gui LeFlea
我根据@WillDaSilva的观察,修改了我的答案,使do_it()在单个非阻塞线程中并发运行,而不会影响wait_for_something_else()。谢谢! - Gui LeFlea

0

do_it按顺序运行并可取消

在一个线程中运行所有do_it,并休眠特定时间(可能不使用sleep)

使用变量“should_run_it”来检查do_it是否应该运行或取消

就是这样吗?

import threading
import time

def do_it(i):
    print(f"[{i}] {time.time()}")

should_run_it = {i:True for i in range(20)}

def guard_do_it(i):
    if should_run_it[i]:
        do_it(i)

def run_do_it():
    for i in range(20):
        guard_do_it(i)
        time.sleep(0.010)

if __name__ == "__main__":
    t = threading.Timer(0.010, run_do_it)
    start = time.time()
    print(start)
    t.start()
    #should_run_it[5] = should_run_it[10] = should_run_it[15] = False # test
    t.join()
    end = time.time()
    print(end)
    print(end - start)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接