需要一个线程安全的异步消息队列。

9

我正在寻找一个Python类(最好是标准语言的一部分,而不是第三方库),来管理异步“广播式”消息。

我将有一个线程将消息放入队列中(“putMessageOnQueue”方法不能阻塞),然后多个其他线程将等待消息,预计调用一些阻塞的“waitForMessage”函数。当一个消息被放置在队列中时,我希望每个等待的线程都能得到自己的消息副本。

我已经查看了内置的Queue类,但我认为这并不适合,因为消费消息似乎涉及从队列中移除它们,所以只有1个客户端线程会看到每个消息。

这似乎应该是一个常见的用例,有人可以推荐一个解决方案吗?


我相信你可以建立自己的类来跟踪哪个线程收到了哪个消息,而不会遇到太多问题。 - Bakuriu
2个回答

8

我认为通常的做法是为每个线程使用一个单独的消息队列,并将消息推送到之前已经注册了接收此类消息的每个队列中。

像这样的代码应该可以工作,但它是未经测试的...

from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()

太棒了,运行得很顺利!遗憾的是没有现成的版本,但我猜原则并不那么复杂,只要有人清楚地解释一下(就像你做的那样)。 - codebox
@codebox 好吧,在 multiprocessing 模块中有更好的支持,但那是针对子进程而不是线程的。我猜这是因为进程间通信通常比线程间通信更复杂,因为线程自然共享同一堆。 - Aya
如果您需要从一个写入者进行广播,队列是否是最佳解决方案?也许更好的方法是使用一种只写一次/多次读取的结构,每个进程都可以在自己的空闲时间并发地读取它? - fralau

2

我认为这是一个更直观的例子(摘自Python Lib中的队列示例)

from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

1
那怎么满足问题的要求呢?他明确表示队列不起作用,因为每个线程都需要该项的副本。 - Wlerin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接