How to properly implement a Kafka consumer as a background service on .NET Core
I have implemented a Kafka consumer as a console app by using BackgroundService on .NET Core 2.2. I am using confluent-kafka-dotnet v1.0.1.1 as a client for Apache Kafka. I'm not so sure about how to process each message.

  1. Since the processing of each message can take some amount of time (up to 24 hours), I am starting a new Task for each message, so that I don't block the consumer from consuming new messages. I think that if I have too many messages, creating a new Task each time is not the right way to go. What is the proper way to process each message then? Is it possible to create some kind of dynamic background service for each message?

  2. If a message is already being processed but the application crashes or a rebalance occurs, I end up consuming and processing the same message more than once. Should I commit the offset automatically (or right after it was consumed) and store the state of the message (or task) somewhere, like in a database?

I know that there is Hangfire, but I am not sure if I need to use it. If my current approach is totally wrong, please give me some suggestions.

Here is the implementation of ConsumerService:

public class ConsumerService : BackgroundService
{
    private readonly IConfiguration _config;
    private readonly IElasticLogger _logger;
    private readonly ConsumerConfig _consumerConfig;
    private readonly string[] _topics;
    private readonly double _maxNumAttempts;
    private readonly double _retryIntervalInSec;

    public ConsumerService(IConfiguration config, IElasticLogger logger)
    {
        _config = config;
        _logger = logger;
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
            GroupId = _config.GetValue<string>("Kafka:GroupId"),
            EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
            AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
        };
        _topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
        _maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
        _retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine("!!! CONSUMER STARTED !!!\n");
        
        // Starting a new Task here because Consume() method is synchronous
        var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);

        return task;
    }

    private void ProcessQueue(CancellationToken stoppingToken)
    {
        using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
        {
            consumer.Subscribe(_topics);

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);

                        // Don't want to block consume loop, so starting new Task for each message  
                        Task.Run(async () =>
                        {
                            var currentNumAttempts = 0;
                            var committed = false;

                            var response = new Response();

                            while (currentNumAttempts < _maxNumAttempts)
                            {
                                currentNumAttempts++;

                                // SendDataAsync is a method that sends http request to some end-points
                                response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);

                                if (response != null && response.Code >= 0)
                                {
                                    try
                                    {
                                        consumer.Commit(consumeResult);
                                        committed = true;
                                        
                                        break;
                                    }
                                    catch (KafkaException ex)
                                    {
                                        // log
                                    }
                                }
                                else
                                {
                                    // log
                                }
                                
                                if (currentNumAttempts < _maxNumAttempts)
                                {
                                    // Delay between tries
                                    await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
                                }
                            }
                                                    
                            if (!committed)
                            {
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException ex)
                                {
                                    // log
                                }
                            }
                        }, stoppingToken);
                    }
                    catch (ConsumeException ex)
                    {
                        // log
                    }
                }
            }
            catch (OperationCanceledException ex)
            {
                // log
                consumer.Close();
            }
        }
    }
}
Beam answered 24/6, 2019 at 9:39 Comment(3)
Don't Task.Run on every new message: it will spawn a new thread for every new message, which is very resource-consuming. If you want to free the consumer, use a producer-consumer pattern (either BlockingCollection<T> or ActionBlock<T> is a good start). – Fourgon
The offset commit strategy largely depends on: 1) is the process idempotent? 2) is the processing order important? – Fourgon
Task.Run doesn't spawn new threads. It takes a free thread from the thread pool, and if your code is I/O-bound everything will be fine. – Arturoartus
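The producer-consumer pattern suggested above can be sketched with a bounded channel (this uses System.Threading.Channels rather than BlockingCollection<T>, and ProcessAsync is a hypothetical stand-in for the actual message handling — a sketch, not the asker's code):

```csharp
using System.Threading.Channels;

// Sketch: decouple consuming from processing with a bounded channel.
// The consume loop only writes; a small, fixed pool of workers reads and
// processes, so there is no unbounded Task.Run fan-out per message.
var channel = Channel.CreateBounded<ConsumeResult<Ignore, Request>>(capacity: 100);

// Producer side, inside the consume loop:
//   var consumeResult = consumer.Consume(stoppingToken);
//   await channel.Writer.WriteAsync(consumeResult, stoppingToken);
// WriteAsync applies backpressure: it waits when the channel is full,
// which naturally slows consumption instead of piling up tasks.

// Consumer side: four long-running workers (the count is an assumption).
var workers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var msg in channel.Reader.ReadAllAsync(stoppingToken))
        {
            await ProcessAsync(msg.Message.Value); // hypothetical handler
        }
    }, stoppingToken))
    .ToArray();
```

With 24-hour processing times, the channel capacity and worker count bound how much work is in flight at once; a full channel pauses the consume loop rather than growing memory without limit.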

I agree with Fabio that you should not use Task.Run to process a message, since you'll end up with lots of threads wasting resources and switching execution, so performance will suffer.

Moreover, it's okay to process the consumed message in the same thread, since Kafka uses a pull model and your application can process each message at its own pace.
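That approach can be sketched as a plain consume loop with manual commits: process in place, then commit only after the work succeeds (a minimal sketch assuming the Confluent.Kafka client; ProcessAsync and the connection settings are placeholders):

```csharp
// Sketch: process each message in the consume loop itself and commit only
// after processing succeeds. With EnableAutoCommit = false, a crash before
// Commit() means the message is re-delivered (at-least-once semantics).
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",          // assumption: your cluster
    GroupId = "my-group",                         // assumption
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");                   // assumption

while (!stoppingToken.IsCancellationRequested)
{
    var result = consumer.Consume(stoppingToken); // blocks until a message arrives
    await ProcessAsync(result.Message.Value);     // hypothetical handler
    consumer.Commit(result);                      // commit only after success
}
```

Because the broker does not push messages, nothing is "lost" while the loop is busy: the next Consume call simply fetches the next record when the application is ready.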

Regarding processing a message more than once, I'd suggest storing the offset of each processed message so that already-processed messages can be skipped. Since an offset is a long-based number, you can easily skip messages whose offset is less than the one committed earlier. Of course, this works only if you have a single partition, because Kafka provides the offset counter and ordering guarantees at the partition level.
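A sketch of that deduplication step (single-partition assumption; _lastProcessedOffset would be loaded from durable storage such as a database on startup, and SaveProcessedOffsetAsync / ProcessAsync are hypothetical helpers):

```csharp
// Sketch: skip any message whose offset is at or below the last offset we
// durably recorded as processed, so duplicates re-delivered after a crash
// or rebalance are not processed twice.
var result = consumer.Consume(stoppingToken);

if (result.Offset.Value <= _lastProcessedOffset)
{
    // Already handled before the crash/rebalance — just re-commit and skip.
    consumer.Commit(result);
    return;
}

await ProcessAsync(result.Message.Value);             // hypothetical handler
await SaveProcessedOffsetAsync(result.Offset.Value);  // persist BEFORE committing
_lastProcessedOffset = result.Offset.Value;
consumer.Commit(result);
```

Persisting the offset before committing to Kafka means a crash between the two steps is harmless: the message is re-delivered, recognized by its offset, and skipped.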

You can find an example of a Kafka consumer in my article. If you have questions, feel free to ask; I'm glad to help.

Shilling answered 18/1, 2021 at 15:52 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.