I have implemented a Kafka consumer as a console app using BackgroundService on .NET Core 2.2, with confluent-kafka-dotnet v1.0.1.1 as the client for Apache Kafka. I'm not sure about the proper way to process each message.
Since the processing of each message can take some amount of time (up to 24 hours), I am starting a new Task for each message, so that I don't block the consumer from consuming new messages. I think that if I have too many messages, creating a new Task each time is not the right way to go. What is the proper way to process each message then? Is it possible to create some kind of dynamic background service for each message?
If a message is already being processed but the application crashes or a rebalance occurs, I end up consuming and processing the same message more than once. Should I commit the offset automatically (or right after the message is consumed) and store the state of the message (or task) somewhere, such as in a database?
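For context, the at-least-once alternative (commit only after processing succeeds) might look roughly like the sketch below. `IRequestStateStore` and its methods are hypothetical placeholders for the "store the state in a database" idea, not part of Confluent.Kafka:

```csharp
// Sketch of at-least-once handling: commit only after the work is done.
// IRequestStateStore is a hypothetical abstraction standing in for a database;
// consumer, stateStore, _config and _logger are assumed to exist as in the question.
var consumeResult = consumer.Consume(stoppingToken);

// Record that processing has started, keyed by topic/partition/offset,
// so a crash or rebalance can later detect an in-flight message.
await stateStore.MarkInProgressAsync(consumeResult.TopicPartitionOffset);

await Helper.SendDataAsync(consumeResult.Value, _config, _logger);

// Only commit after processing succeeds: a crash before this line means the
// message is redelivered (processed more than once), never silently lost.
consumer.Commit(consumeResult);
await stateStore.MarkCompletedAsync(consumeResult.TopicPartitionOffset);
```

The trade-off is the one the question already identifies: committing early risks losing a message on crash, while committing late (as above) risks duplicate processing, so the processing step should be idempotent or deduplicated via the state store.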
I know that there is Hangfire, but I am not sure if I need to use it. If my current approach is totally wrong, please give me some suggestions.
Here is the implementation of ConsumerService:
public class ConsumerService : BackgroundService
{
    private readonly IConfiguration _config;
    private readonly IElasticLogger _logger;
    private readonly ConsumerConfig _consumerConfig;
    private readonly string[] _topics;
    private readonly double _maxNumAttempts;
    private readonly double _retryIntervalInSec;

    public ConsumerService(IConfiguration config, IElasticLogger logger)
    {
        _config = config;
        _logger = logger;
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
            GroupId = _config.GetValue<string>("Kafka:GroupId"),
            EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
            AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
        };
        _topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
        _maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
        _retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine("!!! CONSUMER STARTED !!!\n");
        // Starting a new Task here because Consume() method is synchronous
        var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);
        return task;
    }

    private void ProcessQueue(CancellationToken stoppingToken)
    {
        using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig)
            .SetValueDeserializer(new MessageDeserializer())
            .Build())
        {
            consumer.Subscribe(_topics);
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);
                        // Don't want to block consume loop, so starting new Task for each message
                        Task.Run(async () =>
                        {
                            var currentNumAttempts = 0;
                            var committed = false;
                            var response = new Response();
                            while (currentNumAttempts < _maxNumAttempts)
                            {
                                currentNumAttempts++;
                                // SendDataAsync is a method that sends http request to some end-points
                                response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);
                                if (response != null && response.Code >= 0)
                                {
                                    try
                                    {
                                        consumer.Commit(consumeResult);
                                        committed = true;
                                        break;
                                    }
                                    catch (KafkaException ex)
                                    {
                                        // log
                                    }
                                }
                                else
                                {
                                    // log
                                }
                                if (currentNumAttempts < _maxNumAttempts)
                                {
                                    // Delay between tries
                                    await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
                                }
                            }
                            if (!committed)
                            {
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException ex)
                                {
                                    // log
                                }
                            }
                        }, stoppingToken);
                    }
                    catch (ConsumeException ex)
                    {
                        // log
                    }
                }
            }
            catch (OperationCanceledException ex)
            {
                // log
                consumer.Close();
            }
        }
    }
}
Using Task.Run on every new message will spawn a new thread for every new message, which is very resource-consuming. If you want to free the consumer, use a producer-consumer pattern (either BlockingCollection<T> or ActionBlock<T> is a good start) – Fourgon