Kafka-streams filter messages using headers

We're trying to use Kafka Streams in our project to read data from one topic and write to another, and we have a use case where we want to use Kafka headers as a mechanism to filter out certain records.

For example, the input topic contains data for all students belonging to a school, and the output topic should only contain a subset of students, selected by their class.

Record:

student_name | student_id | student_class

Initially we planned to use the student object itself for this, but that means we need to deserialize the object before filtering. Instead, we want to pass a header with each record that carries the student's class information.

Header:

class: v

We were wondering if there's a way to do this using Kafka Streams. We thought we could use the header in the filter function, but the filter function doesn't have access to header information.

    kstream.filter((k, v) -> {
        howToAccessHeaders?
    })

We also tried using the process function, but again, it's not clear to us how to filter out a record there.

kstream.process(CustomProcessor::new)

CustomProcessor:

class CustomProcessor implements Processor<String, byte[], String, byte[]> {

    @Override
    public void process(Record<String, byte[]> record) {
        if(record.headers().lastHeader("class").value().toString() == "v"){
            // Without a return value, how does the record get filtered?
        }
    }
}

Is there something else we can do to filter the records using the headers? Or is that not possible with Kafka Streams?

PS: We tried using the transform and transformValues functions, but they are now deprecated.

Highams answered 19/7 at 12:4 Comment(1)
Access the processor context, then forward only the records you want to keep. docs.confluent.io/platform/current/streams/developer-guide/… Also, you'll need to use .equals() to compare strings. - Ancon
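
The string comparison mentioned in the comment above can be sketched in plain Java. Header values arrive as `byte[]`, so calling `toString()` on them yields the array's identity string rather than its content, and `==` compares references. The helper name `headerEquals` is hypothetical, and UTF-8 is an assumption about how the producer encoded the header.

```java
import java.nio.charset.StandardCharsets;

public class HeaderValueCheck {

    // Decodes a raw header value (assumed UTF-8) and compares it by content.
    // Returns false when the header value is missing.
    static boolean headerEquals(byte[] rawValue, String expected) {
        if (rawValue == null) {
            return false;
        }
        return expected.equals(new String(rawValue, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] raw = "v".getBytes(StandardCharsets.UTF_8);

        // toString() on an array is not its content, so this never matches.
        System.out.println(raw.toString().equals("v")); // false

        // Decoding first and using equals() compares the actual text.
        System.out.println(headerEquals(raw, "v"));     // true
    }
}
```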

Writing a custom Processor is the right way to go. To send the records you want to keep downstream, use ProcessorContext#forward(...). The corresponding ProcessorContext object is passed into the Processor via the Processor#init(...) method, which you need to override. If you don't call forward() on an input record, it is dropped and thereby filtered out.

Cf https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#accessing-processor-context
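
A minimal sketch of what this could look like, assuming Kafka Streams 3.3 or later (where `KStream#process` returns a stream) and header values encoded as UTF-8 text. The class name `ClassHeaderFilter`, the constructor parameter, and the topic name in the usage comment are hypothetical. It needs the `kafka-streams` dependency on the classpath.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical usage in a topology:
//   kstream.process(() -> new ClassHeaderFilter("v")).to("output-topic");
public class ClassHeaderFilter implements Processor<String, byte[], String, byte[]> {

    private final String wantedClass;
    private ProcessorContext<String, byte[]> context;

    public ClassHeaderFilter(String wantedClass) {
        this.wantedClass = wantedClass;
    }

    @Override
    public void init(ProcessorContext<String, byte[]> context) {
        // Keep the context so process() can forward records downstream.
        this.context = context;
    }

    @Override
    public void process(Record<String, byte[]> record) {
        Header header = record.headers().lastHeader("class");
        if (header == null || header.value() == null) {
            return; // no usable "class" header: not forwarded, so dropped
        }
        // Assumption: the producer wrote the header value as UTF-8 text.
        String studentClass = new String(header.value(), StandardCharsets.UTF_8);
        if (wantedClass.equals(studentClass)) {
            context.forward(record); // keep: the value is passed on without deserializing it
        }
        // Records that are not forwarded never reach the output topic.
    }
}
```

The record value stays a `byte[]` throughout, so the student object is never deserialized just to decide whether to keep it.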

Elliottellipse answered 19/7 at 14:16 Comment(1)
Thanks, I was trying to find this information. I will try filtering by not calling forward for some messages. - Highams

I implemented something similar using this Processor interface. We can access the headers using record.headers(), as you have done, and call context.forward(record) only for the records that pass the filter.

For filtering, we can use the specification pattern, which is easy to extend in the future without changing the existing code in the processor: https://medium.com/@ferid.aksahin98/specification-design-pattern-210b5df70252
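
As a rough illustration of that idea (not the linked article's code), here is a small plain-Java sketch. The names `Spec`, `HeaderSpecs`, and `headerEquals` are hypothetical, and it assumes the headers have already been decoded into a `Map<String, String>` before the rule is evaluated.

```java
import java.util.Map;

public class HeaderSpecs {

    // A specification: a composable yes/no rule over decoded headers.
    @FunctionalInterface
    interface Spec {
        boolean isSatisfiedBy(Map<String, String> headers);

        default Spec and(Spec other) {
            return h -> isSatisfiedBy(h) && other.isSatisfiedBy(h);
        }

        default Spec or(Spec other) {
            return h -> isSatisfiedBy(h) || other.isSatisfiedBy(h);
        }

        default Spec not() {
            return h -> !isSatisfiedBy(h);
        }
    }

    // Rule: the header with the given key exists and equals the expected text.
    static Spec headerEquals(String key, String expected) {
        return headers -> expected.equals(headers.get(key));
    }

    public static void main(String[] args) {
        Spec classVOrVi = headerEquals("class", "v").or(headerEquals("class", "vi"));

        System.out.println(classVOrVi.isSatisfiedBy(Map.of("class", "v"))); // true
        System.out.println(classVOrVi.isSatisfiedBy(Map.of("class", "x"))); // false
    }
}
```

A processor could hold one `Spec` and forward a record only when the spec is satisfied, so new filter rules are added by composing specs rather than by editing the processor.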

Subaxillary answered 28/7 at 2:27 Comment(0)
