How to Consume from specific TopicPartitionOffset with Confluent.Kafka in .Net
Asked Answered
J

1

15

I need my consumer to consume from an specific TopicPartitionOffset(here from offset 278). Suppose that Messages have been produced by some Producer in Specific topic like ="Test_1" before. Here is my Code

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                }
        }
    }
}

at line ----> var cr = consumer.Consume(); Consumer Consumes but nothing Happens. What is the problem.

I have already done AutoOffsetReset = AutoOffsetResetType.Earliest in ConsumerConfig , and Consumer Consumes All messages from all offsets but, this is not what I'm looking for.

Jeromejeromy answered 29/12, 2018 at 10:17 Comment(0)
J
17

Solved: I found the solution which described as below:

  • added this

consumer.Assign(new TopicPartitionOffset(topicName, 0, new Offset(lastConsumedOffset))) Before trying to Consume, and

  • Removed these

consumer.Subscribe("Test_1") and consumer.Seek(...)

So Updated code is something like this which perfectly works:

using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume();
                        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: ${consumeResult.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error}");
                    }
                }

                consumer.Close();
            }
Jeromejeromy answered 31/12, 2018 at 13:32 Comment(4)
Where did you find this solution? I can't find any documentation on .net beyond the very basic produce and consumeWightman
what is lastConsumedOffset?Spermatozoon
@BrunoFerreira if you want always to read from start, then it's 0. i.imgur.com/2PIglmB.jpgOsmious
@BrunoFerreira lastConsumedOffset = consumer.Position(), I thinkFenestra

© 2022 - 2024 — McMap. All rights reserved.