I'm using kafkajs in my Node.js project. Here is my code.
Producer:
const producer = kafka.producer();
await producer.connect();

const items = await getItems(); // fetches ~5k items to produce

await producer.send({
  topic: "items",
  messages: items.map(c => ({ value: JSON.stringify(c) })),
});
// Even if I split the items into chunks here, like below, the consumer still receives batches with more than 100 messages:
/*
const chunked = _.chunk(items, 100);
for (const chunk of chunked) {
  await producer.send({
    topic: config.kafka.topics.tm.itemsToParse,
    messages: chunk.map(c => ({
      value: JSON.stringify(c),
      // kafkajs takes headers per message, not per send() call
      headers: { from: "csv_parser" },
    })),
  });
}
*/
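One idea I have on the producer side is to give each message an explicit key, so the partitioner hashes messages across partitions (assuming the topic has more than one partition) and several consumers from the group could share the work. The key scheme here is just a hypothetical example, not my real code:

await producer.send({
  topic: "items",
  messages: items.map((c, i) => ({
    key: String(i % 8), // hypothetical: spread across up to 8 partitions
    value: JSON.stringify(c),
  })),
});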
Consumer:
const consumer = kafka.consumer({ groupId: "groupId" });
await consumer.connect();
await consumer.subscribe({ topic: "items" });
await consumer.run({
  eachBatchAutoResolve: false, // offsets are resolved manually below, as in the kafkajs docs example
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const chunk of _.chunk(batch.messages, 100)) {
      if (!isRunning() || isStale()) break;
      // I can handle at most 100 items at a time
      const items = chunk.map(m => JSON.parse(m.value.toString()));
      /*
        handle the items somehow, takes max ~30 sec
      */
      for (const message of chunk) resolveOffset(message.offset); // taken from the example in the kafkajs docs
      await heartbeat();
    }
  },
});
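For what it's worth, the only batch-size-related settings I can find in the kafkajs consumer config are byte-based, not message-count-based, so I'm not sure they solve my problem. The values here are placeholders, not my real config:

const consumer = kafka.consumer({
  groupId: "groupId",
  minBytes: 1,                 // placeholder values; kafkajs defaults differ
  maxBytes: 65536,             // cap on the total fetch response size
  maxBytesPerPartition: 16384, // cap per partition (default is 1 MB)
  maxWaitTimeInMs: 1000,       // how long the broker may wait to fill minBytes
});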
One instance can handle at most 100 items at a time. And right now, once one consumer takes the batch of 5k items, no other consumer from the same groupId can take part of it to process in parallel. The questions are:
- Is it possible to split a batch so that many consumers can read it simultaneously? (My current idea is sketched after these questions.)
- Can I configure the consumer to consume a defined number of messages per batch?
- Should the producer send batches with the correct batch size (100 items each)? In other words, does the producer have to adapt to the consumer?
- How do I correctly send batches from the producer if I need a defined batch size?
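For the first question, the setup I currently have in mind is below: create the topic with several partitions and run several consumer instances in the same groupId. The partition count of 8 is an assumption for illustration, not my real config:

const admin = kafka.admin();
await admin.connect();
await admin.createTopics({
  topics: [{ topic: "items", numPartitions: 8 }], // partition count is an assumption
});
await admin.disconnect();
// With 8 partitions, up to 8 consumers in the same groupId should each
// receive a share of the 5k messages.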