I've develope a Spring XML + Kafka example. In this example I wanted to understand how to trigger the consumer side of it. I don't want to use main method for consumer.
MainApp.java
public class MainApp {
private static final Faker FAKER = Faker.instance().instance();
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext("/context.xml");
EmployeeProducer employeeProducer = (EmployeeProducer) context.getBean("employeeProducer");
Employee employee = Employee.builder()
.empId(ThreadLocalRandom.current().nextInt(1,100))
.firstName(FAKER.name().firstName())
.lastName(FAKER.name().lastName())
.gender(getGender())
.build();
System.out.println("Employee : "+ employee);
employeeProducer.sendMessage("t-employee", employee);
Thread.sleep(500_000_000);
}
private static String getGender(){
int ramdomN = ThreadLocalRandom.current().nextInt(0,1);
String sex;
if (ramdomN == 0) {
sex = "M";
} else {
sex = "F";
}
return sex;
}
KafkaProducer.java
public class EmployeeProducer {
public void sendMessage(String topicName, Employee employee) {
Properties properties = new Properties();
properties.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
KafkaProducer<String, Employee> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.send(new ProducerRecord<>(topicName, employee));
kafkaProducer.close();
}
}
KafkaConsumer.java
public class EmployeeConsumer {
public void consumeMessage() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);
KafkaConsumer<String, Employee> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(AppConfigs.topicName));
while (true) {
ConsumerRecords<String, Employee> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Employee> record : records) {
System.out.println("Key: " + record.key() + ", Value:" + record.value());
System.out.println("Partition:" + record.partition() + ",Offset:" + record.offset());
}
}
}
}
context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean id="employeeProducer" class="com.example.EmployeeProducer" />
<bean id="employeeConsumer" class="com.example.EmployeeConsumer" />
<!-- <bean class="org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor" />
<bean class="org.springframework.kafka.config.KafkaListenerEndpointRegistry"/>-->
</beans>
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spring-conflient-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.19.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.28</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>6.0.18</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>6.0.18</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>6.0.18</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
</project>