How to create Custom serializer in kafka?
Asked Answered
V

4

9

There is only few serializer available like,

org.apache.kafka.common.serialization.StringSerializer

How can we create our own custom serializer ?

Vociferation answered 20/10, 2016 at 12:12 Comment(0)
S
17

Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.

We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.

Serializing MyMessage in producer side.

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer

serialize() method do the work, receiving your object and returning a serialized version as bytes array.

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            (serialize your MyMessage object into bytes)

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

Deserializing MyMessage in consumer side.

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer

deserialize() method do the work, receiving serialized value as bytes array and returning your object.

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

Then use it like this:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}
Segregate answered 20/10, 2016 at 15:46 Comment(5)
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);Vociferation
Above mentioned it is not as per syntax, Then How can kafka knows about deserializerVociferation
Deserializer is the third argument of the constructor: myValueDeserializer. All of this code was taken from working code, just changed some names.Segregate
Why do you save "isKey" in configure()? Can you explain when configure() and close() should not be empty methods?Ripleigh
@user1879313 For this code, there is no reason, but the Confluent Serializers, for example, use the boolean field to do different logic on the Schema Registry client and subsequently close the HTTP client in the close() method.Pleasantry
C
12

No words, only code

  1. Some object, which you sent to Kafka

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class TestDto {
    
        private String name;
        private String version;
    
    }
    
  2. Create Serializer, which will be used by Producer

    @Slf4j
    public class KafkaValueSerializer implements Serializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, TestDto data) {
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                log.error("Unable to serialize object {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  3. Of couser, don't foget about Deserialiser for Consumer

    @Slf4j
    public class KafkaValueDeserializer implements Deserializer<TestDto> {
    
        private ObjectMapper objectMapper = new ObjectMapper();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public TestDto deserialize(String topic, byte[] data) {
            try {
                return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
            } catch (Exception e) {
                log.error("Unable to deserialize message {}", data, e);
                return null;
            }
        }
    
        @Override
        public void close() {
        }
    }
    
  4. Last moment, add serializer/deserializer to application.yml

    spring:
        kafka:
          bootstrap-servers:  192.168.192.168:9092
          producer:
              value-serializer: com.package.service.kafka.KafkaValueSerializer
          consumer:
              group-id: groupId
              value-deserializer: com.package.service.kafka.KafkaValueDeserializer
    

That's all. It's not necessary any configuration file or dancing with a tamboirine :)

  1. Send

    KafkaTemplate<String, TestDto> kafkaTemplate;
    
    TestDto test = new TestDto("test name", "test-version");
    kafkaTemplate.send(topic, testDto);
    
  2. Listen

    @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(TestDto message) {
    
        log.info("Received message '{}' from Kafka.", message.toString());
    }
    
Contessacontest answered 16/8, 2018 at 9:53 Comment(5)
Spring Kafka already has a JSON serializer and deserializer, so probably could remove those from this answerPleasantry
Is it possible to send Map<String, TestDto> object through kafka ?Alphard
You can convert Map to Json and send json as exampleContessacontest
Have a question: what if , we dont the incoming message object structure ? then how we can write the deserializer in such instances. ?Prorogue
If you don’t know what the message looks like it’s imposible to deserialise.Caine
F
0

there is an easier way to do it, basically if you are casting your custom class to bytes in your custom serializer, then you are rebuilding the wheel. kafka already works with bytes.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;

Bytes bytes = new Bytes(objectMapper.writeValueAsBytes(<customClass>));
kafkaTemplate.send("topic",bytes);

next in your Producter and Consumer configuration

@Bean
public ProducerFactory<String,String>(){
Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "<kafka-server>");
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            BytesSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}


@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host");
    props.put(
            ConsumerConfig.GROUP_ID_CONFIG,
            "group-id");
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

finally

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;


@KafkaListener(topics = "your.topic")
public void getInfoPersona(Bytes message) throws IOException {
    <your-custom-class> customClass = 
    objectMapper.readValue(message.get(), <your-custom-class>.class);
}
Fuchsin answered 23/7, 2022 at 15:24 Comment(0)
A
-2

You must create your own serializer which implements the interface Serializer (org.apache.kafka.common.serialization.Serializer) and then set the producer option key.serializer / value.serializer to it.

Ambulacrum answered 20/10, 2016 at 12:37 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.