据我理解,您希望有一个单一的工作线程可以处理提交的任务,不按照它们被提交的顺序,而是按照某种优先顺序。这似乎是线程安全的
queue.PriorityQueue
的工作。
from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue
@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)
def thread_worker(q: PriorityQueue[PrioritizedItem]):
while True:
do_it(q.get().item)
q.task_done()
q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
for i in range(20):
q.put(PrioritizedItem(priority=i * 0.010, item=i))
wait_for_something_else()
这段代码假设您想永远运行。如果不是这样,您可以在thread_worker
中的q.get
中添加超时,并在抛出queue.Empty
异常时返回,因为超时已过期。这样,您将能够在处理完所有作业并且超时已过期后加入队列/线程。
如果您想等待到将来的某个特定时间再运行任务,则会变得更加复杂。以下方法通过在线程工作器线程中休眠,直到指定的时间到达,来扩展上述方法,但请注意time.sleep
只有您的操作系统允许它精确到哪里。
from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue
@dataclass(order=True)
class TimedItem:
when: datetime
item: Any=field(compare=False)
def thread_worker(q: PriorityQueue[TimedItem]):
while True:
when, item = astuple(q.get())
sleep_time = (when - datetime.now()).total_seconds()
if sleep_time > 0:
sleep(sleep_time)
do_it(item)
q.task_done()
q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
now = datetime.now()
for i in range(20):
q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
wait_for_something_else()
为了解决这个问题,我们只需使用一个额外的线程并在该线程中休眠,这样在工作线程休眠时可能会有新的优先级更高的任务进来。在这种情况下,工作线程将在完成当前任务后处理新的高优先级任务。上述代码假定这种情况不会发生,根据问题描述,这似乎是合理的。如果可能会发生这种情况,您可以修改休眠代码以重复轮询队列前面的任务是否到期。这种轮询方法的缺点是它会更加占用CPU。
另外,如果您能够保证任务提交给工作者之后相对顺序不会改变,那么您可以用普通的
queue.Queue
替换优先级队列,以简化代码。
这些“do_it”任务可以通过从队列中删除来取消。
上述代码使用以下模拟定义进行测试:
def do_it(x):
print(x)
def wait_for_something_else():
sleep(5)
一种不使用额外线程的替代方法是使用asyncio,正如smcjones所指出的那样。以下是一种使用asyncio的方法,通过使用
loop.call_later
在将来特定时间调用
do_it
:
import asyncio
def do_it(x):
print(x)
async def wait_for_something_else():
await asyncio.sleep(5)
async def main():
loop = asyncio.get_event_loop()
while True:
for i in range(20):
loop.call_later(i * 0.010, do_it, i)
await wait_for_something_else()
asyncio.run(main())
这些
do_it
任务可以使用
loop.call_later
返回的句柄进行取消。
然而,采用这种方法需要将程序全部切换为使用asyncio,或在单独的线程中运行asyncio事件循环。
max_workers
值设置为1,您将拥有一个线程处理所有异步do_it调用。https://docs.python.org/3/library/concurrent.futures.html - Gui LeFlea