inequivalent arg 'durable' for queue
Asked Answered
W

3

19

I need some help. I have this code below and the logs tell me that I'm not passing the durable parameter, but as you can see I'm passing. I have tried everything and there is always the same mistake.

To mount this code I used these links:

https://github.com/sk2/ANK-NG/blob/master/rabbitmq/rabbit_websocket.py

https://pika.readthedocs.io/en/0.10.0/examples/tornado_consumer.html

websocket.py:

import pika
import tornado
import tornado.websocket as websocket
from tornado.options import options, define, parse_command_line
import tornado.httpserver
import tornado.ioloop
import tornado.wsgi
from pika.adapters.tornado_connection import TornadoConnection
from witbot import recebeInput
import logging

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)


class MyWebSocketHandler(websocket.WebSocketHandler):
    def allow_draft76(self):
        # for iOS 5.0 Safari
        return True

    def check_origin(self, origin):
        return True

    def open(self, *args, **kwargs):
        self.application.pc.add_event_listener(self)
        print ("WebSocket opened")

    def on_close(self):
        print ("WebSocket closed")
        self.application.pc.remove_event_listener(self)

    def on_message(self, message):
        print (message)
        self.application.pc.send_message(message)

class PikaClient(object):
    def __init__(self, io_loop):
        print ('PikaClient: __init__')
        self.io_loop = io_loop
        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None
        self.event_listeners = set([])

    def connect(self):
        logging.basicConfig(level=logging.DEBUG)
        if self.connecting:
            print ('PikaClient: Already connecting to RabbitMQ')
            return

        print ('PikaClient: Connecting to RabbitMQ')
        self.connecting = True

        port = 5672
        cred = pika.PlainCredentials(username='guest', password='guest')
        param = pika.ConnectionParameters(host='192.168.99.100',port=port,credentials=cred,heartbeat_interval=5)
        print(param)
        self.connection = TornadoConnection(param,
            on_open_callback=self.on_connected)
        print(self.connection)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        print ('PikaClient: connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        print ('PikaClient: Channel open, Declaring exchange')
        self.channel = channel
        self.setup_exchange('amq.topic')


    def setup_exchange(self, exchange_name):
        print('Declaring exchange %s', exchange_name)
        self.channel.exchange_declare(self.on_exchange_declareok, exchange=exchange_name,exchange_type='topic',durable=True)

    def on_exchange_declareok(self, unused_frame):
        print('Exchange declared')
        self.setup_queue('bot.commands')

    def setup_queue(self, queue_name):
        print('Declaring queue %s', queue_name)
        self.channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        print ('Binding %s to %s with %s', 'amq.topic', 'bot.commands', 'bot.commands')
        self.channel.queue_bind(self.on_bindok, 'bot.commands',
                                 'amq.topic', 'bot.commands')
    def on_bindok(self, unused_frame):
        print('Queue bound')
        self.start_consuming()

    def start_consuming(self):
        print('Issuing consumer related RPC commands')
        self._consumer_tag = self.channel.basic_consume(self.on_message,'bot.commands')

    def on_closed(self, connection):
        print ('PikaClient: rabbit connection closed')
        self.io_loop.stop()

    def on_message(self, channel, method, header, body):
        print ('PikaClient: message received: %s' % body)
        self.notify_listeners(body)

    def send_message(self, body):
        self.channel.basic_publish(exchange='topic',
                      routing_key='bot.commands',
                      body=body)

    def notify_listeners(self, body):
        bot = recebeInput.decisoes(recebeInput.devolveEntidade(recebeInput.trataJson(body)))
        for listener in self.event_listeners:
            listener.write_message(bot)
            print ('PikaClient: notified %s' % repr(listener))

    def add_event_listener(self, listener):
        print ("added listener")
        self.event_listeners.add(listener)
        print ('PikaClient: listener %s added' % repr(listener))

    def remove_event_listener(self, listener):
        try:
            self.event_listeners.remove(listener)
            print ('PikaClient: listener %s removed' % repr(listener))
        except KeyError:
            pass


def main():
    parse_command_line()
    aplication = tornado.web.Application([
        (r'/ws', MyWebSocketHandler)
    ])
    # server = tornado.httpserver.HTTPServer(aplication)
    io_loop = tornado.ioloop.IOLoop.instance()
    # PikaClient is our rabbitmq consumer
    pc = PikaClient(io_loop)
    aplication.pc = pc
    aplication.pc.connect()
    aplication.listen(8081)
    io_loop.start()

if __name__ == '__main__':
    main()

Error log:

websocket_1  | PikaClient: connected to RabbitMQ
websocket_1  | PikaClient: Channel open, Declaring exchange
websocket_1  | Declaring exchange %s amq.topic
websocket_1  | Exchange declared
websocket_1  | Declaring queue %s bot.commands
rabbit1_1    | 2018-07-17 15:48:18.792 [info] <0.6936.0> connection
<0.6936.0> (172.18.0.1:37556 -> 172.18.0.6:5672): user 'guest' authenticated and granted access to vhost '/'
websocket_1  | [W 180717 15:48:18 channel:1034] Received remote Channel.Close (406): "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'bot.commands' in vhost '/': received 'false' but current is 'true'" on <Channel number=1 OPEN conn=<TornadoConnection OPEN socket=('172.18.0.2', 47986)->('192.168.99.100', 5672) params=<ConnectionParameters host=192.168.99.100 port=5672 virtual_host=/ ssl=False>>>
websocket_1  | [I 180717 15:48:24 web:2162] 101 GET /ws (192.168.99.1) 1.29ms
websocket_1  | added listener
websocket_1  | PikaClient: listener <__main__.MyWebSocketHandler object at 0x7f483b277cf8> added
websocket_1  | WebSocket opened
Wame answered 17/7, 2018 at 15:54 Comment(1)
Probably you are creating non-durable queue on a durable exchange. Try self.channel.queue_declare(self.on_queue_declareok, queue_name, durable=True).Subirrigate
R
18

Perhaps the queue already exists and is durable so when your library attempts to subscribe to it and sees a durable it throws an error because it is expecting a non-durable queue

Rub answered 26/4, 2019 at 10:36 Comment(1)
it was also my case, I had to add the parameter durable=False or True to the procedure declaring the exchange and the queue.Nightie
H
1

For me, the queue exists. I deleted it by rabbitmqadmin delete queue name=name_of_queue and run program again

Hakan answered 13/11, 2021 at 11:8 Comment(1)
Concise answer. To add an example scenario - anytime you create a queue with default values and then subsequently go to change its durability by changing the argument in the queue_declare() method, you'll get the error cited in the OP.Dignadignified
R
0

As you are using nodeJs amqp lib and you faced with the same issue, try to specify options when specifying queue :

connection.queue('your_queue_name',{'durable':true}, function (q) {
          // Catch all messages
          q.bind('#');
        
          // Receive messages
          q.subscribe(function (message) {
            // Print messages to stdout
            console.log(message);
          });
});
Reface answered 13/10, 2022 at 13:38 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.