Can a Kafka consumer filter messages before polling all of them from a topic?

It was said that consumers can only read the whole topic, and that there is no way to run filtering logic on the brokers.

This implies that we have to receive all messages from a topic and filter them on the client side.

That's too much. I was wondering if we can filter and receive only specific types of messages, based on something already passed to the brokers, such as the message keys.

Looking at the method Consumer.poll(timeout), it seems there is nothing extra we can do.

Tourist answered 26/6, 2018 at 19:24 Comment(5)
Your topic should only carry one type of message. The point is, if you do your filtering while inserting data into separate topics, then consuming from each topic should be pretty straightforwardCogen
If the number of consumers were large, say a million, and Kafka were used as a pipeline for communication among the consumers, creating a specific topic for each consumer would not be a solution, right?Tourist
No. My understanding of consumers is that consumers are dumb; meaning the consumers are just listening to a particular topic and processing whatever comes out of the Kafka queue. The moment you start using Kafka as a "communication pipeline" between these so-called "consumers" you are running into trouble. What you need is an ESB if you want to manage communication between several million components.Cogen
Sorry, what's an ESB?Tourist
From my perspective, the original question has already been answered in the responses below. Please mark the correct answer or provide feedback on why it has not been answered. Thanks! :)Titration

No, with the Consumer API you cannot receive only some messages from a topic. The consumer fetches all messages in order.

If you don't want to filter messages in the Consumer, you could use a Kafka Streams job. For example, the Streams job would read from your topic and push to another topic only the messages the consumer is interested in. Then the consumer can subscribe to this new topic, as in the sketch below.
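
A minimal sketch of such a Streams job, assuming string keys and values; the topic names all-messages and interesting-messages and the key-based predicate are placeholders for whatever your pipeline actually uses:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class FilterToNewTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> all = builder.stream("all-messages");
        // Keep only the records whose key marks them as interesting,
        // and write them to a dedicated topic the consumer subscribes to.
        all.filter((key, value) -> "interesting".equals(key))
           .to("interesting-messages");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}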

Suffumigate answered 26/6, 2018 at 20:21 Comment(5)
From my understanding, even with Streams you will still bring the data to the client side.Titration
Not sure what your pipeline looks like, but in some cases you can run Streams apps "close" to the cluster, where it's relatively cheap to consume and write back to Kafka.Suffumigate
Not sure how this can be supported; I would appreciate it if you could give me an example :)Titration
Thanks. Kafka supports a pub/sub model. My original goal was to benefit from its high throughput while utilizing it as a pipeline for communication among the consumers (i.e., clients). It might not be the right way to use Kafka.Tourist
issues.apache.org/jira/browse/KAFKA-6020Zoonosis

Each Kafka topic should contain messages that are logically similar, just to stay on topic. Now, it might happen that you have a topic, let's say fruits, whose messages describe different attributes of a fruit (maybe in JSON format). Producers may push messages for different fruits, but you want one of your consumer groups to process only apples. Ideally you might have gone with one topic per fruit, but let's assume that to be a fruitless endeavor for some reason (maybe too many topics).

In that case, you can have the producer put the fruit name in the message key, and override Kafka's default partitioning scheme with a custom partitioner class, passed through the partitioner.class producer property, that ignores the key and partitions randomly. This is required because, by default, messages with the same key always go to the same partition, and keying by fruit name might cause partition imbalance. A sketch of such a partitioner is shown below.
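
A minimal sketch of such a key-ignoring partitioner, using the stock Java producer's Partitioner interface; the class name RandomPartitioner is made up for this example:

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Ignores the record key, so records keyed by fruit name are still
// spread evenly across partitions instead of always landing on the
// same one.
public class RandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

The producer would then register it with props.put("partitioner.class", RandomPartitioner.class.getName()).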

The idea behind this is that if your Kafka message value is a complex object (JSON, an Avro record, etc.), it might be quicker to filter records based on the key than to parse the whole value and extract the desired field. I don't have any data to support the performance benefit of this approach, though; it's only an intuition.
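
To illustrate the key-based filtering on the consumer side, a minimal sketch; the topic name fruits and the key apple are assumptions carried over from the example above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AppleOnlyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "apple-processors");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("fruits"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Cheap check on the small key; the (possibly large)
                    // value is never parsed for non-apple records.
                    if (!"apple".equals(record.key())) {
                        continue;
                    }
                    System.out.println("Apple record: " + record.value());
                }
            }
        }
    }
}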

Juxtaposition answered 27/6, 2018 at 6:14 Comment(1)
Maybe a point-to-point messaging application, rather than Kafka, would be more suitable.Tourist

Once records are pushed into the Kafka cluster, there is not much that you can do. Whatever you want to filter, you will always have to bring the data to the client.

Unfortunately, the only option is to move that logic to the producers; that way, you can push the data into multiple topics based on whatever routing logic you define, as sketched below.
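
As an illustration, a hedged sketch of producer-side routing; the topic names fruits-apples and fruits-other and the key-based rule are made up for the example:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String key = "apple";
            String value = "{\"fruit\":\"apple\",\"qty\":3}";
            // Route each record to a type-specific topic so consumers
            // only subscribe to what they actually need.
            String topic = "apple".equals(key) ? "fruits-apples" : "fruits-other";
            producer.send(new ProducerRecord<>(topic, key, value));
        }
    }
}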

Titration answered 26/6, 2018 at 20:24 Comment(3)
Agree. Once subscribed to the topic, the consumer accepts whatever is in the topic, and any filtering has to be done on the consumer side. The drawback of this is wasted network bandwidth.Tourist
@Tourist there is nothing you can do on the consumer side. There are other strategies you can follow in order to save network bandwidth.Titration
It seems like the Solace messaging platform might do more custom stuff. Here is their slogan: "Message Routing, Filtering & Ordering".Tourist

The Kafka consumer will receive all messages from the topic. But if only a custom message type (MyMessage) needs to be consumed, it can be filtered in the Deserializer class. If the consumer gets two types of messages, such as String and MyMessage, then the String messages will be deserialized to null and ignored, while the MyMessage messages will be processed.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMessageDeserializer implements Deserializer<MyMessage> {

    private static final Logger logger = LoggerFactory.getLogger(MyMessageDeserializer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                logger.info("Null received while deserializing");
                return null;
            }
            return objectMapper.readValue(new String(data, StandardCharsets.UTF_8), MyMessage.class);
        } catch (Exception e) {
            // Records that cannot be parsed as MyMessage (e.g. plain strings)
            // come back as null, so the poll loop can simply skip them.
            logger.error("Deserialization exception: " + e.getMessage());
            return null;
        }
    }
}
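
For completeness, a sketch of wiring this deserializer into a consumer; the topic name and group id are placeholders. Note that the filtered-out records still cross the network; they are merely dropped as null values on the client:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyMessageConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-message-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", MyMessageDeserializer.class.getName());

        try (KafkaConsumer<String, MyMessage> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, MyMessage> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, MyMessage> record : records) {
                    if (record.value() == null) {
                        continue; // a record the deserializer could not map to MyMessage
                    }
                    System.out.println("Got MyMessage: " + record.value());
                }
            }
        }
    }
}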
Affectionate answered 14/3, 2022 at 20:1 Comment(0)
