在Python线程中消费RabbitMQ队列

12

这是一篇较长的文章。

我有一些用户名和密码列表。对于每个用户名和密码,我想要登录到账户并进行一些操作。为了更快地完成任务,我想使用多台机器。我的设想是有一个主机器,它的工作就是定期检查rabbitmq队列是否为空。如果队列为空,则从文件中读取用户名和密码列表并将其发送到rabbitmq队列。然后,有一堆订阅该队列的机器,它们的工作是接收用户/密码,对其执行操作,确认并继续下一个,直到队列为空,然后主机器再次填充它。至此,我认为我已经掌握了一切。

现在出现了问题。我已经检查过与每个用户名/密码相关的操作不太复杂,因此我可以让每个机器同时处理三个用户名/密码,使用Python的“线程”实现。事实上,我已经为单个机器实现了这一点,其中我将用户/密码加载到Python Queue()中,然后有三个线程消耗该Queue()。现在我想做类似的事情,但是每个机器的每个线程都应从rabbitmq队列中消耗。这就是我卡住的地方。为了进行测试,我开始使用rabbitmq的教程。

send.py:

import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()

worker.py

import time, pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()

对于上述内容,您可以运行两个 worker.py,它们将订阅 rabbitmq 队列并按预期消费。

没有 rabbitmq 的我的线程示例如下:

runit.py

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            do_stuff(user=login[0], pass=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()

这也像预期的那样工作:您填充了队列并有3个线程订阅它。现在我想做的是像runit.py那样,但不是使用python Queue(),而是使用类似worker.py的东西,其中队列实际上是一个rabbitmq队列。

这是我尝试过的东西,但没能成功(我不明白为什么)

rabbitmq_runit.py

import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()

我期望这个程序将启动三个线程,每个线程都会被.start_consuming()阻塞,该方法只是在那里等待RabbitMQ队列发送信息。然而,实际上这个程序会启动,进行一些打印操作,然后退出。退出的模式也很奇怪:

launch thread
launch thread
start consuming
launch thread
start consuming

特别注意,有一个“开始消耗”的步骤缺失了。

发生了什么?

编辑:我在这里找到了一个类似问题的答案: Consuming a rabbitmq message queue with multiple threads (Python Kombu) 答案是“使用celery”,不管那是什么意思。我不同意,我不应该需要任何像celery一样复杂的东西。特别是,我不尝试设置RPC,并且我不需要阅读do_stuff例程的回复。

编辑2:我期望的打印模式应该如下。我执行:

python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.

打印的图案将会是:

launch thread
start consuming
 [x] received 'first message......'
launch thread
start consuming
 [x] received 'second message.'
launch thread
start consuming
 [x] received 'third message.'
 [x] received 'fourth message.'
1个回答

18
问题在于您将线程设置为守护线程(daemonic):
td = Threaded_worker()
td.setDaemon(True)  # Shouldn't do that.
td.start()

当主线程退出时,守护进程线程将被终止:(参考链接)

线程可以被标记为“守护线程”。这个标志的意义是,当只剩下守护线程时,整个Python程序将退出。初始值从创建线程继承。该标志可以通过守护进程属性进行设置。

去掉 setDaemon(True),你应该看到它表现出你期望的方式。

此外,pika FAQ有一个关于如何在线程中使用它的注意事项:

Pika在代码中没有任何线程概念。如果你想在Pika中使用线程,请确保每个线程都有一个Pika连接,并在该线程中创建。在不同的线程之间共享一个Pika连接是不安全的。

这表明您应该将所有在__init__()中做的事情移到run()中,以便在实际从队列中消耗时在同一线程中创建连接。


1
就是这样!太棒了,我没想到会有人会读这么长的帖子,更不用说回答它了。非常感谢你! - user1950164
1
另外为了完整性:https://www.rabbitmq.com/tutorials/amqp-concepts.html,章节“Channels”和“Connections”:最好每个线程有一个通道并共享连接,但是pika不支持。 - marcv81

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接