How to use the ServiceBus EventData Offset Value

I have some code that uses the Service Bus EventData class, and I suspect that I need to use the Offset property, because my program currently is (or seems to be) re-processing the same Event Hub data over and over again.

My code is as follows:

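using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

public class EventHubListener : IEventProcessor
{
    private static EventHubClient _eventHubClient;
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";

    public void Start()
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        // One processor registration per partition, each with its own lease.
        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());

            Console.WriteLine("Processing : " + partitionId);
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            // MyData is the application's own payload type (not shown).
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
            // ... rest of the processing (not shown) ...
        }

        return Task.FromResult(0);
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.FromResult(0);
    }
}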

As I get the same messages over and over again, I suspect that I need to do something like this:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);

However, Offset is a string, even though it seems to be a numeric value ("12345", for example). The documentation on context.CheckpointAsync() made it seem like that might be the answer; however, calling it at the end of the loop seems to make no difference.

So, I have a two part question:

  1. What is the offset? Is it what I think it is (i.e., a numeric marker to a point in the stream) and, if so, why is it a string?
  2. Why would I be getting the same messages over again? As I understand Event Hubs, although they guarantee at-least-once delivery, once a checkpoint has been issued I shouldn't be getting the same messages back.

EDIT:

After a while of messing about, I've come up with something that avoids this problem; however, I certainly wouldn't claim it's a solution:

var filteredMessages = messages
    .Where(a => a.EnqueuedTimeUtc >= _startDate)
    .OrderBy(a => a.EnqueuedTimeUtc);

Using the EventProcessorHost seemed to actually make the problem worse; that is, not only were historical events being replayed, but they seemed to be replayed in a random order.

EDIT:

I came across this excellent article by @Mikhail, which does seem to address my exact issue (although, if it is correct, I'm unsure why using the EventProcessorHost doesn't just work out of the box, as @Mikhail said himself in the comments). However, and presumably this is the root of my problem, or one of them, the ServiceBus version of ICheckpointManager only has a single interface method:

namespace Microsoft.ServiceBus.Messaging
{

    public interface ICheckpointManager
    {
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    }
}
Lupulin answered 5/6, 2018 at 7:36 Comment(9)
Are you using EventProcessorHost? If so, calling CheckpointAsync should be enough; it will take care of offsets. (Speaks)
Can you post the whole code of the class in which ProcessEventsAsync is defined? (Checkerbloom)
@Mikhail No, I'm just implementing IEventProcessor. (Lupulin)
But you are not calling CheckpointAsync anywhere, are you? You need to call it, for example after a certain period of time or after processing a batch of messages. See github.com/DeHeerSoftware/SemanticLogging.EventHub/blob/master/… for an example. (Checkerbloom)
@pm_2 May I ask why not? You'll end up re-implementing it yourself. (Speaks)
@Mikhail It wasn't a conscious decision; I'm still playing about with this stuff. Are you saying that calling CheckpointAsync doesn't work out of the box, and requires this additional package? (Lupulin)
@PeterBons I am; I just left it out of the code sample because it was something I tried in order to fix the issue, but it didn't seem to help. (Lupulin)
You'd better try using EventProcessorHost as @Mikhail suggested; see github.com/DeHeerSoftware/SemanticLogging.EventHub/blob/master/… for an example. (Checkerbloom)
@pm_2 I would guess you save the checkpoint, but then don't load it... or something along these lines. (Speaks)
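Here is a minimal sketch of the "load" half that the last comment says is missing. It is written without the SDK so it runs on its own. `InMemoryCheckpointStore` and `TryLoad` are hypothetical names. The store keeps the same offset and sequence number that `ICheckpointManager.CheckpointAsync(Lease lease, string offset, long sequenceNumber)` receives, but it keys them by partition id rather than by `Lease`. It also holds them in memory only, so a real version would need durable storage to survive a restart.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, in-memory checkpoint store (not an SDK type). It records the same
// (offset, sequenceNumber) pair that ICheckpointManager.CheckpointAsync receives,
// keyed by partition id. A real store would persist to durable storage.
public sealed class InMemoryCheckpointStore
{
    private sealed class Checkpoint
    {
        public string Offset;
        public long SequenceNumber;
    }

    private readonly ConcurrentDictionary<string, Checkpoint> _checkpoints =
        new ConcurrentDictionary<string, Checkpoint>();

    // "Save": overwrite the last checkpoint recorded for this partition.
    public Task CheckpointAsync(string partitionId, string offset, long sequenceNumber)
    {
        _checkpoints[partitionId] = new Checkpoint { Offset = offset, SequenceNumber = sequenceNumber };
        return Task.FromResult(0);
    }

    // "Load": read the last checkpoint back before registering the processor,
    // so reading can resume after it instead of at the start of the stream.
    public bool TryLoad(string partitionId, out string offset, out long sequenceNumber)
    {
        if (_checkpoints.TryGetValue(partitionId, out Checkpoint checkpoint))
        {
            offset = checkpoint.Offset;
            sequenceNumber = checkpoint.SequenceNumber;
            return true;
        }

        // Nothing saved yet for this partition.
        offset = null;
        sequenceNumber = -1;
        return false;
    }
}
```

In `Start()`, the loaded offset would then be set on the lease before registering, e.g. `new Lease { PartitionId = partitionId, Offset = offset }`. That assumes `RegisterProcessor` treats `Lease.Offset` as the starting position, which is worth confirming in the `Microsoft.ServiceBus.Messaging` documentation.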

Your title should say Event Hubs rather than Service Bus. As for your questions:

  1. Although Event Hubs has a similar design to Kafka, one big difference is that you have to manage offsets yourself: the Event Hubs broker has no idea what your consumer group's offset is.
  2. So the Event Hubs SDK provides some helper classes to store offsets in a storage account, but you still need to call checkpoint manually after processing the messages.
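How often to call checkpoint is up to the consumer. Below is a minimal sketch of one common policy: checkpoint after every N events or after a fixed interval, whichever comes first. `CheckpointPolicy` and its thresholds are hypothetical. The current time is passed in explicitly so the logic can be tested without waiting.

```csharp
using System;

// Hypothetical helper (not an SDK type): signals a checkpoint after every
// `batchSize` processed events, or once `interval` has elapsed since the last
// checkpoint, whichever comes first.
public sealed class CheckpointPolicy
{
    private readonly int _batchSize;
    private readonly TimeSpan _interval;
    private int _sinceLast;
    private DateTime _lastCheckpointUtc;

    public CheckpointPolicy(int batchSize, TimeSpan interval, DateTime nowUtc)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
        _interval = interval;
        _lastCheckpointUtc = nowUtc;
    }

    // Call once per processed event; returns true when the caller should checkpoint now.
    public bool OnEventProcessed(DateTime nowUtc)
    {
        _sinceLast++;
        if (_sinceLast >= _batchSize || nowUtc - _lastCheckpointUtc >= _interval)
        {
            // Reset both counters as if the checkpoint has just been taken.
            _sinceLast = 0;
            _lastCheckpointUtc = nowUtc;
            return true;
        }
        return false;
    }
}
```

In the question's `ProcessEventsAsync`, a `true` result would be the point to call `context.CheckpointAsync()` (awaiting it in an `async` method). Checkpointing less often means fewer storage writes but more events replayed after a restart, which at-least-once delivery permits anyway.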
Bujumbura answered 11/10, 2018 at 3:45

  1. What is the offset? Is it what I think it is (i.e., a numeric marker to a point in the stream) and, if so, why is it a string?

    The offset is the pointer within a stream. The offset of an event changes as events get removed from your Event Hub when the Message Retention policy has elapsed, so a message that was once at offset 10 may be at offset 0 several days later because older messages were dropped from the stream. This has a good diagram: Event Hubs: Stream Offsets.

  2. Why would I be getting the same messages over again? As I understand Event Hubs, although they guarantee at-least-once delivery, once a checkpoint has been issued I shouldn't be getting the same messages back.

    You may be getting the same messages again if you are using the offset with the low-level EventReceiver, since messages expire from the Event Hub when the Message Retention policy elapses (the default is 1 day). The sequence number is a better field to rely on because it does not change.

    When checkpointing succeeds, it records the last event that was successfully processed. You shouldn't get that event back, because when the client starts, it opens a stream at the position after that event. If you still see repeats after a successful checkpoint, you can file an issue on GitHub.
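The resume behaviour described above can be illustrated with a small in-memory simulation. `SimulatedPartition` is hypothetical (not an SDK type). It tracks sequence numbers rather than offsets, following the suggestion to rely on the sequence number. Passing `null` models a reader that starts with no loaded checkpoint and therefore replays everything still retained, which matches the symptom in the question.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one partition (not an SDK type), holding only the
// sequence numbers of the events currently retained.
public sealed class SimulatedPartition
{
    private readonly List<long> _sequenceNumbers = new List<long>();

    public void Append(long sequenceNumber) => _sequenceNumbers.Add(sequenceNumber);

    // No checkpoint (null): start at the beginning of the retained stream, replaying everything.
    // With a checkpoint: start at the first event after the checkpointed one.
    public IReadOnlyList<long> ReadFrom(long? checkpointedSequenceNumber) =>
        _sequenceNumbers
            .Where(s => !checkpointedSequenceNumber.HasValue || s > checkpointedSequenceNumber.Value)
            .ToList();
}
```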

EventProcessorHost is helpful because it tries to balance the processing of partitions across the running instances. For example, consider a 6-partition Event Hub: if you have 2 EventProcessorHosts connected to the same Event Hub and reading with the same consumer group, they'll end up splitting the partitions 3 each. It also reconnects after transient failures like network loss.

It supports checkpointing to durable storage such as Azure Blob Storage. Here is a sample: Process Events using an EventProcessorClient
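The arithmetic in the 6-partition, 2-host example can be sketched as a round-robin split. This is an illustration only: `PartitionBalancer` is hypothetical. EventProcessorHost actually reaches its balance by having hosts compete for per-partition leases held in Azure Storage, so real assignments can differ and can shift as hosts join or leave.

```csharp
using System;
using System.Collections.Generic;

// Illustration only (hypothetical, not how EventProcessorHost works internally):
// deal partitions out to hosts round-robin so each host gets an even share.
public static class PartitionBalancer
{
    public static Dictionary<string, List<string>> Assign(IList<string> partitionIds, IList<string> hosts)
    {
        if (hosts.Count == 0)
            throw new ArgumentException("At least one host is required.", nameof(hosts));

        var assignment = new Dictionary<string, List<string>>();
        foreach (string host in hosts)
            assignment[host] = new List<string>();

        // Partition i goes to host (i mod hostCount).
        for (int i = 0; i < partitionIds.Count; i++)
            assignment[hosts[i % hosts.Count]].Add(partitionIds[i]);

        return assignment;
    }
}
```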

Splenomegaly answered 12/3, 2021 at 1:26