问题是,处理可能需要10到20分钟的时间,在这段时间内我们没有响应消息,导致服务器将我们断开连接。
以下是我们消费者的伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
第一个任务完成后,BlockingConnection内部发生异常,抱怨套接字已重置。此外,RabbitMQ日志显示消费者由于未能及时响应而断开连接(它为什么要重置连接而不是发送FIN很奇怪,但我们不需要担心这个问题)。
我们进行了大量搜索,因为我们认为这是RabbitMQ的正常用例(有很多长时间运行的任务应该分配给许多消费者),但似乎没有其他人真正遇到这个问题。最后,我们偶然发现了一个线程,在那里建议使用心跳和在单独的线程中生成long_running_task()
。
所以代码变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
这种方法似乎可行,但很混乱。我们确定ch
对象是线程安全的吗?此外,想象一下long_running_task()
正在使用该连接参数将任务添加到新队列中(即完成了此长过程的第一部分,让我们将任务发送到第二部分)。因此,线程正在使用connection
对象。那么这个线程安全吗?
更重要的是,有没有更好的方法来处理这个问题?我感觉这很混乱,可能不是线程安全的,所以也许我们做错了。谢谢!