使用pika(python)向RabbitMQ确认消息时出现“未知的传递标签”错误

26
我想在几个线程中处理消息,但在执行此代码时出现错误:
from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

错误描述:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

版本信息:pika 0.9.5,rabbitMQ 2.6.1


昨天我尝试使用py-amqplib库,而不是pika。它运行良好。很可能是pika库出了问题。 - solo117
1
如果您想在多个线程之间共享代码,应该使用像rabbitpy或amqp-storm这样的线程安全库。不确定py-amqplib是否是线程安全的。https://github.com/eandersson/amqp-storm - eandersson
7个回答

47
问题可能是你这样设置了 no_ack=True
consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

然后确认这些信息:

channel.basic_ack(delivery_tag=args.delivery_tag)

你需要选择是否确认并设置正确的消费参数。


1
我的代码出现问题的根本原因是同步问题和配置问题。我有一个简单的包装来创建RabbitMQ消费者。当消费临时队列(channel.queueDeclare("", false, true, true, args).getQueue())时,nextDelivery需要在多线程环境中进行保护。这意味着,如果你收到一条消息,在消费其他消息之前,你需要先确认它。否则,当调用ack时,它会抛出异常并在消费时不断抛出异常... - DeepNightTwo
这正是我遇到的问题,非常感谢。 - Rob
1
在一次糟糕的合并后,我遇到了这个错误,其中一个消息使用相同的传递标签被确认了两次。 - blockloop

14

对我来说,就是我告诉队列我不会确认(ack),然后我却确认了。

例如,错误的做法

channel.basic_consume(callback, queue=queue_name, no_ack=True)

然后在我的回调函数中:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

正确:

channel.basic_consume(callback, queue=queue_name, no_ack=False)
底线: 如果你想手动确认,就将no_ack设置为False。
从文档中可以看到:

no_ack: (bool) 如果设置为True,将使用自动确认模式(参见http://www.rabbitmq.com/confirms.html)


谢谢。这真的很有用。我看到的问题是参数名称(no_ack或在.NET中为noAck)有点令人困惑。我觉得它应该被称为“ack”,如果你传递true,它将确认消息。 - Hanlet Escaño

4

你的代码存在一个bug。你在不同线程间共用了同一个通道,而这在pika中是不被支持的(请参考FAQ)。你有两个选择:

  1. Define the no_ack=True flag in basic_get(...) and do not use the channel object in thread's function doWork(...)
  2. If you need to ACK message only after you have finished your work, then let the main thread (the while True: loop) handle the message ack (and not the worker thread). Below is a modified version of your code that does that.

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty
    
    def doWork(body, args, channel, ack_queue):
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)
    
    def doAck(channel):
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                r = None
            if r is None:
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()
    
    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()
    
    while True:
        time.sleep(0.03)    
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue        
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
    

2

我没有解决方案,但我可以验证使用BlockingConnection适配器时出现问题。

当响应于channel.basic_recover()重新传递的消息被确认或拒绝时,它总是发生。

该问题出现在pika 0.9.5,rabbitMQ 2.2.0,python 2.7和Erlang R14B01中。

我目前的解决方法是始终指定deliver_tag = 0。

我怀疑这仅在您确认/否认的消息是您读取的最后一条消息(在流中)时才有效。我编写的库以这种方式抽象出每个消息,使每个消息都可以独立确认,这与此解决方案不兼容。

有人能否确认Pika团队是否已经修复或确认了此问题?或者,这可能是RabbitMQ的问题吗?


我在使用node-amqp时看到了这个错误,所以这一定是与RabbitMQ(版本3.0.2-1)有关的问题。 - alexfernandez

1
如果你尝试在与消息创建不同的通道上确认消息,也会遇到此错误。如果你正在关闭或重新创建通道,则可能会发生这种情况。
从文档中了解更多信息: https://www.rabbitmq.com/confirms.html 经纪人会因为在其他信道上尝试确认(无论是积极的还是消极的)投递时而抱怨“未知的传递标签”,这是另一种情况。必须在同一信道上确认传递。

0

看到RabbitMQ - 升级到新版本并收到了很多“PRECONDITION_FAILED unknown delivery tag 1”错误信息

我将我的基本消费者更改为以下内容:

    consumer_tag = channel.basic_consume(
        message_delivery_event,
        no_ack=True,
        queue=queue,
    )

当消息的传递标签被指定时,这会导致初始(非重新传递的)确认出现描述的错误。传递是从消息传递的方法结构中提取的。

使用

    channel.basic_ack(delivery_tag=0)

在这种情况下,它也会抑制错误。

查看http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html,似乎可能是RabbitMQ的问题。


0

这个问题是由于您设置了 { noack: true },但仍然尝试发送确认而引起的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接