How to get a message from a Kafka topic at a specific offset

We have an HDP (Hortonworks Data Platform) cluster with 3 Kafka brokers.

We want to run kafka-console-consumer in order to fetch one message from a topic at a specific offset:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181  --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1

But we get the following output. Where are we going wrong?

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Partition-offset based consumption is supported in the new consumer only.
Option                                   Description
------                                   -----------
--blacklist <blacklist>                  Blacklist of topics to exclude from
                                           consumption.
--bootstrap-server <server to connect    REQUIRED (unless old consumer is
  to>                                      used): The server to connect to.
--consumer-property <consumer_prop>      A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the consumer.
--consumer.config <config file>          Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--csv-reporter-enabled                   If set, the CSV metrics reporter will
                                           be enabled
--delete-consumer-offsets                If specified, the consumer path in
                                           zookeeper is deleted when starting up
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <class>                      The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--key-deserializer <deserializer for
  key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--metrics-dir <metrics directory>        If csv-reporter-enable is set, and
                                           this parameter isset, the csv
                                           metrics will be outputed here
--new-consumer                           Use the new consumer implementation.
                                           This is the default.
--offset <consume offset>                The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
Kaenel answered 11/12, 2019 at 17:21 Comment(3)
Because those fields are not serving that purpose. – Wallah
Can you please show me the right syntax? – Kaenel
So I need to exclude --max-messages 1 from the CLI? – Kaenel

Partition-offset based consumption is supported in the new consumer only.

kafka-console-consumer should use --bootstrap-server, as the warning mentioned.

And you are missing a space before --partition

But otherwise, --partition x --offset y is correct.


Full command

kafka-console-consumer \
  --bootstrap-server kafka0:9092 \
  --topic lopet.lo.pm \
  --partition 0 \
  --offset 34537263 \
  --max-messages 1

Using kcat is another option, if you want to install it.

Immethodical answered 11/12, 2019 at 21:36 Comment(3)
Nice answer. Is it possible with Spring Kafka? – Carmody
@RCv I'm not sure I understand what you're asking. You can seek any consumer to any offset and consume a limited number of messages from that partition, regardless of framework. – Immethodical
Thanks, @One. Yeah, my question wasn't really focused. We can consume messages using Spring Kafka listeners; is it possible to get a specific message from Kafka using the Spring Kafka library? – Carmody

If you want to do this programmatically with Spring (Java), you can read my blog post to get the idea.

https://rcvaram.medium.com/kafka-customer-get-what-needs-only-45d95f9b1105

Spring Kafka provides an abstraction layer for consumers via listeners.

To consume a message at a specific offset, follow these steps:

  1. Assign the consumer to the topic partition using the consumer's assign method.
  2. After assigning, use consumer.seek to set the offset the consumer should read from when it resumes.
  3. Resume the consumer and fetch the message from Kafka with the poll method. Before that, the consumer factory should set max.poll.records to 1 so that the consumer returns only one record.
  4. Finally, commit and pause the consumer.
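
The steps above can be sketched with the plain kafka-clients KafkaConsumer (the same consumer Spring Kafka configures under the hood). This is a minimal sketch, not the author's exact code: the broker address and group id are assumptions, while the topic, partition, and offset are taken from the question.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SingleOffsetReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "single-offset-reader");    // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // step 3: return at most one record per poll

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("lopet.lo.pm", 0);
            consumer.assign(Collections.singletonList(tp));  // step 1: assign, not subscribe
            consumer.seek(tp, 34537263L);                    // step 2: seek to the target offset
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // step 3
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            consumer.commitSync(); // step 4: commit before the consumer is closed
        }
    }
}
```

Run it with the kafka-clients jar on the classpath against a reachable broker.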

If you are going to use a Kafka consumer directly, you should also be aware of concurrency, since Kafka consumers are not thread-safe.

My code is in my GitHub repository, and I have published it to the Maven repository as well.

Carmody answered 30/1, 2022 at 3:41 Comment(5)
This isn't using Spring, though; you only need kafka-clients.jar. Plus, you should still be able to seek after using the subscribe method; you just don't have guaranteed access to a specific partition. – Immethodical
Yeah, the consumer comes from the client jar, but I used the Spring Kafka consumer factory to create it, and I extend my class with AbstractConsumerSeekAware because it takes care of much of the underlying complexity. In that case, the consumer is a product of the Spring configuration, right? That's why I said this is for Spring Java users. For more details, you can check my repo. – Carmody
I didn't get your last point, @OneCricketeer. If you could provide some more details or some references, that would be great. Thanks. – Carmody
1) AFAICT, the factory is pointless, since you're only using it to construct a consumer with a hard-coded property map, and you never set/override the AbstractConsumerSeekAware methods in the code. 2) You say "we have to .assign the consumer". That's true for getting a specific record in a partition, sure, but it's not true for just getting any single record, which can be done with .subscribe. – Immethodical
Thanks, OneCricketeer. I got your points but I'm not sure about that. Once I've checked, I will update my answer. Thanks a lot. – Carmody

kcat does this well:

kcat -b broker-hostname:9092 \
  -t topic-name \
  -p partition \
  -o offset \
  -c count-of-messages
Puttier answered 7/12, 2022 at 21:22 Comment(1)
But it seems to be in maintenance mode. – Kean

© 2022 - 2025 — McMap. All rights reserved.