Error “unknown delivery tag” occurs when I try to ack

Posted 2019-03-24 15:40

Question:

I want to process messages in several threads, but I get an error when I run this code:

from __future__ import with_statement
import pika
import sys
from pika.adapters.blocking_connection import BlockingConnection
from pika import connection, credentials
import time
import threading
import random
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection
import traceback


def doWork(body, args, channel):


    r = random.random()
    time.sleep(r * 10)
    try:        
        channel.basic_ack(delivery_tag=args.delivery_tag)

    except :
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()


while True:

    time.sleep(0.03)    
    try:

        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            continue        

        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()

    except Exception, e:
        traceback.print_exc()
        continue

Error description:

Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame)                 # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')

Versions: pika 0.9.5, RabbitMQ 2.6.1

Answer 1:

The problem probably is that you're setting no_ack=True like this:

consumer_tag = channel.basic_consume(
    message_delivery_event,
    no_ack=True,
    queue=queue,
)

And then acknowledging the messages:

channel.basic_ack(delivery_tag=args.delivery_tag)

You have to choose whether you want to acknowledge or not, and set the consume parameter accordingly.



Answer 2:

For me, it was just that I told the queue I wasn't going to ack, then I acked.

E.g. WRONG:

channel.basic_consume(callback, queue=queue_name, no_ack=True)

and then in my callback:

def callback(ch, method, properties, body):
  # do stuff
  ch.basic_ack(delivery_tag = method.delivery_tag)

RIGHT:

channel.basic_consume(callback, queue=queue_name, no_ack=False)

Bottom line: If you want to manually ack, set no_ack=False.

From the docs:

no_ack: (bool) if set to True, automatic acknowledgement mode will be used (see http://www.rabbitmq.com/confirms.html)
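
To see why the broker rejects the ack, here is a toy model of the bookkeeping. It is not pika or RabbitMQ code. It assumes the broker keeps, per channel, a set of delivery tags that still await an ack, and that a delivery made with no_ack=True is never added to that set. ToyChannel, PreconditionFailed and try_ack are made-up names for illustration only.

```python
class PreconditionFailed(Exception):
    """Stand-in for the broker's 406 PRECONDITION_FAILED channel error."""


class ToyChannel:
    """Simplified model of per-channel delivery-tag bookkeeping (not pika)."""

    def __init__(self):
        self._next_tag = 0
        self._unacked = set()  # tags the broker still expects an ack for

    def deliver(self, no_ack):
        """Hand out one message and return its delivery tag."""
        self._next_tag += 1
        if not no_ack:
            # Manual-ack mode: remember the tag until the client acks it.
            self._unacked.add(self._next_tag)
        return self._next_tag

    def basic_ack(self, delivery_tag):
        """Acknowledge one delivery; an unknown tag is a channel error."""
        if delivery_tag not in self._unacked:
            raise PreconditionFailed(
                "PRECONDITION_FAILED - unknown delivery tag %d" % delivery_tag)
        self._unacked.remove(delivery_tag)


def try_ack(no_ack):
    """Deliver one message in the given mode, ack it, report the outcome."""
    channel = ToyChannel()
    tag = channel.deliver(no_ack=no_ack)
    try:
        channel.basic_ack(tag)
        return "acked"
    except PreconditionFailed as exc:
        return str(exc)


if __name__ == "__main__":
    print(try_ack(no_ack=False))
    print(try_ack(no_ack=True))
```

In this model, acking the same tag twice fails for the same reason: once a tag has left the outstanding set, it is unknown again.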



Answer 3:

I don't have a fix, but I can verify that it occurs using the BlockingConnection adapter.

It consistently occurs when acknowledging or rejecting a message that is being redelivered in response to a channel.basic_recover().

Versions: pika 0.9.5, RabbitMQ 2.2.0, Python 2.7, and Erlang R14B01.

The workaround I have in place is to always specify delivery_tag=0.

I suspect that this only works if the message you are acking/nacking is the last one you've read (in stream). The library I'm writing abstracts the message in such a way that each one can be acknowledged independently, which breaks with this solution.

Can anyone confirm whether this has been fixed or acknowledged by the pika team yet? Or could it be an issue with RabbitMQ?



Answer 4:

There is a bug in your code: you share a channel across threads. This is not supported by pika (see the FAQ). You have two options:

  1. Pass no_ack=True to basic_get(...) and do not use the channel object in the thread function doWork(...).
  2. If you need to ack a message only after the work is finished, let the main thread (the while True: loop) send the ack instead of the worker thread. Below is a modified version of your code that does that.

    from __future__ import with_statement
    import pika
    import sys
    from pika.adapters.blocking_connection import BlockingConnection
    from pika import connection, credentials
    import time
    import threading
    import random
    from pika.adapters.select_connection import SelectConnection
    from pika.connection import Connection
    import traceback
    from Queue import Queue, Empty
    
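    def doWork(body, args, channel, ack_queue):
        # Worker thread: simulate the work, then hand the delivery tag
        # to the main thread instead of touching the channel here
        time.sleep(random.random())
        ack_queue.put(args.delivery_tag)
    
    def doAck(channel):
        # Main thread: drain the tags queued by the workers and ack each one
        while True:
            try:
                r = ack_queue.get_nowait()
            except Empty:
                # Nothing left to acknowledge right now
                break
            try:
                channel.basic_ack(delivery_tag=r)
            except:
                traceback.print_exc()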
    
    auth = credentials.PlainCredentials(username="guest", password="guest")
    params = connection.ConnectionParameters(host="localhost", credentials=auth)
    conn = BlockingConnection(params)
    channel = conn.channel()
    # Create a queue for the messages that should be ACKed by main thread
    ack_queue = Queue()
    
    while True:
        time.sleep(0.03)    
        try:
            doAck(channel)
            method_frame, header_frame, body = channel.basic_get(queue="test_queue")
            if method_frame.NAME == 'Basic.GetEmpty':
                continue        
            t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue])
            t.setDaemon(True)
            t.start()
        except Exception, e:
            traceback.print_exc()
            continue
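
As far as I know, newer pika releases (not the 0.9.5 from the question) add BlockingConnection.add_callback_threadsafe(), which lets a worker thread ask the connection's own thread to run a callback. A minimal sketch of acking that way follows. To keep it runnable without a broker, StubConnection and StubChannel are hypothetical stand-ins that only mimic the calls used here. With real pika you would pass the actual connection and channel, and the queued callbacks would run when the owner thread calls something like process_data_events().

```python
import functools
import queue
import threading


def ack_from_worker(connection, channel, delivery_tag):
    """Ask the connection's owner thread to send the ack on our behalf.

    Assumes `connection` offers add_callback_threadsafe(callback).
    """
    callback = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(callback)


class StubChannel:
    """Hypothetical stand-in: records each ack and the thread that sent it."""

    def __init__(self):
        self.acked = []  # list of (delivery_tag, thread name)

    def basic_ack(self, delivery_tag):
        self.acked.append((delivery_tag, threading.current_thread().name))


class StubConnection:
    """Hypothetical stand-in: queues callbacks for the owner thread."""

    def __init__(self):
        self._callbacks = queue.Queue()

    def add_callback_threadsafe(self, callback):
        self._callbacks.put(callback)

    def process_data_events(self):
        # Run every pending callback on the calling (owner) thread.
        while True:
            try:
                callback = self._callbacks.get_nowait()
            except queue.Empty:
                return
            callback()


def demo(count=3):
    """Let `count` workers request acks; the calling thread performs them."""
    connection, channel = StubConnection(), StubChannel()
    workers = [
        threading.Thread(target=ack_from_worker, args=(connection, channel, tag))
        for tag in range(1, count + 1)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    connection.process_data_events()
    return channel.acked


if __name__ == "__main__":
    print(demo())
```

The idea is the same as the ack_queue version above: workers never call the channel directly, they only schedule work for the thread that owns the connection.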
    


Answer 5:

After seeing the question "RabbitMQ - upgraded to a new version and got a lot of 'PRECONDITION_FAILED unknown delivery tag 1'", I changed my basic_consume call to look like this:

    consumer_tag = channel.basic_consume(
        message_delivery_event,
        no_ack=True,
        queue=queue,
    )

This caused the described error on initial (not redelivered) acknowledgements when the message's delivery tag was specified. The delivery tag was taken from the method structure of the message delivery.

Using

    channel.basic_ack(delivery_tag=0)

suppresses the error in this case, too.

Looking at http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html makes it seem as though it may be an issue in RabbitMQ.