Moving messages between queues in RabbitMQ
C

5

13

I would like to move messages between queues (manually) in RabbitMQ.

For example:

first-queue has messages ['a','b','c','d','e','f']
second-queue has messages ['x','y']

I want to be able to move, for example, message 'a' from first-queue to second-queue. This can be a manual operation. Both queues are on the same broker, and I do not want to send the messages through any exchange. Is there any way to do this? I have been experimenting with rabbitmqctl but can't get it to work, and I am open to any other tool that would accomplish this. Eventually I hope to have some sort of message selector (for example, move all messages with some header field = X from first-queue to second-queue).

I am still new to RabbitMQ and AMQP and have been unable to find documentation on how to do this (if it is even possible).

Thanks.

Castoff answered 25/3, 2014 at 20:24 Comment(0)
F
16

@Dax - I just answered this same question here: Is it possible to move / merge messages between RabbitMQ queues?

I have a long description there. To avoid duplicate content I don't want to copy/paste.

It sounds like what you are looking for is the RabbitMQ Shovel plugin.

It ships with RabbitMQ, so you only need to enable it:

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

From the Admin section in the GUI you'll find an easy interface to create shovels.

See the other post for more details!

Favus answered 8/11, 2014 at 7:0 Comment(1)
I see the Move Messages option in the queues as an administrator, but users are not seeing this option. How do we give them access to this option on queues they can configure, write to, and read from? - Bouffe
L
8

It's not documented because it runs against the messaging model.

It's easy to send a message to a specific queue (see tutorial #1, for example), but the only way to read messages is to consume them, in the order the broker delivers them to clients.

You cannot select messages from a queue the way you can select rows with SQL.

What you can do is let a client (or possibly a plugin, but that is an advanced topic) consume messages from a queue and, based on some rule, re-publish them either to a subsequent queue or to another one.
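
To make the "consume, apply a rule, re-publish" idea concrete, here is a minimal sketch of just the routing logic. It does not talk to a broker: the `Message` class and the in-memory `Deque`s are hypothetical stand-ins for real deliveries and queues, and `matches` / `moveMatching` are names I made up for illustration. As far as I know, the RabbitMQ Java client usually delivers string header values as a `LongString` rather than a `String`, which is why the comparison goes through `toString()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class SelectiveMove {

    /** Hypothetical stand-in for a delivered message: headers plus body. */
    static final class Message {
        final Map<String, Object> headers;
        final String body;

        Message(Map<String, Object> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** The selection rule: true when the header `key` equals `expected`. */
    static boolean matches(Message message, String key, String expected) {
        Object value = message.headers.get(key);
        // Compare via toString() so non-String header value types still match
        return value != null && expected.equals(value.toString());
    }

    /**
     * Consumes every message from `source` in delivery order. Matching
     * messages are appended to `target`; the rest are returned so the
     * caller can put them back (a real client would re-publish or nack them).
     */
    static List<Message> moveMatching(Deque<Message> source, Deque<Message> target,
                                      String key, String expected) {
        List<Message> skipped = new ArrayList<>();
        Message message;
        while ((message = source.poll()) != null) {
            if (matches(message, key, expected)) {
                target.add(message);
            } else {
                skipped.add(message);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Deque<Message> first = new ArrayDeque<>();
        first.add(new Message(Map.of("type", "X"), "a"));
        first.add(new Message(Map.of("type", "Y"), "b"));
        first.add(new Message(Map.of("type", "X"), "c"));
        Deque<Message> second = new ArrayDeque<>();

        List<Message> skipped = moveMatching(first, second, "type", "X");
        first.addAll(skipped); // put the non-matching messages back

        System.out.println(second.size() + " moved, " + first.size() + " kept");
    }
}
```

Note that every message has to leave the source queue to be inspected, so the ones that do not match must be put back by the client. That is the practical cost of the broker having no selector.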

Lighterage answered 25/3, 2014 at 21:5 Comment(0)
C
2

Here is some simple Java code to move everything from one queue to another:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public void moveMessages(
            final String sourceQueueName,
            final String targetQueueName,
            final String rabbitmqHost,
            final String rabbitmqUsername,
            final String rabbitmqPassword,
            final String rabbitmqVirtualHost
) throws IOException, TimeoutException {

        // Initialize the connection used for consuming and publishing
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(rabbitmqHost);
        factory.setUsername(rabbitmqUsername);
        factory.setPassword(rabbitmqPassword);
        factory.setVirtualHost(rabbitmqVirtualHost);
        Connection connection = factory.newConnection();

        try {
            Channel consumingChannel = connection.createChannel();
            Channel publishingChannel = connection.createChannel();

            // Put the publishing channel into transactional mode once, up front
            publishingChannel.txSelect();

            while (true) {
                // Get the first message in the source queue (auto ack = false)
                GetResponse response = consumingChannel.basicGet(sourceQueueName, false);

                // A null response means the source queue is empty
                if (response == null) {
                    return;
                }

                AMQP.BasicProperties properties = response.getProps();

                // Publish the message to the target queue via the default exchange
                publishingChannel.basicPublish("", targetQueueName, properties, response.getBody());
                publishingChannel.txCommit();

                // Acknowledge the message on the source queue only after the commit
                consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false);
            }
        } finally {
            // Closing the connection also closes both channels
            connection.close();
        }
    }
Craggy answered 4/6, 2015 at 13:47 Comment(0)
J
2

You can create a shovel using this curl command:

curl \
  -u "user:password" \
  -vvv 'http://localhost:15672/api/parameters/shovel/%2Fvhost/Move%20from%20sourceQueue' \
  -X PUT \
  -H 'content-type: application/json' \
  --data-binary '
{
    "component": "shovel",
    "vhost": "/vhost",
    "name": "Move from sourceQueue",
    "value": {
        "src-uri": "amqp:///%2Fvhost",
        "src-queue": "sourceQueue",
        "src-protocol": "amqp091",
        "src-prefetch-count": 1000,
        "src-delete-after": "queue-length",
        "dest-protocol": "amqp091",
        "dest-uri": "amqp:///%2Fvhost",
        "dest-add-forward-headers": false,
        "ack-mode": "on-confirm",
        "dest-queue": "destQueue"
    }
}
' --compressed
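
The easy part to get wrong in that request is the percent-encoding: the vhost and the shovel name are each a single URL path segment, so a `/` in the vhost becomes `%2F` and spaces become `%20`. A small sketch of that encoding, in case you build the URL from code. The class and method names (`ShovelUrl`, `encodeSegment`, `shovelParameterUrl`, `localAmqpUri`) are my own, not part of any RabbitMQ library, and the host and vhost are just the values from the command above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ShovelUrl {

    /** Percent-encodes one URL path segment, e.g. "/vhost" -> "%2Fvhost". */
    static String encodeSegment(String segment) {
        // URLEncoder targets form encoding, so spaces become '+'; a path needs %20.
        // A literal '+' in the input is already encoded as %2B, so this is safe.
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the management API URL for a dynamic shovel parameter. */
    static String shovelParameterUrl(String baseUrl, String vhost, String shovelName) {
        return baseUrl + "/api/parameters/shovel/"
                + encodeSegment(vhost) + "/" + encodeSegment(shovelName);
    }

    /** Builds the AMQP URI a shovel uses to reach a vhost on the local broker. */
    static String localAmqpUri(String vhost) {
        return "amqp:///" + encodeSegment(vhost);
    }

    public static void main(String[] args) {
        // Prints http://localhost:15672/api/parameters/shovel/%2Fvhost/Move%20from%20sourceQueue
        System.out.println(shovelParameterUrl("http://localhost:15672", "/vhost", "Move from sourceQueue"));
        // Prints amqp:///%2Fvhost
        System.out.println(localAmqpUri("/vhost"));
    }
}
```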
Joacimah answered 22/11, 2019 at 22:15 Comment(0)
S
0

@rocksfrow's answer is good, but if you need to do it from the CLI, here is how:

  • Enable the rabbitmq_shovel plugin
  • Define a shovel to migrate all messages from QUEUE_TO_MIGRATE (vhost: OLD_VHOST) to QUEUE_DEST (vhost: NEW_VHOST)
rabbitmqctl set_parameter shovel migrate-QUEUE_TO_MIGRATE \
  '{"src-protocol": "amqp091", "src-uri": "amqp:///OLD_VHOST", "src-queue": "QUEUE_TO_MIGRATE",
    "dest-protocol": "amqp091", "dest-uri": "amqp:///NEW_VHOST", "dest-queue": "QUEUE_DEST"}'
  • Check that no messages remain on QUEUE_TO_MIGRATE (vhost: OLD_VHOST)
  • Remove the shovel
rabbitmqctl clear_parameter shovel migrate-QUEUE_TO_MIGRATE

Source: official RabbitMQ blog post

Sicyon answered 6/5 at 13:1 Comment(0)
