I am using kubernetes microserice in which I have create single Kafka micro service to consume all topic and perform its activities. Code sample below:
const { Kafka, logLevel } = require("kafkajs");
const kafka = new Kafka({
logLevel: logLevel.INFO,
clientId: "kafka9845",
brokers:["90.45.78.123"],
connectionTimeout:30000,
sessionTimeout:30000,
requestTimeout:30000,
heartbeatInterval:10000,
retry: {
initialRetryTime:5000,
retries:200
}
});
const topicGroup1 = ["topic1","topic2"];
const topicGroup2 = ["topic3", "topic4", "topic5"];
const consumer1 = kafka.consumer({groupId: 'consumer1', fromBegining:true});
const funConsumer1 = async ( consumer1 ) => {
await topicGroup1.forEach((topic) => {
consumer1.subscribe({topic:topic });
});
await funConsumer1.run({
autoCommit: false,
eachMessage: async (task) => {
console.log(task);
await funConsumer1.commitOffsets([{topic: task.topic, offset : (Number(task.message.offset) +1).toString()}])
}
});
}
const consumer2 = kafka.consumer({groupId: 'consumer2', fromBegining:true});
const funConsumer2 = async ( consumer2 ) => {
await topicGroup2.forEach((topic) => {
consumer2.subscribe({topic:topic });
});
await funConsumer2.run({
autoCommit: false,
eachMessage: async (task) => {
console.log(task);
await funConsumer2.commitOffsets([{topic: task.topic, offset : (Number(task.message.offset) +1).toString()}])
}
});
}
consumer1.on('consumer.crash', async (payload) => {
try {
consumer1.disconnect();
} catch(error) {
} finally {
setTimeout( async () => {
await consumer1.connect();
funConsumer1(consumer1).catch(console.error);
}, 5000);
}
});
consumer2.on('consumer.crash', async (payload) => {
try {
consumer2.disconnect();
} catch(error) {
} finally {
setTimeout( async () => {
await consumer2.connect();
funConsumer2(consumer2).catch(console.error);
}, 5000);
}
});
const funConnect = () => {
consumer1.connect();
consumer2.connect();
funConsumer1(consumer1).catch(console.error);
funConsumer2(consumer2).catch(console.error);
}
funConnect();
process.on('SIGINT', function () {
consumer1.disconnect();
consumer2.disconnect();
});
I am using "kafkajs":"^2.0.0" and "kafka-node":"^5.0.0"
Everyday we are producing million of message inside different topic
Problem: I have analysis consumer processing in queue way.
Requirement: Both consumer should consume message in parallel.