We are writing integration tests for our application using Cucumber, and we are having issues testing a @KafkaListener. We managed to use an EmbeddedKafka broker and produce data into it, but the consumer never receives anything and we don't know what is going on.
This is our code:
Producer Config
@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
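One thing worth double-checking here: KafkaAvroSerializer genuinely calls the configured schema.registry.url to register schemas on every first send, so a placeholder value like "URL" makes the publish fail before anything reaches the broker. A minimal sketch of the same properties pointed at Confluent's in-memory registry instead (the mock:// scheme is supported by recent Confluent serializer versions; the scope name "embedded-test" is an arbitrary choice of ours):

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

class MockRegistryProducerProps {  // illustrative holder class

    static Map<String, Object> producerConfig(String brokers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // mock:// is resolved in-memory by Confluent's MockSchemaRegistry,
        // so no HTTP schema registry has to be running. Producer and
        // consumer must use the same scope name to see the same schemas.
        props.put("schema.registry.url", "mock://embedded-test");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }
}
```

If the serializer cannot reach a registry, the failure surfaces only in the async send future, which is easy to miss in the logs.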
Consumer Config
@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(consumerProperties(), false);
        return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
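With an embedded broker the listener container may still be mid-rebalance when the first scenario step produces, which is why the "partitions assigned" INFO log matters. A common guard is to block until every container has its partitions before producing anything. A sketch using spring-kafka-test's ContainerTestUtils (the hook class name is illustrative; the registry and broker beans are standard spring-kafka components):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.ContainerTestUtils;

public class KafkaReadinessHook {  // illustrative name

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    /** Call this before the first Given step sends a record. */
    public void waitForListenerAssignment() {
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            // Blocks until the container's consumer has been assigned
            // all partitions of the embedded topics (or times out).
            ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        }
    }
}
```

With auto.offset.reset=earliest an early send should still be picked up eventually, but waiting for assignment removes the rebalance timing from the equation entirely.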
Integration Test
@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        classes = Application.class)
@ActiveProfiles("test")
@EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
public class CommonStepDefinitions implements En {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);

    @Autowired
    protected KafkaTemplate<String, GenericRecord> kafkaTemplate;
}
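To narrow down whether the problem sits on the producing or the consuming side, spring-kafka-test can read the embedded topic directly, bypassing the listener container. A sketch, assuming access to the EmbeddedKafkaBroker bean (the probe group name and the value deserializer choice are ours; in the real test the value deserializer would have to match the Avro setup):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

class TopicProbe {  // illustrative helper

    static ConsumerRecord<String, Object> probe(EmbeddedKafkaBroker embeddedKafka, String topic) {
        // consumerProps(group, autoCommit, broker) fills in bootstrap servers etc.
        Map<String, Object> props = KafkaTestUtils.consumerProps("probe-group", "true", embeddedKafka);
        Consumer<String, Object> consumer = new KafkaConsumer<>(props);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
        // Throws if nothing arrives within the default timeout:
        // in that case the record never reached the broker at all,
        // i.e. the problem is on the publishing side.
        return KafkaTestUtils.getSingleRecord(consumer, topic);
    }
}
```

If this probe sees the record but the @KafkaListener does not, the producer is fine and the listener container configuration is the place to look.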
Step Definitions
public class KafkaStepDefinitions extends CommonStepDefinitions {

    private static final String TEMPLATE_TOPIC = "TOPIC1";

    public KafkaStepDefinitions() {
        Given("given statement", () -> {
            OperationEntity operationEntity = new OperationEntity();
            operationEntity.setFoo("foo");
            kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
        });
    }
}
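Note that kafkaTemplate.send(...) is asynchronous and swallows serialization and broker errors unless the returned future is inspected, so a failing publish (for example the Avro serializer not reaching its schema registry) can pass silently. A sketch of the same step blocking on the future (the 10-second timeout is an arbitrary choice; spring-kafka 2.x returns a ListenableFuture and 3.x a CompletableFuture, both of which support get):

```java
Given("given statement", () -> {
    OperationEntity operationEntity = new OperationEntity();
    operationEntity.setFoo("foo");
    // get() rethrows any asynchronous send failure in the step itself
    // instead of hiding it in a background thread.
    kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity))
            .get(10, java.util.concurrent.TimeUnit.SECONDS);
});
```

If this version of the step starts failing, the consumer was never the problem.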
The Consumer
This same code works fine against the production bootstrap server, but it is never reached with the embedded Kafka:
@KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
    LOGGER.info("Batch of {} records received", records.size());
    // do something with the data
    ack.acknowledge();
}
Everything in the logs looks fine, but we don't know what is missing.
Thanks in advance.
Comment (Scissor): [...] partitions assigned INFO log, I can pretty much guarantee the problem is on the publishing side. Again, if you can post the logs, I can take a look.