The example at https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/JsonSerialization/Program.cs requires a Schema Registry URL. Is there a simple way to serialize/deserialize JSON without that extra complexity?
Is there a way to use the Confluent Kafka .NET JSON serializer WITHOUT Schema Registry?
Try this:
Model:
using System;
using System.Text.Json;
using Confluent.Kafka;

public class KafkaMessage : ISerializer<KafkaMessage>, IDeserializer<KafkaMessage>
{
    public MessageType MessageType { get; set; }
    public string MessageData { get; set; }

    // Serialize the message to UTF-8 encoded JSON bytes.
    public byte[] Serialize(KafkaMessage data, SerializationContext context)
    {
        return JsonSerializer.SerializeToUtf8Bytes(data);
    }

    // Deserialize UTF-8 encoded JSON; a null Kafka value maps to a null message.
    KafkaMessage IDeserializer<KafkaMessage>.Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return null;
        return JsonSerializer.Deserialize<KafkaMessage>(data);
    }
}
Producer:
using (var producer = new ProducerBuilder<Null, KafkaMessage>(_config)
    .SetValueSerializer(new KafkaMessage())
    .Build())
{
    var km = new KafkaMessage { MessageType = messageType, MessageData = messageData };
    // The task faults if delivery fails; map that to a boolean success flag.
    result = await producer
        .ProduceAsync(_topic, new Message<Null, KafkaMessage> { Value = km })
        .ContinueWith(task => !task.IsFaulted);
}
Consumer:
using (var consumer = new ConsumerBuilder<Ignore, KafkaMessage>(_config)
    .SetValueDeserializer(new KafkaMessage())
    .SetLogHandler((_, logHandler) => { })
    .SetErrorHandler((_, errorHandler) => { })
    .Build())
{
    consumer.Subscribe(_topic);
    try
    {
        while (!token.IsCancellationRequested)
        {
            // Blocks until a message arrives or the token is cancelled.
            var result = consumer.Consume(token);
            var messageData = result.Message.Value;
            Console.WriteLine($"Output: {messageData.MessageData}");
        }
    }
    catch (OperationCanceledException)
    {
        /* Close connection */
        consumer.Close();
    }
}
Can you please provide an example separating the serializer/deserialization/model classes? To serialize/deserialize a model, the model must IMPLEMENT ISerializer/IDeserializer. – Catafalque
"Must" according to what? The model actually doesn't. That's like saying all Strings or Int32 types must implement those interfaces. Look at the existing serializers as examples - github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/… – Sandusky
In other words, you can use generics like ConsumerBuilder<Ignore, KafkaMessage> and then you have .SetValueDeserializer(new MyCustomDeserializer()), but MyCustomDeserializer just defines the Deserialize method and doesn't actually hold any stateful fields. – Sandusky
Yes, sorry, I agree: a better approach would have been to separate the serializer/deserializer and model classes, using generics. Thanks for helping to improve my code. – Catafalque
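The separation suggested in the comments could look roughly like the sketch below. It is not code from the original answer: JsonValueSerializer<T> and SerializerUsage are hypothetical names, System.Text.Json is assumed as the JSON library, and MessageType is simplified to a string so the block is self-contained.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Hypothetical helper: one stateless generic class handles JSON for any model type.
public class JsonValueSerializer<T> : ISerializer<T>, IDeserializer<T> where T : class
{
    // A null value is passed through as a null payload (a Kafka tombstone).
    public byte[] Serialize(T data, SerializationContext context)
        => data == null ? null : JsonSerializer.SerializeToUtf8Bytes(data);

    // A null payload maps back to a null model instance.
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? null : JsonSerializer.Deserialize<T>(data);
}

// The model is now a plain class with no Kafka dependency.
public class KafkaMessage
{
    public string MessageType { get; set; } // simplified to string for this sketch
    public string MessageData { get; set; }
}

public static class SerializerUsage
{
    // Builds a producer whose values are serialized by the generic helper.
    public static IProducer<Null, KafkaMessage> BuildProducer(ProducerConfig config)
        => new ProducerBuilder<Null, KafkaMessage>(config)
            .SetValueSerializer(new JsonValueSerializer<KafkaMessage>())
            .Build();

    // Builds a consumer whose values are deserialized by the same helper.
    public static IConsumer<Ignore, KafkaMessage> BuildConsumer(ConsumerConfig config)
        => new ConsumerBuilder<Ignore, KafkaMessage>(config)
            .SetValueDeserializer(new JsonValueSerializer<KafkaMessage>())
            .Build();
}
```

Because the helper holds no state, the same class can be reused for any other model type without touching the model itself.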
The link you've provided is for JSON Schema, not plain JSON.
JSON is a plaintext format. Use the built-in UTF-8 string serializer (Serializers.Utf8) and send strings after converting any model class or dictionary into a JSON string.
However, this doesn't guarantee (on the server side) that your messages adhere to any agreed-upon format (i.e. that a required set of fields is defined), so that's where you'd want to introduce a schema and use a registry.
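A minimal sketch of the plain-string approach, assuming System.Text.Json for the conversion. The broker address, topic name, and payload fields below are placeholders, not values from the question.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class PlainJsonExample
{
    public static async Task Main()
    {
        // Assumed broker address; replace with your own.
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // With a string value type, the built-in UTF-8 serializer is used by default,
        // so no Schema Registry client or custom serializer is needed.
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        // Convert any model (here an anonymous object) to a JSON string first.
        string json = JsonSerializer.Serialize(new { MessageType = "Greeting", MessageData = "hello" });

        var result = await producer.ProduceAsync(
            "example-topic", // placeholder topic name
            new Message<Null, string> { Value = json });

        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
}
```

On the consuming side the same idea applies in reverse: build a ConsumerBuilder<Ignore, string> and pass result.Message.Value to JsonSerializer.Deserialize<T>.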
KafkaMessage if this is a type used elsewhere. In other words, separate the serializer/deserialization and model classes – Sandusky