Only a few serializers are available out of the box, such as
org.apache.kafka.common.serialization.StringSerializer
How can we create our own custom serializer?
Here is an example of using your own serializer/deserializer for the Kafka message value. The same approach works for the message key.
We want to send a serialized version of MyMessage as the Kafka value and deserialize it back into a MyMessage object on the consumer side.
Serializing MyMessage on the producer side.
Create a serializer class that implements org.apache.kafka.common.serialization.Serializer.
The serialize() method does the work: it receives your object and returns a serialized version as a byte array.
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        // Remember whether this instance serializes keys or values
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }
        try {
            // (serialize your MyMessage object into bytes here)
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {
    }
}
Then use it like this:
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord<Integer, MyMessage> producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
// DemoCallBack is assumed to be your own Callback implementation
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
Deserializing MyMessage on the consumer side.
Create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer.
The deserialize() method does the work: it receives the serialized value as a byte array and returns your object.
public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        // Remember whether this instance deserializes keys or values
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String topic, byte[] value)
    {
        if (value == null) {
            return null;
        }
        try {
            // (deserialize value into your MyMessage object here)
            MyMessage message = new MyMessage();
            return message;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {
    }
}
Then use it like this:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
// On kafka-clients 2.0 and later, prefer consumer.poll(Duration.ofMillis(1000))
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {
    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();
    ...
}
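The two placeholder steps in the serializer and deserializer above (turning MyMessage into bytes and back) could be filled in many ways. Here is a minimal sketch that uses only the JDK's DataOutputStream/DataInputStream. The fields of MyMessage and the MyMessageCodec helper are assumptions made up for illustration, since the original answer does not show them; it is one possible encoding, not the required one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class CodecDemo {
    public static void main(String[] args) throws IOException {
        byte[] bytes = MyMessageCodec.toBytes(new MyMessage(1, "hello"));
        MyMessage copy = MyMessageCodec.fromBytes(bytes);
        // prints "1 hello (11 bytes)": 4 bytes for the int, 2 for the length prefix, 5 for the text
        System.out.println(copy.id + " " + copy.text + " (" + bytes.length + " bytes)");
    }
}

// Hypothetical message type; these fields are assumptions for illustration only.
class MyMessage {
    final int id;
    final String text;

    MyMessage(int id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Hypothetical helper that holds the byte-level encoding.
class MyMessageCodec {
    // Candidate body for the "serialize your MyMessage object into bytes" step.
    static byte[] toBytes(MyMessage message) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(message.id);   // 4 bytes, big-endian
            out.writeUTF(message.text); // 2-byte length prefix + modified UTF-8
        }
        return buffer.toByteArray();
    }

    // Candidate body for the "deserialize value into your MyMessage object" step.
    static MyMessage fromBytes(byte[] value) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value))) {
            int id = in.readInt();
            String text = in.readUTF();
            return new MyMessage(id, text);
        }
    }
}
```

Inside serialize() you would return MyMessageCodec.toBytes(message), and inside deserialize() you would return MyMessageCodec.fromBytes(value). Both methods throw IOException (for example on truncated input), which the existing catch blocks would wrap in a SerializationException.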
No words, only code
An object that you send to Kafka
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class TestDto {
    private String name;
    private String version;
}
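Create a serializer, which will be used by the producer
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;

@Slf4j
public class KafkaValueSerializer implements Serializer<TestDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, TestDto data) {
        try {
            // Encode the DTO as JSON bytes
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Note: returning null here sends a record with a null value;
            // throw a SerializationException instead if the send should fail.
            log.error("Unable to serialize object {}", data, e);
            return null;
        }
    }

    @Override
    public void close() {
    }
}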
Of course, don't forget the deserializer for the consumer
@Slf4j
public class KafkaValueDeserializer implements Deserializer<TestDto> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public TestDto deserialize(String topic, byte[] data) {
        if (data == null) {
            // A record with a null value maps to a null object
            return null;
        }
        try {
            // Jackson reads the JSON bytes directly; no intermediate String is needed
            return objectMapper.readValue(data, TestDto.class);
        } catch (Exception e) {
            log.error("Unable to deserialize message {}", data, e);
            return null;
        }
    }

    @Override
    public void close() {
    }
}
Finally, add the serializer/deserializer to application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.192.168:9092
    producer:
      value-serializer: com.package.service.kafka.KafkaValueSerializer
    consumer:
      group-id: groupId
      value-deserializer: com.package.service.kafka.KafkaValueDeserializer
That's all. No extra configuration class and no dancing with a tambourine are needed :)
Send
KafkaTemplate<String, TestDto> kafkaTemplate;
TestDto testDto = new TestDto("test name", "test-version");
kafkaTemplate.send(topic, testDto);
Listen
@KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(TestDto message) {
    log.info("Received message '{}' from Kafka.", message.toString());
}
There is an easier way to do it. If your custom serializer only converts your custom class to bytes, you are reinventing the wheel: Kafka already works with bytes, so you can convert the object yourself and send it as Bytes.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;
Bytes bytes = new Bytes(objectMapper.writeValueAsBytes(<customClass>));
kafkaTemplate.send("topic",bytes);
Next, in your producer and consumer configuration:
@Bean
public ProducerFactory<String, Bytes> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafka-server>");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Values are sent as raw Bytes, so the built-in BytesSerializer is enough
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Values arrive as raw Bytes and are decoded in the listener
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
Finally, decode the bytes in the listener:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Bytes;
@KafkaListener(topics = "your.topic")
public void getInfoPersona(Bytes message) throws IOException {
    <your-custom-class> customClass =
        objectMapper.readValue(message.get(), <your-custom-class>.class);
}
You must create your own serializer which implements the interface Serializer (org.apache.kafka.common.serialization.Serializer) and then set the producer option key.serializer / value.serializer to it.
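As a rough sketch of that last step with the plain Java client, the options can be set as ordinary properties. The broker address and the class name com.example.MyValueSerializer are placeholders I made up for illustration; key.serializer, value.serializer and bootstrap.servers are the actual producer option names.

```java
import java.util.Properties;

class ProducerPropsDemo {
    // Builds the producer settings; the broker address and the custom class name are placeholders.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Built-in serializer for Integer keys
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        // Fully qualified name of your own Serializer implementation (hypothetical name)
        props.setProperty("value.serializer", "com.example.MyValueSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps();
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

These properties would then be passed to new KafkaProducer<>(props). Because the client instantiates the configured class by reflection, the serializer needs a public no-argument constructor.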