RabbitMQ: persistent message with Topic exchange

I am very new to RabbitMQ.

I have set up a 'topic' exchange. The consumers may be started after the publisher. I'd like the consumers to be able to receive messages that were sent before they were up and that have not yet been consumed.

The exchange is set up with the following parameters:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

The messages are published with this parameter:

delivery_mode => 2

Consumers use get() to retrieve the messages from the exchange.

Unfortunately, any message published before a client was up is lost. I have tried different combinations of these parameters.

I guess my problem is that the exchange does not hold messages. Maybe I need to have a queue between the publisher and the consumer. But this does not seem to work with a 'topic' exchange where messages are routed by a key.

How should I proceed? I use the Perl binding Net::RabbitMQ (shouldn't matter) and RabbitMQ 2.2.0.

Akmolinsk answered 27/5, 2011 at 5:57 Comment(0)

You need a durable queue to store messages if there are no connected consumers available to process the messages at the time they are published.

An exchange doesn't store messages, but a queue can. The confusing part is that exchanges can be marked as "durable", but all that really means is that the exchange itself will still be there after you restart your broker; it does not mean that messages sent to that exchange are automatically persisted.

Given that, here are two options:

  1. Perform an administrative step before you start your publishers to create the queue(s) yourself. You could use the web UI or the command line tools to do this. Make sure you create it as a durable queue so that it will store any messages that are routed to it even if there are no active consumers.
  2. Assuming your consumers are coded to always declare (and therefore auto-create) their exchanges and queues on startup (and that they declare them as durable), just run all your consumers at least once before starting any publishers. That will ensure that all your queues get created correctly. You can then shut down the consumers until they're really needed because the queues will persistently store any future messages routed to them.

I would go for #1. There may not be many steps to perform, and you can always script them so that they can be repeated. Plus, if all your consumers are going to pull from the same single queue (rather than each having a dedicated queue), it's really a minimal piece of administrative overhead.
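If you do script that administrative step with a client library rather than the web UI, a minimal sketch might look like the following (shown in Python with pika for consistency with the other answers; the exchange name 'my_topic_exchange', queue name 'my_durable_queue' and routing-key pattern 'orders.*' are placeholder values, not anything from the question):

#!/usr/bin/env python
# One-off setup script: run once before any publisher starts.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# A durable topic exchange survives a broker restart, but it does not store messages by itself.
channel.exchange_declare(exchange='my_topic_exchange',
                         exchange_type='topic',
                         durable=True,
                         auto_delete=False)

# The durable queue is what actually holds messages while no consumer is connected.
channel.queue_declare(queue='my_durable_queue', durable=True)

# Bind the queue to the exchange with a routing-key pattern; any matching message
# published from now on is stored in the queue until a consumer retrieves it.
channel.queue_bind(queue='my_durable_queue',
                   exchange='my_topic_exchange',
                   routing_key='orders.*')

connection.close()

For the messages themselves to also survive a broker restart, keep publishing them with delivery_mode 2, as the question already does.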

Queues are something to be managed and controlled properly. Otherwise you could end up with rogue consumers declaring durable queues, using them for a few minutes but never again. Soon after you'll have a permanently-growing queue with nothing reducing its size, and an impending broker apocalypse.

Crandell answered 27/5, 2011 at 17:29 Comment(3)
OK, so the solution is to declare fixed client queues in the publisher script. Of course this requires me to know in advance how many consumers there will be.Akmolinsk
That's true, assuming that each consumer will need its own queue. But the main question you need to answer is, "Will those consumers need all your historical messages which were sent before they ever came into being?". If they won't care about old messages, they can just declare their own queue on startup and receive all messages from that point on, but nothing older.Crandell
Applications "declare" queues and the MQ broker then creates them if they do not yet exist. Although it makes sense for listener applications to declare queues, and not sender applications, you run into the problem that you have seen. The best solution is probably to declare queues, declare exchanges, create the vhost, etc. before running an app.Surinam

As Brian mentioned, an exchange does not store messages; it is mainly responsible for routing messages to other exchanges or queues. If the exchange is not bound to a queue, then all messages sent to that exchange will be 'lost'.

You should not need to declare fixed client queues in the publisher script, since that might not be scalable. Queues can be created dynamically by your publishers, and messages can be routed to them internally using exchange-to-exchange bindings.

RabbitMQ supports exchange-to-exchange bindings, which allow for topology flexibility, decoupling and other benefits. You can read more at RabbitMQ Exchange to Exchange Bindings [AMQP].

[Image: RabbitMQ exchange-to-exchange binding]

[Image: example topology]

Example Python code that creates the exchange-to-exchange binding, with durable queues so that messages are kept when no consumer is present:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the entry exchange used by all producers (internal or external) to send messages.
channel.exchange_declare(exchange='data_gateway',
                         exchange_type='fanout',
                         durable=True,
                         auto_delete=False)

# Declare the processing exchange. It routes messages to the various queues and is for internal use only.
channel.exchange_declare(exchange='data_distributor',
                         exchange_type='topic',
                         durable=True,
                         auto_delete=False)

# Bind the external/producer-facing exchange to the internal exchange.
channel.exchange_bind(destination='data_distributor', source='data_gateway')

# Create durable queues bound to the data_distributor exchange.
channel.queue_declare(queue='trade_db', durable=True)
channel.queue_declare(queue='trade_stream_service', durable=True)
channel.queue_declare(queue='ticker_db', durable=True)
channel.queue_declare(queue='ticker_stream_service', durable=True)
channel.queue_declare(queue='orderbook_db', durable=True)
channel.queue_declare(queue='orderbook_stream_service', durable=True)

# Bind the queues to the exchange with the correct routing keys. This allows messages
# to be stored when no consumer is present.
channel.queue_bind(queue='orderbook_db', exchange='data_distributor', routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service', exchange='data_distributor', routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db', exchange='data_distributor', routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service', exchange='data_distributor', routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db', exchange='data_distributor', routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service', exchange='data_distributor', routing_key='*.*.trade')
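With that topology in place, a producer only has to publish persistent messages to the entry exchange with a matching routing key, and the durable queues hold them until the consumers connect. A rough usage sketch (the routing key 'venue1.btcusd.ticker' is just an illustrative value that matches the '*.*.ticker' bindings above):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Producer side: publish a persistent message to the entry exchange.
channel.basic_publish(exchange='data_gateway',
                      routing_key='venue1.btcusd.ticker',
                      body='{"bid": 1.0, "ask": 1.1}',
                      properties=pika.BasicProperties(delivery_mode=2))

# Consumer side, possibly started much later: the message is waiting in the durable queue.
method, properties, body = channel.basic_get(queue='ticker_db', auto_ack=False)
if method is not None:
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection.close()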
Charissa answered 27/6, 2014 at 5:37 Comment(6)
The "Eat All Messages" queue is missing, and in my opinion the messages will still not arrive at 'late' subscribers.Chrystal
Explain? It definitely answers the OP's question and works. Be more constructive with your comments.Charissa
This actually could work, @KurtPattyn and @flyer, as you can at any time create a new consumer for Eat All Messages that can "recover" unprocessed messages from there and route them to the right place.Mantinea
What @Mantinea said, just adding: recovering consumers must not consume the messages (no auto-ack; close the connection to that queue once you've seen all messages). This way you can use RabbitMQ as an event store - not sure they intended that.Anglosaxon
This "smells". As mbx wrote, this configures RabbitMQ to be a kind of event store, and that's not how it is supposed to be used, imho. Rather look into using Kafka for your use case. The answer from Brian Kelly explains it perfectly.Kalb
It is not being used as an event store. As the poster stated, clients might not be connected, so the messages have to be temporarily queued before they are delivered. The new and shiny "Kafka" was not really ready at that time. How about submitting a full answer? Enjoy the rest of your day.Charissa

Your case seems to be about "message durability".

From the RabbitMQ tutorials, you need to mark both the queue and the messages as durable (the code below is the C# version; the tutorials also show other languages).

  1. First, in the publisher, you need to make sure that the queue will survive a RabbitMQ node restart. To do so, declare it as durable:
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
  2. Second, also in the publisher, mark your messages as persistent by setting IBasicProperties.Persistent to true:
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
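The properties object is then passed to BasicPublish for each message. For readers following the Python/pika code in the answer above, the same two steps look roughly like this (the queue name 'hello' is just this answer's example name):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 1. Declare the queue as durable so the queue itself survives a broker restart.
channel.queue_declare(queue='hello', durable=True)

# 2. Mark each message as persistent when publishing (delivery_mode=2,
#    the same setting the question already uses).
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2))

connection.close()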
Implead answered 2/6, 2021 at 2:14 Comment(0)
