在pika/RabbitMQ中处理长时间运行的任务

77
我们正在尝试设置一个基本的有向队列系统,在这个系统中,生产者将生成多个任务,一个或多个消费者将一次获取一个任务,处理它,并确认消息。
问题是,处理可能需要10到20分钟的时间,在这段时间内我们没有响应消息,导致服务器将我们断开连接。
以下是我们消费者的伪代码:
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

第一个任务完成后,BlockingConnection内部发生异常,抱怨套接字已重置。此外,RabbitMQ日志显示消费者由于未能及时响应而断开连接(它为什么要重置连接而不是发送FIN很奇怪,但我们不需要担心这个问题)。

我们进行了大量搜索,因为我们认为这是RabbitMQ的正常用例(有很多长时间运行的任务应该分配给许多消费者),但似乎没有其他人真正遇到这个问题。最后,我们偶然发现了一个线程,在那里建议使用心跳和在单独的线程中生成long_running_task()

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

这种方法似乎可行,但很混乱。我们确定ch对象是线程安全的吗?此外,想象一下long_running_task()正在使用该连接参数将任务添加到新队列中(即完成了此长过程的第一部分,让我们将任务发送到第二部分)。因此,线程正在使用connection对象。那么这个线程安全吗?

更重要的是,有没有更好的方法来处理这个问题?我感觉这很混乱,可能不是线程安全的,所以也许我们做错了。谢谢!


1
我遇到了同样的问题。文档中说pika连接不是线程安全的https://pika.readthedocs.org/en/latest/faq.html - Cenk Alti
7个回答

40

目前最好的方法是关闭心跳,这样RabbitMQ在您阻塞时间过长时不会关闭连接。我正在尝试将pika的核心连接管理和IO循环运行在后台线程中,但它还不够稳定可靠。

pika v1.1.0中,此参数为ConnectionParameters(heartbeat=0)


1
正如 @Gavin 所提到的,目前最好的方法是在设置连接时关闭 pika 中的心跳。connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='TestVirtualHost', credentials=credentials, heartbeat_interval=0, port=5672)) - Darshan Patel
4
Pika 0.12.0有一个更好的解决方案,请参阅这个答案 - Luke Bakken
谢谢,它可以工作了。如果你还是无法使用,请注意heartbeat参数应该设置为消费者和生产者的双方。这就是我遇到的情况。 - xanjay
设置 ConnectionParameters(heartbeat=0) 是安全的。因为当您杀死此进程时,连接会立即自动关闭。您可以访问 http://rabbit_mq_server:15672/#/connections 进行验证。 - Lane

22
请不要禁用心跳!
从 Pika 0.12.0 开始,请使用此示例代码中描述的技术,在单独的线程上运行您的长时间运行任务,然后从该线程确认消息。

1
禁用心跳为什么是一件坏事? - PLPeeters
无论是RabbitMQ还是你的应用程序,在TCP连接丢失之前都不会检测到,直到下一次对该连接进行操作才会发现。 - Luke Bakken
我理解这一点,但根据使用情况,这并不一定是坏事。在我的情况下,当连接丢失时尝试确认消息时出现错误,比因为我的处理时间过长而导致连接关闭时尝试确认消息出现错误更好。这就是为什么我认为一般警告不要禁用心跳似乎是不合理的。这完全取决于使用情况,因此我认为更有成效的做法是说出为什么你可能应该考虑不禁用它们,而不是没有任何进一步信息地说“禁用=不好”。 - PLPeeters
1
这是最好且正确的解决方案。感谢@LukeBakken。 - Prateek Kumar Dalbehera
Stackoverflow的答案应该是自包含的。除了链接示例代码外,还应将示例代码内联。 - Zachary Vance
显示剩余2条评论

11

我遇到了你曾经遇到的同样问题。
我的解决方案是:

  1. 关闭服务器端心跳
  2. 评估任务可能需要的最长时间
  3. 将客户端心跳超时设置为步骤2得到的时间

为什么要这样做?

因为我测试了以下情况:

第一种情况
  1. 服务器心跳开启,1800秒
  2. 客户端未设置

当任务运行时间超过1800秒时,我仍然会出现错误。

第二种情况
  1. 关闭服务器心跳
  2. 关闭客户端心跳

在客户端没有任何错误,除了一个问题——当客户端崩溃(我的操作系统在某些故障时重新启动)时,Rabbitmq Management插件中仍然可以看到TCP连接。这让我感到困惑。

第三种情况
  1. 关闭服务器心跳
  2. 打开客户端心跳,并将其设置为预见的最长运行时间

在这种情况下,我可以动态地为每个单独的客户端更改心跳。实际上,我为频繁崩溃的机器设置了心跳。此外,我可以通过Rabbitmq Management插件看到离线机器。

环境

操作系统:CentOS x86_64
pika版本:0.9.13
RabbitMQ版本:3.3.1


你怎样开启客户端心跳呢?我找不到关于如何执行此操作的任何信息。 - Justin Thomas
3
您可以尝试类似这样的代码:params = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials, heartbeat_interval=<您设定的秒数>) - daniel_of_service
我应该先尝试你的方法,这样就能避免很多头痛和焦虑。感谢你提供的有益见解。 - CharlesC

10
  1. 在您的long_running_task(connection)中,您可以定期调用connection.process_data_events()函数。当被调用时,该函数会向服务器发送心跳包,并使pika客户端远离关闭。
  2. 在您的pika BlockingConnection中设置心跳间隔时间大于调用connection.process_data_events()的周期。

1
connection.process_data_events() 帮我。 - Druta Ruslan
我的长时间运行的任务大部分是等待/休眠,所以这对我帮助很大。我相当惊讶于在pika中,“请告诉服务器我仍然活着”的功能被埋没且不直观,但很高兴终于找到了它。 - bbg

7

0

这里有一种使用线程处理的简单方法。如果消费者应用程序在当前作业完成之前不应该消耗另一个作业,则特别有用。可以随时发送确认,但在本例中,我选择仅在作业完成时发送它(线程不再活动)。

在其自己的线程中启动长时间运行的进程,然后使用对channel.process_data_events()的调用在循环中监视该线程。在主线程中保留连接对象的引用,因为它不是线程安全的。基本上:

import time
import pika
from threading import Thread
from functools import partial

rmqconn = pika.BlockingConnection( ... )
rmqchan = rmqconn.channel()
rmqchan.basic_consume(
    queue='test',
    on_message_callback=partial(launch_process,rmqconn)
)
rmqchan.start_consuming()

def launch_process(conn,ch,method,properties,body):
    runthread = Thread(target=run_process,args=body)
    runthread.start()
    while runthread.is_alive():
        time.sleep(2)
        conn.process_data_events()
    ch.basic_ack(delivery_tag=method.delivery_tag)

def run_process(body):
    #do the long-running thing
    time.sleep(10)

0
你还可以设置一个新的线程,在这个新线程中处理消息,并在该线程存活时调用.sleep来防止丢失心跳。以下是从github用户@gmr处摘取的示例代码块,以及未来参考的问题链接。
import re
import json
import threading

from google.cloud import bigquery
import pandas as pd
import pika
from unidecode import unidecode

def process_export(url, tablename):
    df = pd.read_csv(csvURL, encoding="utf-8")
    print("read in the csv")
    columns = list(df)
    ascii_only_name = [unidecode(name) for name in columns]
    cleaned_column_names = [re.sub("[^a-zA-Z0-9_ ]", "", name) for name in ascii_only_name]
    underscored_names = [name.replace(" ", "_") for name in cleaned_column_names]
    valid_gbq_tablename = "test." + tablename
    df.columns = underscored_names

    # try:
    df.to_gbq(valid_gbq_tablename, "some_project", if_exists="append", verbose=True, chunksize=10000)
    # print("Finished Exporting")
    # except Exception as error:
    #     print("unable to export due to: ")
    #     print(error)
    #     print()

def data_handler(channel, method, properties, body):
    body = json.loads(body)

    thread = threading.Thread(target=process_export, args=(body["csvURL"], body["tablename"]))
    thread.start()
    while thread.is_alive():  # Loop while the thread is processing
        channel._connection.sleep(1.0)
    print('Back from thread')
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    params = pika.ConnectionParameters(host='localhost', heartbeat=60)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="some_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(data_handler, queue="some_queue")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    channel.close()

if __name__ == '__main__':
    main()

链接: https://github.com/pika/pika/issues/930#issuecomment-360333837


请参考Luke Bakken提供的解决方案。它是线程安全的,并且参考了pika文档中的官方示例。 - Prateek Kumar Dalbehera

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接