KafkaJS: run multiple consumers in a single file

I am running microservices on Kubernetes, and I have created a single Kafka microservice that consumes all topics and performs the related work. Code sample below:

const { Kafka, logLevel } = require("kafkajs");
const kafka = new Kafka({
   logLevel: logLevel.INFO,
   clientId: "kafka9845",
   brokers:["90.45.78.123"],
   connectionTimeout:30000,
   sessionTimeout:30000,
   requestTimeout:30000,
   heartbeatInterval:10000,
   retry: {
      initialRetryTime:5000,
      retries:200
   }
});

const topicGroup1 = ["topic1","topic2"];
const topicGroup2 = ["topic3", "topic4", "topic5"];

const consumer1 = kafka.consumer({ groupId: 'consumer1' });
const funConsumer1 = async (consumer1) => {
   await topicGroup1.forEach((topic) => {
      // fromBeginning is an option of subscribe, not of kafka.consumer
      consumer1.subscribe({ topic: topic, fromBeginning: true });
   });
   await consumer1.run({
      autoCommit: false,
      eachMessage: async (task) => {
         console.log(task);
         // commitOffsets needs the partition as well as the topic and offset
         await consumer1.commitOffsets([{ topic: task.topic, partition: task.partition, offset: (Number(task.message.offset) + 1).toString() }]);
      }
   });
};

const consumer2 = kafka.consumer({ groupId: 'consumer2' });
const funConsumer2 = async (consumer2) => {
   await topicGroup2.forEach((topic) => {
      // fromBeginning is an option of subscribe, not of kafka.consumer
      consumer2.subscribe({ topic: topic, fromBeginning: true });
   });
   await consumer2.run({
      autoCommit: false,
      eachMessage: async (task) => {
         console.log(task);
         // commitOffsets needs the partition as well as the topic and offset
         await consumer2.commitOffsets([{ topic: task.topic, partition: task.partition, offset: (Number(task.message.offset) + 1).toString() }]);
      }
   });
};

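consumer1.on('consumer.crash', async (payload) => {
   try {
      // await so that a failed disconnect is caught here
      await consumer1.disconnect();
   } catch (error) {
      console.error(error);
   } finally {
      // reconnect and restart the consumer after 5 seconds
      setTimeout(async () => {
         try {
            await consumer1.connect();
            await funConsumer1(consumer1);
         } catch (error) {
            console.error(error);
         }
      }, 5000);
   }
});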

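consumer2.on('consumer.crash', async (payload) => {
   try {
      // await so that a failed disconnect is caught here
      await consumer2.disconnect();
   } catch (error) {
      console.error(error);
   } finally {
      // reconnect and restart the consumer after 5 seconds
      setTimeout(async () => {
         try {
            await consumer2.connect();
            await funConsumer2(consumer2);
         } catch (error) {
            console.error(error);
         }
      }, 5000);
   }
});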


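const funConnect = async () => {
   // wait for both connections before starting the consumers
   await Promise.all([consumer1.connect(), consumer2.connect()]);
   funConsumer1(consumer1).catch(console.error);
   funConsumer2(consumer2).catch(console.error);
};

funConnect().catch(console.error);
process.on('SIGINT', async function () {
   // wait for both consumers to leave their groups cleanly
   await Promise.all([consumer1.disconnect(), consumer2.disconnect()]);
});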

I am using "kafkajs":"^2.0.0" and "kafka-node":"^5.0.0"

Every day we produce millions of messages across different topics.

Problem: From my analysis, the consumers process messages one after another, like a queue.

Requirement: Both consumers should consume messages in parallel.

Irresistible answered 20/5, 2023 at 5:17 Comment(1)
@brian-fitzpatrick please answer my question. - Irresistible

If you're using Kubernetes, then just make a Deployment for each consumer process. The code provided doesn't have the consumers interacting with one another, so they don't have to be in the same process.
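
One way to get a Deployment per consumer without maintaining separate codebases is to ship the same image everywhere and let each Deployment choose its consumer through an environment variable. The sketch below shows only that selection step. The variable name `CONSUMER_NAME`, the `CONSUMERS` table, and `selectConsumer` are assumptions made for illustration; the group ids and topics are copied from the question.

```javascript
// Hypothetical table: one entry per consumer, taken from the question's groups.
const CONSUMERS = {
  consumer1: { groupId: "consumer1", topics: ["topic1", "topic2"] },
  consumer2: { groupId: "consumer2", topics: ["topic3", "topic4", "topic5"] },
};

// Pick this process's consumer from an environment variable.
// CONSUMER_NAME is an assumed name that each Deployment would set differently.
function selectConsumer(env = process.env) {
  const name = env.CONSUMER_NAME;
  const config = CONSUMERS[name];
  if (!config) {
    throw new Error(`Unknown or missing CONSUMER_NAME: ${name}`);
  }
  return config;
}
```

Each process would then create exactly one consumer from the returned `groupId` and subscribe it to the returned `topics`, so the groups no longer share a Node.js process.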

Otherwise, there's no reason that one group/consumer cannot subscribe to multiple topics. However, you should not use forEach to subscribe. Pass the whole array to the subscribe function instead.
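
As a rough sketch of passing the whole array, assuming KafkaJS 2.x (where `subscribe` accepts a `topics` array): the helper name `startConsumer` is made up for this example, and `consumer` is expected to be whatever `kafka.consumer({ groupId })` returns.

```javascript
// Hypothetical helper: connect, subscribe to every topic at once, then start consuming.
// `consumer` is assumed to be a KafkaJS 2.x consumer created with kafka.consumer({ groupId }).
async function startConsumer(consumer, topics, handler) {
  await consumer.connect();
  // One awaited call with the full array instead of a forEach over single topics.
  await consumer.subscribe({ topics, fromBeginning: true });
  // `handler` receives { topic, partition, message } for every message.
  await consumer.run({ eachMessage: handler });
}
```

With the question's setup, this would be called as `startConsumer(consumer1, topicGroup1, handler)`, where `handler` is your own message callback.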

When one consumer is subscribed to multiple topics, you can check which topic a message came from using an if/switch statement within the eachMessage block.
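
A minimal sketch of such a switch, using the topic names from the question. The two handler functions are placeholders invented for this example; only the `{ topic, partition, message }` payload shape comes from KafkaJS.

```javascript
// Placeholder handlers (hypothetical): replace the bodies with real processing.
async function handleGroup1(topic, text) {
  return `group1:${topic}:${text}`;
}
async function handleGroup2(topic, text) {
  return `group2:${topic}:${text}`;
}

// eachMessage callback: KafkaJS passes { topic, partition, message }.
async function eachMessage({ topic, partition, message }) {
  // message.value is a Buffer (or null for tombstone records).
  const text = message.value ? message.value.toString() : null;
  switch (topic) {
    case "topic1":
    case "topic2":
      return handleGroup1(topic, text);
    case "topic3":
    case "topic4":
    case "topic5":
      return handleGroup2(topic, text);
    default:
      // Skip unknown topics instead of throwing, so the consumer keeps running.
      console.warn(`Ignoring message from unexpected topic ${topic}`);
      return null;
  }
}
```

This function can be passed directly as the `eachMessage` option of `consumer.run`.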

Bahuvrihi answered 20/5, 2023 at 13:28 Comment(10)
Thanks for your message. I have 6 consumers, so I can't create 6 different microservices. I have assigned each consumer to a different group. - Irresistible
I see no reason you couldn't make 6. - Bahuvrihi
Also, as mentioned, I believe each of your consumers is only getting one topic when you're using a forEach loop to subscribe. - Bahuvrihi
I have created groups, meaning one consumer can subscribe to multiple topics. - Irresistible
Yes, it can, but wasn't your question about two separate groups working in parallel? One group with 6 topics won't be as scalable as separate deployments. - Bahuvrihi
Yes, one group can read 5 topics... Yes, two groups can read 2 and 3 topics simultaneously, but not in one deployment... You also do not need both kafkajs and kafka-node. Pick only one. - Bahuvrihi
I want a deeper understanding of connectionTimeout, sessionTimeout, requestTimeout, heartbeatInterval and rebalancing. I have observed that I am getting error logs for rebalancing. - Irresistible
Please open a new post to ask new questions. - Bahuvrihi
Is it possible to achieve this by creating multiple replicas of the current microservice? - Irresistible
Please refer to my last comment. - Bahuvrihi
