Consume multiple messages at a time
I am using an external service (the Service) to process objects of a particular type. The Service works faster if I send objects in batches of 10. My current architecture is as follows: a producer broadcasts objects one by one, and a pool of consumers pulls them (one by one) from a queue and sends them to the Service. This is obviously suboptimal.

I don't want to modify the producer code, as it is used in other contexts. I can modify the consumer code, but only at the cost of additional complexity. I'm also aware of the prefetch_count option, but I believe it only works at the network level -- the client library (pika) does not allow fetching multiple messages at once in the consumer callback.

So, can RabbitMQ create batches of messages before sending them to consumers? I'm looking for an option like "consume n messages at a time".

Tinfoil answered 29/5, 2014 at 12:12 Comment(4)
So what's wrong with prefetch_count? You can consume N messages, count them, and once your count exceeds some value, process them all at once. In addition, you can acknowledge them in a batch.Campanile
What if there are only n < N items in the queue? Then the consumer will hang for an indeterminate amount of time waiting for N - n more messages to be queued. In my case, this is a crucial problem.Meteor
You can either check the queue length after each message, or introduce some kind of timer that processes all accumulated messages every K seconds, in addition to counting them.Campanile
Yes, that is exactly what I meant by "additional complexity". Thank you for pointing it out though. Now, I know that I have at least one reasonable solution.Meteor
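For the record, the count-plus-timer approach from the comments above can be sketched as a small broker-agnostic helper: flush a batch when it reaches n messages, or when k seconds have passed since the first unflushed one, whichever comes first. The `Batcher` name and its callback are illustrative, not part of pika or RabbitMQ.

```python
import time

class Batcher:
    """Collect items and flush them in batches of up to `n` items,
    or after `k` seconds, whichever comes first."""

    def __init__(self, n, k, flush):
        self.n = n            # max batch size
        self.k = k            # max seconds to hold a partial batch
        self.flush = flush    # callback that receives the batch as a list
        self.items = []
        self.first_at = None  # arrival time of the oldest unflushed item

    def add(self, item):
        if not self.items:
            self.first_at = time.monotonic()
        self.items.append(item)
        if len(self.items) >= self.n:
            self._flush()

    def poll(self):
        """Call periodically (e.g. from a timer) to flush stale partial batches."""
        if self.items and time.monotonic() - self.first_at >= self.k:
            self._flush()

    def _flush(self):
        batch, self.items = self.items, []
        self.first_at = None
        self.flush(batch)
```

In a pika consumer you would call `add()` from the message callback and drive `poll()` with a periodic timer such as `connection.call_later`; this is exactly the "additional complexity" the comments describe, but it is contained in one small class.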
You cannot batch messages in the consumer callback, but you could use a thread-safe library and multiple threads to consume data. The advantage is that you can fetch five messages on five different threads and combine the data if needed.

As an example, you can take a look at how I would implement this using my AMQP library: https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py
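The thread-per-consumer pattern this answer describes can be sketched with the standard library alone. Here the AMQP plumbing is stubbed out with a plain `queue.Queue` (`source` stands in for the broker, and doubling an item stands in for per-message work); in real use each thread would own its own connection and channel, since pika channels are not thread-safe.

```python
import queue
import threading

def worker(source, sink):
    # In a real consumer, each thread would hold its own AMQP
    # connection/channel and push decoded message bodies into `sink`.
    for item in iter(source.get, None):   # None is the shutdown signal
        sink.put(item * 2)                # stand-in for per-message work

source, sink = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(source, sink)) for _ in range(5)]
for t in threads:
    t.start()

for i in range(10):
    source.put(i)
for _ in threads:
    source.put(None)    # one shutdown signal per worker
for t in threads:
    t.join()

# Combine the results produced by all five threads.
combined = sorted(sink.get() for _ in range(10))
print(combined)   # → [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

As the comment below notes, this only helps if the per-message work is genuinely parallelizable; if each thread just blocks on network I/O downstream, throughput is bounded by thread count times latency.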

Choreodrama answered 15/4, 2015 at 12:20 Comment(2)
I'll use this to create custom prefetch logic, thanksParricide
Would a "threaded approach" (I'll stand by the term :-) ) not be optimal only if, after consuming the message, you are running long synchronous tasks on the consumer machine? If you're sending messages further downstream over the network, it's basically blocking as per the number of threads and the network latency, right?Tabathatabb
The code below uses channel.consume to start consuming messages, and breaks out of the loop once the desired number of messages has been reached.

I have set a batch_size to avoid pulling a huge number of messages at once. You can always change batch_size to fit your needs.

    import json
    import logging

    from pika.exceptions import AMQPConnectionError, ChannelWrongStateError, StreamLostError

    logger = logging.getLogger(__name__)

    # `channel` is assumed to be an open pika BlockingConnection channel
    # created elsewhere in the application.

    def consume_messages(queue_name: str):
        msgs = []
        batch_size = 500

        q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
        q_length = q.method.message_count

        if not q_length:
            return msgs

        msgs_limit = min(q_length, batch_size)
        fetched = 0

        try:
            # Get messages and break out once the limit is reached
            for method_frame, properties, body in channel.consume(queue_name):

                # Append the message, skipping bodies that are not valid JSON
                try:
                    msgs.append(json.loads(body))
                except (UnicodeDecodeError, json.JSONDecodeError):
                    logger.info(f"Rabbit Consumer : Received message in wrong format {body!r}")

                # Acknowledge the message
                channel.basic_ack(method_frame.delivery_tag)

                # Escape the loop when the desired number of messages is fetched
                fetched += 1
                if fetched >= msgs_limit:

                    # Cancel the consumer and requeue any pending messages
                    requeued_messages = channel.cancel()
                    logger.info('Requeued %i messages', requeued_messages)
                    break

        except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
            logger.info(f'Connection Interrupted: {str(e)}')

        finally:
            # Stop consuming and close the channel
            channel.stop_consuming()
            channel.close()

        return msgs
Iaverne answered 14/11, 2022 at 11:50 Comment(1)
I could write similar code using basic_get, but that would involve more round trips and be less efficient.Iaverne
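For comparison, the basic_get variant mentioned in this comment might look roughly like the sketch below. The `drain_queue` name is illustrative; `channel` stands for an open pika BlockingChannel, whose `basic_get` returns `(None, None, None)` when the queue is empty. Each call is a separate broker round trip, which is why the channel.consume approach above is more efficient.

```python
import json

def drain_queue(channel, queue_name, limit=500):
    """Pull up to `limit` messages with repeated basic_get round trips."""
    msgs = []
    while len(msgs) < limit:
        method_frame, properties, body = channel.basic_get(queue_name)
        if method_frame is None:   # queue is empty
            break
        msgs.append(json.loads(body))
        channel.basic_ack(method_frame.delivery_tag)
    return msgs
```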
