How to make the Consume method non-blocking in Confluent Kafka for .NET
A

2

9

I use the Confluent Kafka .NET library, version 1.2.1. I have implemented a consumer to consume messages from a topic. The problem is that the Consume method blocks the main thread and keeps waiting until a message is posted, but I would like to make it non-blocking or run it in parallel. Could someone help me with this?

Below is the code I'm using for the consumer:

using (var consumer = new ConsumerBuilder<Ignore, string>(config) 
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");        
    })
    .SetPartitionsRevokedHandler((c, partitions) =>
    {
        Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    })
    .Build())
{
    consumer.Subscribe(topicName);

    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken);

            if (consumeResult!=null)
            {
                if (consumeResult.IsPartitionEOF)
                {
                    Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

                    continue;
                }

                Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");

                Console.WriteLine($"Received message => {consumeResult.Value}");

            }
        }

        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
    }
}

I couldn't find an async method for the consumer, whereas the producer has ProduceAsync for this purpose.

Attainture answered 17/12, 2019 at 7:53 Comment(0)
O
12

There's no async consume (yet). You might not be aware that calling Consume simply returns messages from an internal queue; it does not correspond to a network fetch from a Kafka broker. Messages are fetched in background threads, and caching is very aggressive by default (tunable via the parameters outlined here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). If messages are available, the consumer can deliver them to your application at more than 500,000 messages per second (for small message sizes). To achieve parallelism, use multiple consumers (generally in separate processes; note that it's best to fully utilize each consumer for maximum efficiency), or hand messages off to multiple threads after consuming them.
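The last suggestion above (multiple threads after consuming a message) might look roughly like the sketch below. It is only an illustration under some assumptions: the class name `ParallelProcessingSketch`, the channel capacity of 1000, and the `workerCount` parameter are made up for the example, and it needs C# 8 / .NET Core 3.0 or later for `await foreach`. One consumer thread pulls from the internal queue and a bounded `Channel` hands messages to worker tasks. Note that processing then no longer follows partition order, which may matter for how you commit offsets.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class ParallelProcessingSketch
{
    public static async Task RunAsync(
        ConsumerConfig config, string topicName, int workerCount, CancellationToken ct)
    {
        // Bounded so that slow workers apply back-pressure to the consume loop.
        var channel = Channel.CreateBounded<ConsumeResult<Ignore, string>>(1000);
        var tasks = new List<Task>();

        // Workers: process messages concurrently until the channel is completed.
        for (var i = 0; i < workerCount; i++)
        {
            tasks.Add(Task.Run(async () =>
            {
                await foreach (var result in channel.Reader.ReadAllAsync())
                {
                    Console.WriteLine(
                        $"Processing {result.TopicPartitionOffset}: {result.Message.Value}");
                }
            }));
        }

        // A single consume loop: only this task ever calls Consume.
        tasks.Add(Task.Run(() =>
        {
            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topicName);
            try
            {
                while (!ct.IsCancellationRequested)
                {
                    var result = consumer.Consume(ct);
                    if (result == null || result.IsPartitionEOF) continue;

                    // Blocks this thread while the channel is full.
                    channel.Writer.WriteAsync(result, ct).AsTask().GetAwaiter().GetResult();
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown.
            }
            finally
            {
                consumer.Close();          // leave the group cleanly
                channel.Writer.Complete(); // lets the workers drain and exit
            }
        }));

        await Task.WhenAll(tasks);
    }
}
```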

That said, an async consume method would be useful in cases where you want to block on multiple types of IO simultaneously (e.g. Kafka consume and HTTP responses). It would also fit in with standard C# patterns, in particular hosted services. For these reasons, we'd like to add it in the future.

For now, for the first use case, you can just use another thread (an extra thread that isn't busy isn't going to materially affect performance):

Task t = Task.Run(() => consumer.Consume(ct));

For the second use case, just set up a long running thread instead.
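As a rough sketch of that second case, assuming the Microsoft.Extensions.Hosting package is available: the class name `KafkaConsumerService` and its constructor parameters are hypothetical, and how `config` and `topicName` get registered with dependency injection is left out. The consume loop runs on a dedicated long-running thread so the host's startup is not blocked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service, not part of Confluent.Kafka.
public class KafkaConsumerService : BackgroundService
{
    private readonly ConsumerConfig _config;
    private readonly string _topicName;

    public KafkaConsumerService(ConsumerConfig config, string topicName)
    {
        _config = config;
        _topicName = topicName;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // LongRunning hints the scheduler to use a dedicated thread
        // rather than tying up a thread-pool thread in a blocking loop.
        return Task.Factory.StartNew(
            () => ConsumeLoop(stoppingToken),
            stoppingToken,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    private void ConsumeLoop(CancellationToken ct)
    {
        using var consumer = new ConsumerBuilder<Ignore, string>(_config).Build();
        consumer.Subscribe(_topicName);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(ct);
                    if (result == null || result.IsPartitionEOF) continue;

                    Console.WriteLine(
                        $"Received message at {result.TopicPartitionOffset}: {result.Message.Value}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is stopping.
        }
        finally
        {
            consumer.Close(); // commit offsets and leave the group cleanly
        }
    }
}
```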

Obligato answered 17/12, 2019 at 17:2 Comment(1)
Thank you Matt very much for this answer! I was trying to find how exactly the consumer works, and I needed to get this exact info regarding the internal queue. I supposed it works that way but could not find any doc online (inside github or somewhere else) to prove me right. I believe this information should definitely be part of the package's documentation. - Somber
N
0

For people searching for this in the future, here is a solution using an extension method and ValueTask. It returns immediately if a message is already in the buffer; otherwise it waits on a background thread.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ConsumerExtensions
{
    public static async ValueTask<ConsumeResult<TKey, TValue>> ConsumeAsync<TKey, TValue>(this IConsumer<TKey, TValue> consumer, CancellationToken ct)
    {
        try
        {
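            // Non-blocking poll: returns null right away if no message is buffered.
            var res = consumer.Consume(0);
            if (res != null)
            {
                return res;
            }

            // Nothing buffered: wait for the next message on a thread-pool thread.
            return await Task.Run(() => consumer.Consume(ct), ct);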
        }
        catch (OperationCanceledException)
        {
            throw new TaskCanceledException($"Kafka consumer [{consumer.Name}] was canceled.");
        }
        catch (Exception e)
        {
            throw new Exception($"Kafka consumer [{consumer.Name}] has failed.", e);
        }
    }
}
Normandnormandy answered 23/9, 2021 at 14:8 Comment(0)
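A possible way to call the extension above from an async loop is sketched here. The class name `ConsumeAsyncUsage` and the method `RunAsync` are made up for illustration, and the sketch assumes the `ConsumerExtensions` class from this answer is in scope. Only one `ConsumeAsync` call should be in flight at a time, since each call may end up in a blocking `Consume` on the same consumer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical usage of the ConsumeAsync extension method shown above.
public static class ConsumeAsyncUsage
{
    public static async Task RunAsync(
        IConsumer<Ignore, string> consumer, string topicName, CancellationToken ct)
    {
        consumer.Subscribe(topicName);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                // Completes synchronously when a message is already buffered.
                var result = await consumer.ConsumeAsync(ct);
                if (result == null || result.IsPartitionEOF) continue;

                Console.WriteLine(
                    $"Received message at {result.TopicPartitionOffset}: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // The extension rethrows cancellation as TaskCanceledException,
            // which derives from OperationCanceledException.
        }
        finally
        {
            consumer.Close();
        }
    }
}
```

Keep in mind that the extension wraps every other failure, including `ConsumeException`, in a plain `Exception`, so callers would need to inspect `InnerException` to get the Kafka error.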
