Is there a way to use the Confluent Kafka .NET JSON serializer without Schema Registry?

Looking at https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs, the example requires a Schema Registry URL. Is there a simple way to serialize/deserialize JSON without that extra complexity?

Ridgeling answered 5/2, 2021 at 4:4 Comment(0)

Try this:

Model:

using System;
using System.Text.Json;
using Confluent.Kafka;

public class KafkaMessage : ISerializer<KafkaMessage>, IDeserializer<KafkaMessage>
{
    public MessageType MessageType { get; set; }
    public string MessageData { get; set; }

    // Serialize the model straight to UTF-8 encoded JSON bytes.
    public byte[] Serialize(KafkaMessage data, SerializationContext context)
    {
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }

    // Explicit interface implementation; a null payload (tombstone) maps to null.
    KafkaMessage IDeserializer<KafkaMessage>.Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        return isNull ? null : JsonSerializer.Deserialize<KafkaMessage>(data);
    }
}

Producer:

using (var producer = new ProducerBuilder<Null, KafkaMessage>(_config)
    .SetValueSerializer(new KafkaMessage())
    .Build())
{
    KafkaMessage km = new KafkaMessage { MessageType = messageType, MessageData = messageData };
    try
    {
        // ProduceAsync completes once the broker has acknowledged the message.
        await producer.ProduceAsync(_topic, new Message<Null, KafkaMessage> { Value = km });
        result = true;
    }
    catch (ProduceException<Null, KafkaMessage>)
    {
        // Delivery failed; report it through the result flag instead of throwing.
        result = false;
    }
}

Consumer:

using (var consumer = new ConsumerBuilder<Ignore, KafkaMessage>(_config)
    .SetValueDeserializer(new KafkaMessage())
    .SetLogHandler((_, logHandler) => {  })
    .SetErrorHandler((_, errorHandler) => { })
    .Build())
{ 
    consumer.Subscribe(_topic);
    try
    {
        while (!token.IsCancellationRequested)
        {
            var result = consumer.Consume(token);
            var messageData = result.Message.Value;
            Console.WriteLine($"Output: { messageData.MessageData }");
        }
    }
    catch (OperationCanceledException)
    {
        /* Close connection */
        consumer.Close();
    }
}
Catafalque answered 19/10, 2021 at 20:53 Comment(5)
You shouldn't name the serializer KafkaMessage if this is a type used elsewhere. In other words, separate the serializer/deserializer and model classes. - Sandusky
Can you please provide an example separating the serializer/deserializer/model classes? To serialize/deserialize a model, the model must IMPLEMENT ISerializer/IDeserializer. - Catafalque
"Must" according to what? The model actually doesn't. That's like saying all String or Int32 types must implement those interfaces. Look at the existing serializers as examples: github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/… - Sandusky
In other words, you can use generics like ConsumerBuilder<Ignore, KafkaMessage> and then call .SetValueDeserializer(new MyCustomDeserializer()), where MyCustomDeserializer just defines the Deserialize method and doesn't hold any stateful fields. - Sandusky
Yes, sorry, I agree: a better approach would have been to separate the serializer/deserializer and model classes, using generics. Thanks for helping to improve my code. - Catafalque
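The separation suggested in the comments above could look roughly like the following. This is only a sketch: the class name JsonValueSerializer<T> is hypothetical (it is not part of Confluent.Kafka), and it assumes KafkaMessage has been reduced to a plain class that no longer implements ISerializer/IDeserializer.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical helper: a stateless, generic JSON (de)serializer.
// It works for any reference type that System.Text.Json can handle,
// so the model class itself needs no Kafka interfaces.
public sealed class JsonValueSerializer<T> : ISerializer<T>, IDeserializer<T>
    where T : class
{
    // Encode the value as UTF-8 JSON bytes; a null value is sent as a null payload.
    public byte[] Serialize(T data, SerializationContext context)
        => data == null ? null : JsonSerializer.SerializeToUtf8Bytes(data);

    // Decode UTF-8 JSON bytes; a null payload (tombstone) becomes null.
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? null : JsonSerializer.Deserialize<T>(data);
}

// Usage (assuming KafkaMessage is a plain model class and `config` exists):
//
// var producer = new ProducerBuilder<Null, KafkaMessage>(config)
//     .SetValueSerializer(new JsonValueSerializer<KafkaMessage>())
//     .Build();
//
// var consumer = new ConsumerBuilder<Ignore, KafkaMessage>(config)
//     .SetValueDeserializer(new JsonValueSerializer<KafkaMessage>())
//     .Build();
```

Because the helper holds no state, one instance can be reused for any number of producers and consumers, and the same class serves every model type.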

The link you've provided is for JSON Schema, not plain JSON.

JSON is a plaintext format. Use the built-in UTF-8 string serializer (Serializers.Utf8) and send strings after converting any model class or dictionary into a JSON string.

However, this doesn't guarantee (on the server side) that your messages adhere to any agreed-upon format (e.g. that a required set of fields is present), so that's where you'd want to introduce a schema and use a registry.
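As a rough sketch of the string-based approach in this answer: the Order model, the PlainJsonExample class, and the parameter names are made up for illustration. It relies on Confluent.Kafka picking its built-in UTF-8 serializer for string values when none is set explicitly.

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class PlainJsonExample
{
    // Hypothetical model, used only for illustration.
    public class Order
    {
        public int Id { get; set; }
        public string Item { get; set; }
    }

    public static async Task ProduceOrderAsync(string bootstrapServers, string topic)
    {
        var config = new ProducerConfig { BootstrapServers = bootstrapServers };

        // The value type is string, so no custom serializer and no
        // Schema Registry client are involved.
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        // Convert the model to a JSON string before handing it to Kafka.
        string json = JsonSerializer.Serialize(new Order { Id = 1, Item = "book" });
        await producer.ProduceAsync(topic, new Message<Null, string> { Value = json });
    }

    // On the consuming side, read the value with ConsumerBuilder<Ignore, string>
    // and turn the JSON string back into the model.
    public static Order Decode(string json)
        => JsonSerializer.Deserialize<Order>(json);
}
```

Nothing here validates the payload, so producer and consumer have to agree on the JSON shape by convention.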

Sandusky answered 5/2, 2021 at 17:9 Comment(0)
