How to peek all messages in Azure Service Bus queue?

I'd like to peek all messages from several Azure Service Bus queues. After that, I want to filter them by queueName and insertDate, and offer a full-text search on the body.

Currently, I'm using the Microsoft.Azure.ServiceBus package to create a ManagementClient for gathering queue information and then use a MessageReceiver to peek the messages.

var managementClient = new ManagementClient(connectionString);

var queue = await managementClient.GetQueueRuntimeInfoAsync(queueName);

var count = queue.MessageCount;

var receiver = new MessageReceiver(connectionString, queueName);

var messagesOfQueue = new List<Message>();

for (var i = 1; i <= count; i++)
{
   messagesOfQueue.Add(await receiver.PeekAsync());
}

Is there a better way to get all messages? Or is there even a way to only peek messages that apply to a filter?

I've also tried to use the QueueClient.PeekBatch Method from the WindowsAzure.ServiceBus package. But that method didn't return all messages although I've set the correct messageCount parameter.

And then there is also the package Azure.Messaging.ServiceBus... What's up with all these packages?

So which of the packages should I use and what is the best way for peeking messages of queues based on some filters?

Usurer answered 26/3, 2021 at 9:48 Comment(0)
U
7

The solution I'm currently using, which works as expected, looks like this:

// serviceBusClient is an existing ServiceBusClient instance
var receiver = serviceBusClient.CreateReceiver(queueName);

var messagesOfQueue = new List<ServiceBusReceivedMessage>();
var sequenceNumber = 0L;

while (true)
{
  // A single call may return fewer messages than requested, so keep paging.
  var messageBatch = await receiver.PeekMessagesAsync(int.MaxValue, sequenceNumber);

  if (messageBatch.Count == 0)
    break;

  messagesOfQueue.AddRange(messageBatch);

  // Continue after the last peeked message so it is not returned again.
  sequenceNumber = messageBatch[^1].SequenceNumber + 1;
}

It uses the NuGet package Azure.Messaging.ServiceBus.
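
For the filtering part of the question, here is a minimal sketch of filtering already-peeked messages in memory. The PeekedMessageFilter class and its Filter method are hypothetical names, not part of the SDK, and the sketch assumes the message bodies are UTF-8 text. As far as I know there is no server-side filter for peeking a queue, so the filtering happens after all messages are collected.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration; not part of Azure.Messaging.ServiceBus.
public static class PeekedMessageFilter
{
    // Keeps messages enqueued within [from, to] whose body contains searchText
    // (case-insensitive).
    public static IEnumerable<ServiceBusReceivedMessage> Filter(
        IEnumerable<ServiceBusReceivedMessage> messages,
        DateTimeOffset from,
        DateTimeOffset to,
        string searchText)
    {
        return messages.Where(m =>
            m.EnqueuedTime >= from &&
            m.EnqueuedTime <= to &&
            // Assumption: the body is UTF-8 text; BinaryData.ToString() decodes it that way.
            m.Body.ToString().IndexOf(searchText, StringComparison.OrdinalIgnoreCase) >= 0);
    }
}
```

To filter by queue name as well, peek each queue separately and keep the queue name next to its list of messages, since a peeked message does not carry the name of the queue it came from.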

Usurer answered 4/10, 2021 at 10:10 Comment(0)
P
2

Currently you're peeking a single message per call. A better option would be to peek messages in batches using the PeekBatchAsync(Int64, Int32) method of MessageReceiver.

Here's the sample code to do so (untested though):

var messagesOfQueue = new List<Message>();
long sequenceNumber = 0; // sequence numbers are Int64
var batchSize = 100; // number of messages to peek in a single call
do
{
    var messages = await receiver.PeekBatchAsync(sequenceNumber, batchSize);
    messagesOfQueue.AddRange(messages);
    if (messages.Count > 0)
    {
        // start the next batch after the last message peeked
        sequenceNumber = messages[messages.Count-1].SequenceNumber + 1;
    }
    else
    {
        break;
    }
} while (true);
Pentstemon answered 26/3, 2021 at 10:11 Comment(3)
Okay, but as I said, I've already tried the PeekBatch method and couldn't peek all messages in one batch. Or is the maximum number of messages limited to 100, so that I have to call the PeekBatch method multiple times? - Usurer
I think you tried the PeekBatch method from the older SDK. Can you try it using the latest SDK (Azure.Messaging.ServiceBus)? Regarding the batch size of 100, that was just a random number I chose. Yes, with this approach you will need to call the PeekBatch method multiple times. - Pentstemon
Thanks for your help. Based on your comment I figured out a working solution. - Usurer
S
2

This solution avoids peeking the message with the same SequenceNumber twice.

Sequence numbers increase monotonically. I've tested most cases, except the sequence number rolling over to 0 when it reaches its maximum value (long.MaxValue).

using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

private static async Task<List<ServiceBusReceivedMessage>> PeekAllMessages(string serviceBusConnectionString, string queueName)
{
    // Dispose the client and receiver when done so their connections are closed
    await using var client = new ServiceBusClient(serviceBusConnectionString);
    await using var receiver = client.CreateReceiver(queueName);

    var messages = new List<ServiceBusReceivedMessage>();

    var batchSize = 20;
    var sequenceNumber = 0L;
    while (true)
    {
        var messageBatch = await receiver.PeekMessagesAsync(batchSize, sequenceNumber);

        if (messageBatch.Count == 0)
        {
            break;
        }

        // Increasing the SequenceNumber by 1 to avoid getting the message with the same SequenceNumber twice
        sequenceNumber = messageBatch[^1].SequenceNumber + 1;

        messages.AddRange(messageBatch);
    }

    return messages;
}
Soniferous answered 21/10, 2021 at 9:10 Comment(0)
A
0
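long currentSequence = 0L; // peeking from 0 starts at the oldest message
ServiceBusReceivedMessage message;

// stop once peekMessage returns null, i.e. no message is left to peek
while ((message = reader.peekMessage(currentSequence)) != null) {
        // handle the message here
        currentSequence = message.getSequenceNumber() + 1; // continue after this message
}

Just keep increasing the sequence number. That's it.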

Adenoidectomy answered 22/9, 2024 at 11:51 Comment(0)
