Spring-Kafka cannot convert AVRO GenericData.Record to Acknowledgment

Using Spring Boot, I am trying to set up my Kafka consumers in batch receiving mode:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter()); // I know this one won't work
    factory.setBatchListener(true);
    return factory;
}

@Bean
public ConsumerFactory<String, GenericData.Record> consumerFactory() {
    Map<String, Object> dataRiverProps = getDataRiverProps();
    dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}

And this is what the actual consumer looks like:

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
public void consumeAvro(List<GenericData.Record> list, Acknowledgment ack) {
    messageProcessor.addMessageBatchToExecutor(list);
    while (messageProcessor.getTaskSize() > EXECUTOR_TASK_COUNT_THRESHOLD) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            LOGGER_ERROR.error(ExceptionUtils.getStackTrace(e));
        }
    }
}

The exceptions I am getting look like this:

nested exception is org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [org.apache.avro.generic.GenericData$Record] to type [org.springframework.kafka.support.Acknowledgment]
        at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:46)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:191)
        at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:174)
        at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)

The Kafka messages are Avro messages, and I would like to retrieve them as JSON strings. Is there a ready-to-use Avro converter for GenericData.Record that I can plug into the ConcurrentKafkaListenerContainerFactory? Thanks!

Belia answered 7/2, 2019 at 23:45 Comment(6)
So this is what I have found: if I remove Acknowledgment from the signature, it works just fine: public void consumeAvro(List<Message<GenericData.Record>> list) { ... }. I was hoping to be able to commit manually; as you can see from my original post, I was trying to throttle the consumer. So if I don't manually acknowledge by calling ack.acknowledge() after the while loop, does that mean it will automatically acknowledge after the while loop, as well as acknowledge regularly based on the auto.commit.interval.ms setting? (See the sketch after these comments.) Thanks! – Belia
Can someone point me to an example showing how I could implement a message converter for GenericData.Record? Thanks! – Belia
Do you want to convert to a GenericData.Record POJO? – Scleroprotein
Thanks @Prabhakar! I really have no idea where to even start. If someone could show me an example, that would be very helpful! – Belia
Sure, I will create an example. – Scleroprotein
If you are still looking for a sample program, please see the answer below. – Scleroprotein
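
For reference on the manual-commit question in the first comment: the Acknowledgment parameter is only supplied when the container is in a manual ack mode; otherwise the framework treats the extra parameter as a payload to convert, which is one known way to end up with the ConverterNotFoundException above. A minimal sketch, assuming a spring-kafka version where AckMode is nested in ContainerProperties:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    // MANUAL ack mode makes the Acknowledgment argument available to the
    // listener, so ack.acknowledge() can be called after the throttling loop
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}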

Just add the below property to your Kafka consumer configs:

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
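
For context: this flag belongs to Confluent's KafkaAvroDeserializer, so it only takes effect when that deserializer is configured as the value deserializer. A minimal sketch of a matching consumer config (the bootstrap and schema-registry URLs are placeholders):

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// With this flag the deserializer returns generated SpecificRecord classes
// instead of GenericData.Record:
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);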

Uranometry answered 1/7, 2021 at 10:11 Comment(3)
Just to complete this answer a little: if you have your config in a properties file, the equivalent for this would be spring.kafka.properties.specific.avro.reader: true – Sankaran
@Sankaran you are my saviour, dude! – Futures
I have a similar case to this, but instead of GenericRecord I got an unknown [B] class type. Can you please help by answering my question here: #78436663? – Communicate

Here is an example of how to consume messages in batch mode.

Sample Kafka demo for batch listener with Avro message format

The app has a custom message converter which converts Avro messages directly to POJOs. It uses schema files on the classpath; the schema files are named with the convention "topicName".avsc.

public class AvroSchemaMessageConverter extends MessagingMessageConverter {

  private AvroMapper avroMapper;
  private SchemaRegistry schemaRegistry;
  private KafkaHeaderMapper headerMapper;


  public AvroSchemaMessageConverter(AvroMapper avroMapper, SchemaRegistry schemaRegistry) {
    this.avroMapper = avroMapper;
    this.schemaRegistry = schemaRegistry;
    if (JacksonPresent.isJackson2Present()) {
      this.headerMapper = new DefaultKafkaHeaderMapper();
    } else {
      this.headerMapper = new SimpleKafkaHeaderMapper();
    }
  }

  @Override
  protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
    System.out.printf(record.value().getClass().getName());
    ByteBuffer buffer = ByteBuffer.wrap((byte[])record.value());
    JavaType javaType = TypeFactory.defaultInstance().constructType(type);
    try {
      return avroMapper.readerFor(javaType).with(schemaRegistry.getAvroSchema(record.topic()))
        .readValue(buffer.array(), buffer.arrayOffset(), buffer.limit());
    } catch (IOException e) {
      throw new ConversionException("Failed to convert AvroMessage", e);
    }
  }

  @Override
  public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
    MessageHeaders headers = message.getHeaders();
    Object topicHeader = headers.get(KafkaHeaders.TOPIC);
    String topic = null;
    if (topicHeader instanceof byte[]) {
      topic = new String(((byte[]) topicHeader), StandardCharsets.UTF_8);
    } else if (topicHeader instanceof String) {
      topic = (String) topicHeader;
    } else if (topicHeader == null) {
      Assert.state(defaultTopic != null, "With no topic header, a defaultTopic is required");
    } else {
      throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
        + topicHeader.getClass());
    }
    String actualTopic = topic == null ? defaultTopic : topic;
    Integer partition = headers.get(KafkaHeaders.PARTITION_ID, Integer.class);
    Object key = headers.get(KafkaHeaders.MESSAGE_KEY);
    Object payload = convertPayload(message, topic);
    Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);
    Headers recordHeaders = initialRecordHeaders(message);
    if (this.headerMapper != null) {
      this.headerMapper.fromHeaders(headers, recordHeaders);
    }
    return new ProducerRecord<>(actualTopic, partition, timestamp, key, payload, recordHeaders);
  }

  protected Object convertPayload(Message<?> message, String topic) {
    try {
      return avroMapper.writer(schemaRegistry.getAvroSchema(topic))
        .writeValueAsBytes(message.getPayload());
    } catch (JsonProcessingException e) {
      throw new ConversionException("Failed to convert object to AvroMessage", e);
    }
  }
}
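
The SchemaRegistry used above is the demo's own helper class, not Confluent's client, and its implementation is not shown here. A minimal sketch of what it could look like, assuming it returns Jackson AvroSchema objects parsed from "topicName".avsc files on the classpath:

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import org.apache.avro.Schema;

// Caches Jackson AvroSchema objects parsed from "<topic>.avsc" classpath files
public class SchemaRegistry {

  private final Map<String, AvroSchema> cache = new ConcurrentHashMap<>();

  public AvroSchema getAvroSchema(String topic) {
    return cache.computeIfAbsent(topic, this::loadSchema);
  }

  private AvroSchema loadSchema(String topic) {
    String resource = "/" + topic + ".avsc";
    try (InputStream in = getClass().getResourceAsStream(resource)) {
      if (in == null) {
        throw new IllegalStateException("No schema file found on classpath: " + resource);
      }
      return new AvroSchema(new Schema.Parser().parse(in));
    } catch (IOException e) {
      throw new IllegalStateException("Failed to parse schema: " + resource, e);
    }
  }
}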

Here is how we need to configure ConsumerFactory and KafkaListenerContainerFactory:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
  kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // This is needed for batch listener
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    return factory;
  }

  @Bean
  public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setMessageConverter(converter());
    return kafkaTemplate;
  }

  @Bean
  public RecordMessageConverter converter() {
    return new AvroSchemaMessageConverter(avroMapper(), schemaRegistry());
  }

  @Bean
  public SchemaRegistry schemaRegistry() {
    return new SchemaRegistry();
  }

  @Bean
  public AvroMapper avroMapper() {
    AvroMapper mapper = new AvroMapper();
    mapper.configure(Feature.IGNORE_UNKNOWN, true);
    mapper.setSerializationInclusion(Include.NON_NULL);
    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return mapper;
  }

}
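
With AckMode.MANUAL set on the container, a batch listener can then declare the Acknowledgment parameter and commit explicitly, which covers the throttling use case from the question. A sketch in which UserEvent, the topic name, and process() are made up for illustration:

@KafkaListener(topics = "user-events", containerFactory = "kafkaListenerContainerFactory")
public void consume(List<UserEvent> batch, Acknowledgment ack) {
    batch.forEach(this::process); // hand each record to the (hypothetical) processor
    ack.acknowledge();            // commit offsets only after the whole batch is handled
}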
Scleroprotein answered 19/2, 2019 at 4:52 Comment(3)
Thank you so much for taking the time to answer my question @Prabhakar! This is really helpful! One more question: what if the Avro schema changes? Does it mean that as long as the new schema is backward-compatible, I don't have to change my code? Thanks again! – Belia
Yes, schema changes should be backward-compatible. If there is a need to make breaking changes, the best way is to create a new topic for that. – Scleroprotein
Thanks @Prabhakar! Really appreciate it! – Belia
