RabbitMQ - Send message to a particular consumer in a queue

The scenario: there are multiple app servers, and a browser can connect via WebSocket to any of them.

The app servers (consumers) are all listening on a particular queue. As soon as a web socket connection is received, the particular app server binds the queue with a routing key {userId} to a direct exchange.

I want a message sent to the direct exchange with the routing key {userId} to be received only by the particular app server where the binding has occurred.

Is a direct exchange the right exchange to use in this case? Or should some other type of exchange be used?

I'm using spring-amqp to create dynamic bindings when a WebSocket connection comes in:

// bind the (shared) queue to the direct exchange with a per-user routing key
String routingKey = MessageConstants.getRoutingKeyForUserRecommendationQueue(user);
Binding userRecommendationBinding = BindingBuilder.bind(userRecommendationsQueue)
    .to(directExchange).with(routingKey);
amqpAdmin.declareBinding(userRecommendationBinding);
Westbrook answered 3/3, 2016 at 21:19 Comment(0)
P
5

You are on the right track.

And yes: a direct exchange with an appropriate binding should do the job.

See more info in the RabbitMQ Tutorial: http://www.rabbitmq.com/tutorials/tutorial-four-java.html

Also take a look into Spring AMQP Samples on the matter: https://github.com/spring-projects/spring-amqp-samples/tree/master/rabbitmq-tutorials

UPDATE

Unfortunately that is not what is happening. The messages seem to go randomly to any consumer, and not just the consumer that created the binding.

Hmm, that is possible: routing is done only by the key, and after that the message is placed in the queue, which may have several consumers on different machines.

In this case yes: the dynamic binding doesn't help.
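
To make that behaviour concrete, here is a toy in-memory model in plain Java. It is not RabbitMQ or Spring AMQP code; it only assumes that the broker rotates through the consumers of a single queue (real dispatch also depends on prefetch and on which consumers are ready). The class name SharedQueueDemo and the server and message names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT RabbitMQ client code: one shared queue, several consumers.
class SharedQueueDemo {

    /** Hands each queued message to the next consumer in turn (round-robin). */
    static Map<String, List<String>> dispatchRoundRobin(List<String> consumers,
                                                        List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        int next = 0;
        for (String message : messages) {
            received.get(consumers.get(next)).add(message);
            next = (next + 1) % consumers.size();
        }
        return received;
    }

    public static void main(String[] args) {
        // Hypothetical setup: the websocket of the target user lives on app-1,
        // but both servers consume from the same queue.
        List<String> servers = List.of("app-1", "app-2");
        List<String> messages = List.of("rec-1", "rec-2", "rec-3");
        System.out.println(dispatchRoundRobin(servers, messages));
    }
}
```

In this model app-2 receives rec-2 even though only app-1 created the binding: the routing key decides which queue gets the message, not which consumer of that queue reads it.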

Consider creating a new, unique queue (auto-delete is fine), binding it, and listening on exactly that queue. The SimpleMessageListenerContainer supports addQueues() at runtime to start a new consumer for a new queue.

I think that should work for you.

You still don't need to change anything on the producer side: it keeps the same exchange and routingKey logic.
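
As a rough sketch of why a dedicated queue per app server helps, the following plain-Java model mimics direct-exchange semantics: a message is copied to every queue bound with exactly the published routing key. It is an in-memory illustration only, not Spring AMQP code; the class PerServerQueueDemo, its methods, and the queue and key names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a direct exchange with one queue per app server (illustration only).
class PerServerQueueDemo {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue to the exchange, creating the queue if needed. */
    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new HashSet<>()).add(queueName);
    }

    /** Copies the message to every queue whose binding key equals the routing key. */
    void publish(String routingKey, String message) {
        for (String queueName : bindings.getOrDefault(routingKey, Set.of())) {
            queues.get(queueName).add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        PerServerQueueDemo exchange = new PerServerQueueDemo();
        exchange.bind("q.app-1", "user-42"); // user-42 connected to app-1
        exchange.bind("q.app-2", "user-7");  // user-7 connected to app-2
        exchange.publish("user-42", "rec-1");
        System.out.println("app-1 queue: " + exchange.messagesIn("q.app-1"));
        System.out.println("app-2 queue: " + exchange.messagesIn("q.app-2"));
    }
}
```

Because each server is the only consumer of its own queue, a message published with a user's routing key can only reach a server that bound that key. If the same user is connected to two servers and both bind the key, each server's queue gets its own copy.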

Petrozavodsk answered 3/3, 2016 at 21:31 Comment(8)
Unfortunately that is not what is happening. The messages seem to go randomly to any consumer, and not just the consumer that created the binding. - Westbrook
BTW, I could swear it was working before we moved from RabbitMQ 3.4 to 3.6. - Westbrook
Sorry, I misunderstood at first. Please take a look at my update. - Petrozavodsk
Thanks @Artem and Derick, your answers are the same. This works, but will having thousands of queues cause performance issues? We expect about 10,000 queues. Also, any idea what the overhead of creating a queue is compared with creating a binding? - Westbrook
That doesn't matter, because I'm sure your queues should be deleted once the subscription is gone. Queues make no sense on the broker if they aren't bound to some exchange. BTW, by default every queue is bound to the default direct exchange by its name, just for convenience. So you may not need to worry about bindings at all: just send messages to the default exchange with the queue name as the routingKey. - Petrozavodsk
Artem and Derick, I cannot create an exclusive/auto-delete queue because a user could open multiple tabs. What I need is a way to disconnect the consumer BUT keep the queue. The queue will expire because of the "x-expires" argument that I put on it. Can I disconnect a consumer without deleting a queue using rabbitmq-amqp? - Westbrook
I guess SimpleMessageListenerContainer.removeQueues() will just remove the consumer. Correct me if I'm wrong. - Westbrook
Yes, that's correct: it stops listening to those queues and therefore removes the consumer on the RabbitMQ side. - Petrozavodsk
C
7

Send message to a particular consumer in a queue

This is not possible. Any consumer connected to a queue has a chance of consuming any given message in that queue.

I want a message sent to the direct exchange with the routing key {userId} to be received by only the particular app server where the binding has occurred.

You can do this by creating an exclusive / auto-delete queue for each consumer, with a binding that directs all messages for that consumer to that queue.

Is a direct exchange the right exchange to use in this case?

Either a direct exchange or a topic exchange is fine. A direct exchange is slightly easier to understand, but a topic exchange is more flexible.
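
To illustrate the extra flexibility, here is a small plain-Java sketch of topic-style matching, where binding keys are dot-separated words, "*" stands for exactly one word and "#" for zero or more words. It is a simplified matcher written for this answer, not the broker's implementation, and the "user.42" style keys are an assumed naming scheme. A direct exchange, by contrast, only compares the keys for equality.

```java
// Simplified topic-exchange key matching (illustration only, not broker code).
class TopicMatchDemo {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int skip = j; skip <= k.length; skip++) {
                if (matches(p, i + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still has words, key does not
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.42", "user.42"));
        System.out.println(matches("user.*", "user.42.tab1"));
        System.out.println(matches("user.#", "user.42.tab1"));
    }
}
```

With keys like these, one binding such as "user.42.#" could cover every tab of a user, which a direct exchange would need one binding per exact key to achieve.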

Carminacarminative answered 3/3, 2016 at 22:12 Comment(0)
T
0

I needed this too and tried every approach, but none of them worked. When multiple instances of a microservice are connected to the same queue, there is no guarantee that the next message will go to a specific instance, because RabbitMQ dispatches messages to a queue's consumers in round-robin fashion. I had to write a more involved implementation to get this behaviour.

Talanian answered 27/4, 2024 at 9:47 Comment(0)
