Split batch of messages to be sent to Azure Service Bus

Let's say I have a collection List<BrokeredMessage> of messages that I want to batch-send to Azure Service Bus.

The collection size is arbitrary, so the total size of all messages combined might exceed the 256 KB limit imposed by Service Bus. How do I split it, optimally, into smaller chunks that will go through?

This task seems simple, but it isn't: the size of each BrokeredMessage is unknown before I try to send it. The Size property returns just the size of the message body, without headers and other overhead.

If I try to send 1000 messages with a 250-byte body each, I get a MessageSizeExceededException. The catch is that I can't even retry, because the messages were already consumed, so I'd have to re-create all the BrokeredMessages.

So the only way I see for now is to be very conservative about the batch size when lots of small messages are sent, which will probably cost me some throughput.

Are there more reliable and/or clean ways?

Lownecked asked 27/6, 2017 at 11:57 Comment(3)
This is a crazy idea, but if you serialize the payload as JSON, you could calculate (based on the length of the string containing the JSON payload) how many bytes each message takes. While JSON is a bit less efficient, perhaps you can make up for this by being able to batch sends right up to the 256KB limit.Pomp
@RobReagan I already do that, but that's just message body length, as I mentioned. Headers (session ID, message ID, content type etc) are not included, but they make a difference for small payloads.Lownecked
My only other idea is to guesstimate an average header size and use that in the calculation. Then perhaps catch the MessageSizeExceededException, and progressively step down the number of items to send in the batch. My guess is that even with a few multiple send attempts for a given batch size, you will still save money.Pomp
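The step-down idea in the last comment can be sketched roughly as follows. This is a minimal, hypothetical helper (SendWithStepDownAsync, sendBatch and isSizeExceeded are illustrative names, not part of any Service Bus API). It works on payloads rather than BrokeredMessage instances, so the sendBatch delegate can build fresh messages on every attempt; a message consumed by a failed send cannot be resent. When isSizeExceeded recognizes the failure (with WindowsAzure.ServiceBus that would presumably be `ex => ex is MessageSizeExceededException`), the same payloads are retried in batches half as large.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class StepDownBatchSender
{
    // Sends payloads in batches of at most initialBatchSize items.
    // sendBatch receives payloads (not messages) so it can create new message
    // objects on each attempt. If sendBatch throws an exception that
    // isSizeExceeded recognizes, the batch size is halved and the same
    // payloads are retried. Returns the number of send attempts, failed ones included.
    public static async Task<int> SendWithStepDownAsync<T>(
        IReadOnlyList<T> payloads,
        int initialBatchSize,
        Func<IReadOnlyList<T>, Task> sendBatch,
        Func<Exception, bool> isSizeExceeded)
    {
        if (initialBatchSize < 1)
            throw new ArgumentOutOfRangeException(nameof(initialBatchSize));

        int batchSize = initialBatchSize;
        int attempts = 0;
        int index = 0;

        while (index < payloads.Count)
        {
            var chunk = payloads.Skip(index).Take(batchSize).ToList();
            attempts++;
            try
            {
                await sendBatch(chunk);
                index += chunk.Count; // success: move past the items just sent
            }
            catch (Exception ex) when (isSizeExceeded(ex) && batchSize > 1)
            {
                // Too large: keep the same starting index and retry with half the batch size
                batchSize /= 2;
            }
        }

        return attempts;
    }
}
```

The reduced batch size is kept for the remaining payloads, on the assumption that similar messages will hit the same limit. If even a single payload is rejected, the exception filter no longer matches and the error propagates to the caller.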

So the only way I see for now is to be very conservative about the batch size when lots of small messages are sent, which will probably cost me some throughput.

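It will cost not just throughput, but also reliability. When MessageSender.SendBatchAsync() is used, all messages in the batch are sent as an atomic operation and either succeed or fail together; once the messages are split across several batches, the set as a whole is no longer sent atomically.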

Are there more reliable and/or clean ways?

Using a TransactionScope to wrap all of the sends would achieve the same effect, but you won't be sending the messages as a batch anymore.

If you still want to send batches and ensure that you don't run into size/count problems, you could chunk your sends, as suggested. Unfortunately, the Size property is a no-go for size estimates: it reports the body size before serialization, unless you use a Stream, in which case serialization is not applied. And even then, your size will still be skewed by the standard and custom properties. When the documentation for WindowsAzure.ServiceBus was re-arranged, the following remark got lost from the MSDN API documentation for BrokeredMessage:

To get an accurate value for the size of a BrokeredMessage, you should read the Size property after completing the Send/Receive operation on the BrokeredMessage.

I've taken an approach of chunking based on estimated size. The estimate inflates each message's size by a certain padding percentage, in anticipation that on average a message will be smaller than its padded size, plus an assumed average size for string-based properties. In this blog post I've laid out the idea behind estimating a single message size, which is then used to calculate a chunk size that can go out as a batch.
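A minimal sketch of that idea, under stated assumptions: the padding percentage and the average property size are tuning knobs you would pick yourself, and the names EstimateSize, Chunk, paddingPercent and averagePropertySize are hypothetical, not taken from the blog post. Each message's padded estimate is packed greedily, in order, into chunks whose estimated total stays under the 256 KB limit.

```csharp
using System;
using System.Collections.Generic;

public static class EstimatedChunker
{
    // The 256 KB batch limit mentioned in the question.
    public const long MaxBatchSizeInBytes = 256 * 1024;

    // Padded estimate of one message: the body is inflated by paddingPercent
    // (rounded up), and each property is assumed to cost averagePropertySize bytes.
    // Both knobs are guesses to be tuned against sizes observed after sending.
    public static long EstimateSize(long bodySize, int propertyCount, int paddingPercent, int averagePropertySize)
    {
        long padding = (bodySize * paddingPercent + 99) / 100; // integer ceiling of body * pct / 100
        return bodySize + padding + (long)propertyCount * averagePropertySize;
    }

    // Greedily packs items, in their original order, into chunks whose summed
    // estimated size never exceeds maxBatchSize. Each chunk can then be sent as one batch.
    public static List<List<T>> Chunk<T>(IEnumerable<T> items, Func<T, long> estimateSize, long maxBatchSize)
    {
        var chunks = new List<List<T>>();
        var current = new List<T>();
        long currentSize = 0;

        foreach (var item in items)
        {
            long size = estimateSize(item);
            if (size > maxBatchSize)
                throw new ArgumentException("A single message is estimated to exceed the batch size limit.");

            // Close the current chunk if this item would push it over the limit
            if (current.Count > 0 && currentSize + size > maxBatchSize)
            {
                chunks.Add(current);
                current = new List<T>();
                currentSize = 0;
            }

            current.Add(item);
            currentSize += size;
        }

        if (current.Count > 0)
            chunks.Add(current);
        return chunks;
    }
}
```

For example, a 250-byte body with 20% padding and two properties at an assumed 32 bytes each is estimated at 364 bytes, so 1000 such messages would be split into chunks of 720 and 280 messages.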

Crouton answered 27/6, 2017 at 14:50 Comment(2)
Thanks for the nice answer. I'm doing the chunked approach, working on my own way to estimate the size. From what I see, Size property before sending shows the serialized body size. After sending, it returns same body size + header size (that used to be 0). So the estimation task is about estimating the headers, which are mostly fixed in our use case.Lownecked
I loved your article Sean. Cool to see how to estimate the size of the message :-)Sampson
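If, as in the asker's use case, the header overhead is mostly fixed, the estimate reduces to simple arithmetic. The sketch below is hypothetical (FixedOverheadBatching, MeasuredOverhead, MaxMessagesPerBatch and safetyMarginBytes are illustrative names, and the safety margin is an assumption, not a documented value): the overhead is measured as Size read after a send minus Size read before it, and the batch capacity is the usable limit divided by body plus overhead.

```csharp
using System;

public static class FixedOverheadBatching
{
    // Per-message overhead measured empirically: Size before sending reports the
    // body only; Size after sending reports body plus headers.
    public static long MeasuredOverhead(long sizeBeforeSend, long sizeAfterSend) =>
        sizeAfterSend - sizeBeforeSend;

    // How many equally sized messages fit in one batch, assuming a fixed
    // per-message overhead. safetyMarginBytes is held back from the limit as
    // headroom for anything the estimate does not capture (tune as needed).
    public static int MaxMessagesPerBatch(long bodySize, long overheadPerMessage, long maxBatchSizeInBytes, long safetyMarginBytes = 0)
    {
        long perMessage = bodySize + overheadPerMessage;
        if (perMessage <= 0)
            throw new ArgumentOutOfRangeException(nameof(bodySize));

        long usable = maxBatchSizeInBytes - safetyMarginBytes;
        return (int)Math.Max(0, usable / perMessage);
    }
}
```

With the numbers from the question (250-byte bodies) and the roughly 54-byte overhead the asker reports for a message with no extra properties, only 262144 / 304 = 862 messages fit in a batch. That would explain why 1000 messages fail even though their bodies alone total just 250,000 bytes.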

In the current library (Azure.Messaging.ServiceBus), the ServiceBusMessageBatch class has a TryAddMessage method that tells you whether a message still fits within the batch size limit, allowing you to write something like this...

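using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class ServiceBusBatchSender
{
    public static async Task SendBatchAsync(ServiceBusSender sender, IEnumerable<ServiceBusMessage> messages)
    {
        // Don't use "using" - we want to explicitly control the batch lifecycle
        var batch = await sender.CreateMessageBatchAsync();
        try
        {
            foreach (var message in messages)
            {
                if (batch.TryAddMessage(message))
                    continue;

                // Batch is full, so send this batch
                await sender.SendMessagesAsync(batch);
                batch.Dispose();

                // And create a new batch starting with this message; if it doesn't fit
                // even in an empty batch, it is too large to send at all
                batch = await sender.CreateMessageBatchAsync();
                if (!batch.TryAddMessage(message))
                    throw new InvalidOperationException("A single message exceeds the maximum batch size.");
            }

            // Send the final batch, unless it is empty
            if (batch.Count > 0)
                await sender.SendMessagesAsync(batch);
        }
        finally
        {
            batch.Dispose();
        }
    }
}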
Windrow answered 6/1, 2023 at 15:10 Comment(0)

To complete Sean's answer: Paolo Salvatori wrote some extension methods for the MessageSender class; you can find them here:

Basically, it iterates through all your messages and groups them into batches so that no batch exceeds the maximum batch size.

I ran into some issues because BrokeredMessage.Size does not take into account the Properties of the BrokeredMessage. I modified his version a little to add the size of the Properties:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // WindowsAzure.ServiceBus package

/// <summary>
/// This class contains extension methods for the <see cref="MessageSender"/> class.
/// </summary>
public static class MessageSenderExtensions
{
    private const string BrokeredMessageListCannotBeNullOrEmpty = "The messages parameter cannot be null or empty.";
    private const string SendPartitionedBatchAsyncFormat = "[MessageSender.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const int MaxBatchSizeInBytes = 262144; // 256 KB

    /// <summary>
    /// Sends a set of brokered messages (for batch processing).
    /// If the batch size is greater than the maximum batch size,
    /// the method partitions the original batch into multiple batches,
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="messageSender">The current <see cref="MessageSender"/> object.</param>
    /// <param name="messages">The collection of brokered messages to send.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    public static async Task SendPartitionedBatchAsync(this MessageSender messageSender, IEnumerable<BrokeredMessage> messages, bool trace = false)
    {
        // Check for null before materializing: ToList() on a null sequence would throw first
        if (messages == null)
        {
            throw new ArgumentNullException(nameof(messages), BrokeredMessageListCannotBeNullOrEmpty);
        }

        var brokeredMessageList = messages as IList<BrokeredMessage> ?? messages.ToList();
        if (brokeredMessageList.Count == 0)
        {
            throw new ArgumentException(BrokeredMessageListCannotBeNullOrEmpty, nameof(messages));
        }

        var batchList = new List<BrokeredMessage>();
        long batchSize = 0;

        foreach (var brokeredMessage in brokeredMessageList)
        {
            // Hack: BrokeredMessage.Size does not include the properties, so add a rough estimate of their size
            var messageSize = GetObjectSize(brokeredMessage.Properties) + brokeredMessage.Size;

            // Send the current batch first if adding this message would exceed the limit
            // (the Count check ensures an empty batch is never sent)
            if (batchList.Count > 0 && batchSize + messageSize > MaxBatchSizeInBytes)
            {
                await messageSender.SendBatchAsync(batchList);
                Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));

                // Initialize a new batch
                batchList = new List<BrokeredMessage>();
                batchSize = 0;
            }

            // Add the BrokeredMessage to the current batch
            batchList.Add(brokeredMessage);
            batchSize += messageSize;
        }

        // The final batch is sent outside of the loop
        await messageSender.SendBatchAsync(batchList);
        Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
    }

    /// <summary>
    /// Calculates the length in bytes of an object by serializing it with BinaryFormatter.
    /// This is only a rough estimate of the wire size (the output includes .NET type metadata),
    /// and BinaryFormatter is obsolete in modern .NET.
    /// </summary>
    private static int GetObjectSize(object objectToTest)
    {
        var bf = new BinaryFormatter();
        using (var ms = new MemoryStream())
        {
            bf.Serialize(ms, objectToTest);
            return (int)ms.Length;
        }
    }
}
Sampson answered 28/6, 2017 at 22:44 Comment(2)
Your GetObjectSize doesn't seem to produce accurate results. For a message with no extra properties, it returns 369 bytes, but the actual overhead as measured after Send is only 54 bytes.Lownecked
@Mikhail, yeah, I agree it is not accurate. You can try Sean's post to build a more accurate function.Sampson
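As a possible starting point for replacing GetObjectSize, here is a hypothetical heuristic that avoids BinaryFormatter (whose output includes .NET type metadata and so inflates the result). Keys and string values are counted by their UTF-8 byte length; every other value, and each entry's encoding overhead, is charged an assumed fixed cost. PropertySizeEstimator, perPropertyOverhead and nonStringValueSize are illustrative names, and the default constants are guesses rather than values from the Service Bus wire format, so they should be calibrated against the Size reported after a send. Counting UTF-8 bytes rather than string.Length also matters for the JSON-length idea in the comments under the question, because non-ASCII characters take more than one byte.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class PropertySizeEstimator
{
    // Rough, hypothetical estimate of the size of a message's application properties.
    // Keys and string values: UTF-8 byte length. Other values (numbers, dates, null):
    // an assumed fixed nonStringValueSize. Each entry also pays perPropertyOverhead.
    public static long Estimate(IDictionary<string, object> properties, int perPropertyOverhead = 8, int nonStringValueSize = 8)
    {
        long total = 0;
        foreach (var pair in properties)
        {
            total += Encoding.UTF8.GetByteCount(pair.Key) + perPropertyOverhead;
            total += pair.Value is string s ? Encoding.UTF8.GetByteCount(s) : nonStringValueSize;
        }
        return total;
    }
}
```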
