I use Confluent Kafka .Net library Version 1.2.1, I have implemented consumer to consume messages from a topic, the problem is Consume method blocks the main thread and keeps waiting until a message is posted, but I would like to make it non blocking or to run in parallel. could some one help me on this?
below is the code I'm using for Consumer
using (var consumer = new ConsumerBuilder<Ignore, string>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build())
{
consumer.Subscribe(topicName);
while (true)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult!=null)
{
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
Console.WriteLine($"Received message => {consumeResult.Value}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
I couldn't find an Async method for consumer but Producer has ProduceAsync to serve the purpose.