RabbitMQ by Example: Multiple Threads, Channels and Queues

I just read RabbitMQ's Java API docs, and found it very informative and straight-forward. The example for how to set up a simple Channel for publishing/consuming is very easy to follow and understand. But it's a very simple/basic example, and it left me with an important question: How can I set up 1+ Channels to publish/consume to and from multiple queues?

Let's say I have a RabbitMQ server with 3 queues on it: logging, security_events and customer_orders. So we'd either need a single Channel to have the ability to publish/consume to all 3 queues, or more likely, have 3 separate Channels, each dedicated to a single queue.

On top of this, RabbitMQ's best practices dictate that we set up 1 Channel per consumer thread. For this example, let's say security_events is fine with only 1 consumer thread, but logging and customer_orders both need 5 threads to handle the volume. So, if I understand correctly, does that mean we need:

  • 1 Channel and 1 consumer thread for publishing/consuming to and from security_events; and
  • 5 Channels and 5 consumer threads for publishing/consuming to and from logging; and
  • 5 Channels and 5 consumer threads for publishing/consuming to and from customer_orders?

If my understanding is misguided here, please begin by correcting me. Either way, could some battle-weary RabbitMQ veteran help me "connect the dots" with a decent code example for setting up publishers/consumers that meet my requirements here?

Milomilon answered 30/8, 2013 at 10:49 Comment(0)

I think you have several issues with your initial understanding. Frankly, I'm a bit surprised to see the following: "both need 5 threads to handle the volume". How did you arrive at that exact number? Do you have any guarantee that 5 threads will be enough?

RabbitMQ itself is well tuned and time tested, so it all comes down to proper design and efficient message processing.

Let's review the problem and find a proper solution. By the way, the message queue itself provides no guarantee that you have a really good solution; you have to understand what you are doing and also do some additional testing.

As you surely know, there are many possible layouts:

[diagram: possible producer/consumer layouts]

I will use layout B as the simplest way to illustrate the 1 producer, N consumers problem, since you are so worried about throughput. By the way, as you might expect, RabbitMQ behaves quite well (source). Pay attention to prefetchCount; I'll address it later:

[chart: RabbitMQ throughput benchmark from the linked source]

So the message processing logic is likely the right place to make sure you'll have enough throughput. Naturally, you could spawn a new thread every time you need to process a message, but eventually such an approach will kill your system: the more threads you have, the more latency you'll get (you can check Amdahl's law if you want).

(see Amdahl's law illustrated)
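
For reference, Amdahl's law bounds the speedup you can get from parallelism: if a fraction P of the work can be parallelized across N threads, the speedup is 1 / ((1 - P) + P / N). For example, with P = 0.9, even N = 100 threads give at most 1 / (0.1 + 0.009) ≈ 9.2x, so piling on more threads quickly stops paying off while each extra thread still adds scheduling and context-switching cost.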

Tip #1: Be careful with threads, use ThreadPools (details)

A thread pool can be described as a collection of Runnable objects (the work queue) and a collection of running threads. These threads run constantly and check the work queue for new work. If there is new work to be done, they execute that Runnable. The ExecutorService interface provides the execute(Runnable r) method to add a new Runnable object to the work queue.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
  private static final int NTHREADS = 10;

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
    for (int i = 0; i < 500; i++) {
      // MyRunnable is the task class from the linked tutorial
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new tasks
    // and finish all tasks already in the queue
    executor.shutdown();
    // Wait until all tasks are finished
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    System.out.println("Finished all threads");
  }
}

Tip #2: Be careful with message processing overhead

I would say this is an obvious optimization technique. It is likely you'll send small, easy-to-process messages. The whole approach is about small messages being continuously sent and processed. Big messages will eventually backfire, so it is better to avoid them.

So it is better to send small pieces of information, but what about processing? There is overhead every time you submit a job, so batch processing can be very helpful when the incoming message rate is high.

For example, let's say we have simple message processing logic and we do not want to pay thread-specific overhead every time a message is processed. To optimize for that, a very simple CompositeRunnable can be introduced:

import java.util.LinkedList;
import java.util.Queue;

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        // run every queued task sequentially on the current thread
        for (Runnable r : queue) {
            r.run();
        }
    }
}

Or do the same in a slightly different way, by collecting messages to be processed:

import java.util.LinkedList;
import java.util.Queue;

class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        // process every collected message sequentially on the current thread
        for (T message : queue) {
            // process a message
        }
    }
}

This way you can process messages more efficiently.
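
As a hypothetical usage sketch (the class name BatchingExample, the pool size and the batch size of 100 are illustrative, not part of the original answer), a batch of messages could be collected and submitted as a single task to the executor from Tip #1:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // Collect a batch of messages and submit them as a single task,
        // paying the task-submission overhead once instead of once per message.
        CompositeMessageWorker<String> batch = new CompositeMessageWorker<>();
        for (int i = 0; i < 100; i++) {
            batch.add("message-" + i);
        }
        executor.submit(batch);

        executor.shutdown();
    }
}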

Tip #3: Optimize message processing

Even though you can now process messages in parallel (Tip #1) and reduce processing overhead (Tip #2), you still have to make everything fast. Redundant processing steps, heavy loops and so on can affect performance a lot. Please see this interesting case study:

Improving Message Queue Throughput tenfold by choosing the right XML Parser

Tip #4: Connection and Channel Management

  • Starting a new channel on an existing connection involves one network round trip - starting a new connection takes several.
  • Each connection uses a file descriptor on the server. Channels don't.
  • Publishing a large message on one channel will block a connection while it goes out. Other than that, the multiplexing is fairly transparent.
  • Connections which are publishing can get blocked if the server is overloaded - it's a good idea to separate publishing and consuming connections (see the sketch below)
  • Be prepared to handle message bursts

(source)
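
A minimal sketch of the "separate publishing and consuming connections" advice; the class name SeparateConnections and the localhost host are illustrative assumptions, not anything prescribed by RabbitMQ:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SeparateConnections {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // illustrative host

        // One connection dedicated to publishing, another to consuming,
        // so a blocked publishing connection cannot stall deliveries.
        Connection publishConnection = factory.newConnection();
        Connection consumeConnection = factory.newConnection();

        Channel publishChannel = publishConnection.createChannel();
        Channel consumeChannel = consumeConnection.createChannel();

        // ... publish on publishChannel, register consumers on consumeChannel ...
    }
}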

Please note, all these tips work perfectly together. Feel free to let me know if you need additional details.

Complete consumer example (source)

Please note the following:

  • channel.basicQos(prefetch) - As you saw earlier prefetchCount might be very useful:

    This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).

  • ExecutorService threadExecutor - you can pass in a properly configured executor service.

Example:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        // VariableLengthTask comes from the linked source example;
        // it processes the message and acks it on the channel.
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}
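
To connect the dots with the original question, here is a hypothetical wiring sketch that reuses the Worker class above; the queue names come from the question, while the prefetch values, pool sizes and the class name Consumers are illustrative assumptions:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Consumers {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // illustrative host
        Connection connection = factory.newConnection();

        // One channel per consumer; the pool sizes are just examples.
        ExecutorService loggingPool = Executors.newFixedThreadPool(5);
        ExecutorService ordersPool = Executors.newFixedThreadPool(5);
        ExecutorService securityPool = Executors.newSingleThreadExecutor();

        new Worker(10, loggingPool, connection.createChannel(), "logging");
        new Worker(10, ordersPool, connection.createChannel(), "customer_orders");
        new Worker(1, securityPool, connection.createChannel(), "security_events");
    }
}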

You can also check the following:

Halliard answered 30/8, 2013 at 10:49 Comment(3)
What if I set autoAck = false, is it safe to ack in an executor thread pool? – Annorah
Using this approach the consumer will drain the MQ queue and place tasks onto the executorService queue. That may be a problem when the incoming message flow is greater than the executorService's processing speed. – Canopy
@Hett, it won't "devastate" it at all, because of "prefetch". On the other hand, the single useful hint in this long and annoying answer is the usage of executorService.submit(task), which kind of tells/confirms that a Worker will run on a single thread. – Worldlywise

How can I set up 1+ Channels to publish/consume to and from multiple queues?

You can implement this using threads and channels. All you need is a way to categorize things, i.e. all the queue items for logging, all the queue items for security_events, etc. The categorization can be achieved using a routing key.

That is, every time you add an item to the queue you specify the routing key. It is carried as a property of the message, and with it you can get the values for a particular event type, say logging.

The following code sample explains how to do it on the client side.

Eg:

The routing key is used to identify the type of message and to retrieve messages of that type.

For example, if you need to get all the messages of type login, then you must specify the routing key as login, or some other keyword to identify them.

// factory, EXCHANGE_NAME and message are assumed to be defined as in the RabbitMQ tutorials
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String routingKey = "login";

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

You can look here for more details about the categorization.


Threads Part

Once the publishing part is done, you can run the threads part.

In this part you can get the published data on the basis of category, i.e. the routing key, which in your case is logging, security_events or customer_orders.

Look at the example to see how to retrieve the data in threads.

Eg :

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// The threads part is as follows
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();

// Bind the queue to each severity/routing key (e.g. login);
// argv holds the routing keys passed in (e.g. logging, security_events, customer_orders)
for (String severity : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
            throws IOException
        {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            long deliveryTag = envelope.getDeliveryTag();

            // (process the message components here ...)
            channel.basicAck(deliveryTag, false);
        }
    });

Now a consumer that processes the data in the queue for the login routing key is created. In this way you can create multiple consumers, each serving a different purpose.

Look here for more details about the threads part.

Retro answered 9/9, 2013 at 15:6 Comment(2)
Thank you. I prefer to declare n channels and bind the queue to each of them to designate the concurrency level, which in turn removes the headache of managing the threads on my side. – Ween
@Ween this won't work exactly as you expect; it might be possible in certain heavy load conditions for the Consumers to be executed serially; see github.com/rabbitmq/rabbitmq-java-client/discussions/… – Worldlywise

Straight answer

For your particular situation (logging and customer_orders both need 5 threads) I would create 1 Channel with 1 Consumer for logging and 1 Channel with 1 Consumer for customer_orders. I would also create 2 thread pools (5 threads each): one to be used by the logging Consumer and the other by the customer_orders Consumer.

See Consumption below for why this should work.

PS: do not create the thread pool inside the Consumer; also be aware that Channel.basicConsume(...) is not blocking.

Publish

According to Channels and Concurrency Considerations (Thread Safety):

Concurrent publishing on a shared channel is best avoided entirely, e.g. by using a channel per thread. ... Consuming in one thread and publishing in another thread on a shared channel can be safe.

pretty clear ...

Consumption

The Channel might (I say might because of this) run all its Consumer(s) in the same thread; this idea is almost explicitly conveyed by Receiving Messages by Subscription ("Push API"):

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

This means that under certain conditions many Consumers pertaining to the same Channel will run on the same thread, such that the first one holds up dispatch of callbacks for the next ones. The word dispatch is confusing because it sometimes refers to "thread work dispatching", while here it refers mainly to calling Consumer.handleDelivery (see this again).

But what is this "own dispatch thread"? It is one of the threads from the pool used for server-pushed deliveries (see Channels and Concurrency Considerations (Thread Safety)):

Server-pushed deliveries ... uses a java.util.concurrent.ExecutorService, one per connection.

Conclusion

If one has 1 Channel with 1 Consumer but wants to process the incoming messages in parallel, then one had better create (outside the Consumer) and use (inside the Consumer) one's own thread pool; this way each message received by the Consumer is processed on the user's thread pool instead of on the Channel's own dispatch thread.

Is this approach (the user's thread pool used from the Consumer) even possible/valid/acceptable at all? It is; see Channels and Concurrency Considerations (Thread Safety):

thread that received the delivery (e.g. Consumer#handleDelivery delegated delivery handling to a different thread) ...
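
A minimal sketch of this approach, assuming a queue named logging and a pool of 5 threads (both illustrative, as is the class name PooledConsumer); the ack is issued from the worker thread, which the quoted documentation allows when handleDelivery delegates delivery handling to a different thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class PooledConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // illustrative host
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.basicQos(10); // illustrative prefetch

        // Thread pool created OUTSIDE the Consumer ...
        final ExecutorService pool = Executors.newFixedThreadPool(5);

        channel.basicConsume("logging", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                final long deliveryTag = envelope.getDeliveryTag();
                // ... and used INSIDE it: delivery handling is delegated to the pool,
                // so the Channel's dispatch thread is freed immediately.
                pool.submit(() -> {
                    try {
                        // process(body) would go here
                        channel.basicAck(deliveryTag, false);
                    } catch (Exception e) {
                        // handle/log processing or ack failures
                    }
                });
            }
        });
    }
}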

Worldlywise answered 31/3, 2022 at 13:25 Comment(0)
