AMQP RabbitMQ消费者相互阻塞?

4
我编写了一个 C (rabbitmq-c) 工作程序,它消费由 Python 脚本 (pika) 发布的队列。
我遇到了以下奇怪的问题,无法解决:
1. 在将消息发布到队列之前启动所有工作程序可以按预期工作。 2. 在队列已经被发布后启动 1 个工作程序可以按预期工作。 3. 但是:在一个工作程序已经开始从队列中消费后启动其他工作程序,这些工作程序就看不到队列中的任何消息(消息计数为 0),因此只能等待(即使队列中仍应有许多消息)。杀死第一个工作程序将突然启动消息流到所有其他(等待)消费者。
有什么想法吗?
我尝试确保每个消费者都有自己的通道(这是必要的吗?),但仍然出现相同的行为...
以下是消费者(工作程序)的代码:
conn = amqp_new_connection();
sock = (amqp_socket_t *)(uint64_t)amqp_tcp_socket_new(conn);
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message recevied from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}

我使用Pika 0.9.14客户端也遇到了相同的问题。不幸的是,目前还没有任何线索。 - Coleman S
1
你有检查 prefetch_count 吗? - Vor
2个回答

4
问题很可能是您的消费者在启动时预取了所有消息。这是RabbitMQ的默认行为,但您可以通过减少消费者预取的消息数量来更好地将工作负载分散到多个工作者中。
这意味着一个或多个消费者将拾取所有消息,并留下没有新消费者的消息。
如果您对消费者应用qos并将预取限制为10条消息,那么消费者将仅排队前10条消息,新的消费者可以接管剩余的工作。
要实现此功能,您需要使用名为amqp_basic_qos的函数,此外,您还可以在here中了解有关消费者预取的更多信息。

这正是问题所在(邮件列表中的某个人指出了这一点)。我使用 QOS 来限制消费者一次只获取 1 条消息(正是我想要的)。我还手动确认了这些消息。 - depletionmode

0

这可能会对你有所帮助

消息确认

执行任务可能需要几秒钟的时间。您可能会想知道,如果其中一个消费者开始了一项长时间的任务并在处理过程中死亡,会发生什么情况。根据我们当前的代码,一旦RabbitMQ将消息传递给消费者,它就会立即从内存中删除该消息。在这种情况下,如果您杀死一个工作进程,我们将丢失它刚刚正在处理的消息。我们还将失去所有已分派给此特定工作进程但尚未处理的消息。

但是我们不想丢失任何任务。如果工作进程死亡,我们希望将任务交付给另一个工作进程。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者向RabbitMQ发送确认(acknowledgement)以告知其已接收、处理特定消息,并且RabbitMQ可以删除该消息。

如果消费者在没有发送确认的情况下死亡,则RabbitMQ将理解到消息未被完全处理,并将重新将其传递给另一个消费者。这样,即使工作进程偶尔死亡,您也可以确保不会丢失任何消息。

没有任何消息超时;只有在工作进程连接断开时,RabbitMQ才会重新传递消息。即使处理消息需要非常长的时间,也没有问题。

消息确认默认已打开。


是的,我知道消息确认。问题在于其他工作人员根本没有收到任何消息。如果他们检查队列消息计数,它为零,但是一旦我杀死第一个工作人员,其他工作人员突然开始接收到消息。 - depletionmode

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接