I have Confluent Kafka running on Docker. The topic has 10 partitions. The problem is that I cannot consume messages from that topic, although I can produce messages to it without any issue. I am consuming with the C# Confluent.Kafka client 1.5.1 (latest) and librdkafka 1.5.0 (latest).
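For context, producing works with code roughly like the following (a sketch rather than my exact producer; the key and value payloads are placeholders):

using System;
using System.Collections.Generic;
using Confluent.Kafka;

// Minimal producer sketch -- same bootstrap address and topic as the consumer below.
using (var producer = new ProducerBuilder<string, string>(new Dictionary<string, string>
{
    { "bootstrap.servers", "PLAINTEXT://localhost:3002" }
}).Build())
{
    producer.Produce("some-test-topic", new Message<string, string>
    {
        Key = "some-key",
        Value = "some-value"
    },
    report => Console.WriteLine("Delivered to " + report.TopicPartitionOffset));

    // Wait for outstanding delivery reports before the producer is disposed.
    producer.Flush(TimeSpan.FromSeconds(10));
}

So the broker is reachable from the host for producing; it is only consuming that fails.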
The docker-compose file I start Kafka with is the following:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000
  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"
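In case it helps with diagnosis, the broker can also be queried for metadata from the host with the AdminClient, using the same bootstrap address as the consumer (a sketch; the 10-second timeout is arbitrary):

using System;
using Confluent.Kafka;

// Sketch: dump what the broker advertises to clients connecting from the host.
using (var adminClient = new AdminClientBuilder(new AdminClientConfig
{
    BootstrapServers = "PLAINTEXT://localhost:3002"
}).Build())
{
    var metadata = adminClient.GetMetadata("some-test-topic", TimeSpan.FromSeconds(10));

    foreach (var broker in metadata.Brokers)
        Console.WriteLine("Broker " + broker.BrokerId + ": " + broker.Host + ":" + broker.Port);

    foreach (var partition in metadata.Topics[0].Partitions)
        Console.WriteLine("Partition " + partition.PartitionId + ", leader " + partition.Leader);
}

As far as I understand, the host/port reported in this metadata (i.e. the advertised listener) is what the client uses for all connections after the initial bootstrap.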
My consumer configuration in C# is the following:
var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
{
    { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
    { "group.id", "some-test-group" },
    { "auto.offset.reset", "latest" },
    { "compression.codec", "gzip" },
    { "enable.auto.commit", "false" }
}).Build();

consumer.Subscribe("some-test-topic");

while (true)
{
    var cr = consumer.Consume(30_000);
    if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
    {
        Console.WriteLine("that's it");
        break;
    }
    Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
}
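One variant I have been looking at, to surface any librdkafka-level errors, attaches error and log handlers and enables the debug property (a sketch; the chosen debug contexts are just a guess, and I left out compression.codec since it is a producer-only setting):

using System;
using System.Collections.Generic;
using Confluent.Kafka;

// Sketch: same consumer configuration, but with handlers attached so broker,
// group-coordination and fetch problems are printed instead of staying silent.
var verboseConsumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
{
    { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
    { "group.id", "some-test-group" },
    { "auto.offset.reset", "latest" },
    { "enable.auto.commit", "false" },
    { "debug", "consumer,cgrp,topic,fetch" }  // librdkafka debug contexts
})
.SetErrorHandler((_, error) => Console.WriteLine("Error: " + error.Reason))
.SetLogHandler((_, log) => Console.WriteLine("Log: " + log.Message))
.Build();

verboseConsumer.Subscribe("some-test-topic");
Console.WriteLine(verboseConsumer.Consume(30_000)?.Message.Value ?? "nothing consumed");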
I'm sure there are messages in the topic's partitions, as I can browse the topic with Kafka Tool 2.0.
The connection configuration I used for Kafka Tool: (screenshot omitted)
I'm pretty sure I have missed something in the configuration, but after two days of reading documentation and banging my head against the wall I still cannot find the issue. Can anyone help?