Update Mar 23, 2018: Kafka 1.0 provides much better and easier handling for corrupted messages ("poison pills") via KIP-161 than what I described below. See default.deserialization.exception.handler in the Kafka 1.0 docs.
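For reference, a minimal sketch of that Kafka 1.0+ configuration (the application ID and bootstrap servers are placeholders; LogAndContinueExceptionHandler is one of the built-in handlers, and you can also implement DeserializationExceptionHandler yourself, e.g. to write bad records to a dead letter queue):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Log and skip records that fail deserialization instead of failing the application.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);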
This could potentially be things like messages that I can't deserialize properly [...]
Ok, my answer here focuses on the (de)serialization issues as this might be the most tricky scenario to handle for most users.
[...] or perhaps the processing/filtering logic fails in some unexpected way (I have no external dependencies so there should be no transient errors of that sort).
The same thinking (for deserialization) can also be applied to failures in the processing logic. Here, most people tend to gravitate towards option 2 below (minus the deserialization part), but YMMV.
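For illustration, here is a rough sketch of that pattern for processing-logic failures, in the spirit of option 2 below but without the deserialization step. transform() is a hypothetical stand-in for your own processing/filtering logic, and the topic names are placeholders; note that, as in option 2, the valid branch pays the processing cost twice:

KStream<String, Long> records = ...;

KStream<String, Long>[] branches = records.branch(
    (key, value) -> {
      try {
        transform(value);   // dry-run of your processing logic; result discarded here
        return true;
      }
      catch (RuntimeException e) {
        return false;       // record goes to the error branch below
      }
    },
    (key, value) -> true
);

// branches[0]: records your processing logic can handle (processed a second time here)
branches[0].mapValues(value -> transform(value)).to(Serdes.String(), Serdes.Long(), "output-topic");

// branches[1]: records that break your processing logic -> error topic for later inspection/replay
branches[1].to(Serdes.String(), Serdes.Long(), "processing-error-topic");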
I was considering wrapping all my processing/filtering code in a try catch and if an exception was raised then routing to an "error topic". Then I can study the message and modify it or fix my code as appropriate and then replay it on to master. If I let any exceptions propagate, the stream seems to get jammed and no more messages are picked up.
- Is this approach considered best practice?
Yes, at the moment this is the way to go. Essentially, the two most common patterns are (1) skipping corrupted messages or (2) sending corrupted records to a quarantine topic aka a dead letter queue.
- Is there a convenient Kafka streams way to handle this? I don't think there is a concept of a DLQ...
Yes, there is a way to handle this, including the use of a dead letter queue. However, it's (at least IMHO) not that convenient yet. If you have any feedback on how the API should allow you to handle this -- e.g. via a new or updated method, a configuration setting ("if serialization/deserialization fails send the problematic record to THIS quarantine topic") -- please let us know. :-)
- What are the alternative ways to stop Kafka jamming on a "bad message"?
- What alternative error handling approaches are there?
See my examples below.
FWIW, the Kafka community is also discussing the addition of a new CLI tool that allows you to skip over corrupted messages. However, as a user of the Kafka Streams API, I think ideally you want to handle such scenarios directly in your code, and fall back to CLI utilities only as a last resort.
Here are some patterns for the Kafka Streams DSL to handle corrupted records/messages aka "poison pills". This is taken from http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
Option 1: Skip corrupted records with flatMap
This is arguably what most users would like to do.
- We use flatMap because it allows you to output zero, one, or more output records per input record. In the case of a corrupted record we output nothing (zero records), thereby ignoring/skipping the corrupted record.
- Benefit of this approach compared to the other ones listed here: we need to manually deserialize a record only once!
- Drawback of this approach: flatMap "marks" the input stream for potential data re-partitioning, i.e. if you perform a key-based operation such as groupings (groupBy/groupByKey) or joins afterwards, your data will be re-partitioned behind the scenes. Since this might be a costly step we don't want that to happen unnecessarily. If you KNOW that the record keys are always valid OR that you don't need to operate on the keys (thus keeping them as "raw" keys in byte[] format), you can change from flatMap to flatMapValues, which will not result in data re-partitioning even if you join/group/aggregate the stream later (see the flatMapValues sketch after the code example below).
Code example:
Serde<byte[]> bytesSerde = Serdes.ByteArray();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// Input topic, which might contain corrupted messages
KStream<byte[], byte[]> input = builder.stream(bytesSerde, bytesSerde, inputTopic);

// Note how the returned stream is of type KStream<String, Long>,
// rather than KStream<byte[], byte[]>.
KStream<String, Long> doubled = input.flatMap(
    (k, v) -> {
      try {
        // Attempt deserialization
        String key = stringSerde.deserializer().deserialize(inputTopic, k);
        long value = longSerde.deserializer().deserialize(inputTopic, v);
        // Ok, the record is valid (not corrupted). Let's take the
        // opportunity to also process the record in some way so that
        // we haven't paid the deserialization cost just for "poison pill"
        // checking.
        return Collections.singletonList(KeyValue.pair(key, 2 * value));
      }
      catch (SerializationException e) {
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      }
      return Collections.emptyList();
    }
);
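And, as mentioned above, if you don't need to operate on the keys, here is a rough, untested sketch of the flatMapValues variant (reusing longSerde and inputTopic from the example): the keys stay as raw byte[], so no re-partitioning is triggered even if you group/join the stream later.

KStream<byte[], Long> doubled = input.flatMapValues(
    v -> {
      try {
        // Only the value is deserialized (and doubled, as above); the key stays byte[].
        long value = longSerde.deserializer().deserialize(inputTopic, v);
        return Collections.singletonList(2 * value);
      }
      catch (SerializationException e) {
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
        return Collections.emptyList();
      }
    }
);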
Option 2: Dead letter queue with branch
Compared to option 1 (which ignores corrupted records), option 2 retains corrupted messages by filtering them out of the "main" input stream and writing them to a quarantine topic (think: dead letter queue). The drawback is that, for valid records, we must pay the manual deserialization cost twice.
KStream<byte[], byte[]> input = ...;

KStream<byte[], byte[]>[] partitioned = input.branch(
    (k, v) -> {
      boolean isValidRecord = false;
      try {
        stringSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      }
      catch (SerializationException ignored) {}
      return isValidRecord;
    },
    (k, v) -> true
);

// partitioned[0] is the KStream<byte[], byte[]> that contains
// only valid records. partitioned[1] contains only corrupted
// records and thus acts as a "dead letter queue".
KStream<String, Long> doubled = partitioned[0].map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));

// Don't forget to actually write the dead letter queue back to Kafka!
partitioned[1].to(Serdes.ByteArray(), Serdes.ByteArray(), "quarantine-topic");
Option 3: Skip corrupted records with filter
I only mention this for completeness. This option looks like a mix of options 1 and 2, but is worse than either of them. Compared to option 1, you must pay the manual deserialization cost for valid records twice (bad!). Compared to option 2, you lose the ability to retain corrupted records in a dead letter queue.
KStream<byte[], byte[]> validRecordsOnly = input.filter(
    (k, v) -> {
      boolean isValidRecord = false;
      try {
        stringSerde.deserializer().deserialize(inputTopic, k);
        longSerde.deserializer().deserialize(inputTopic, v);
        isValidRecord = true;
      }
      catch (SerializationException e) {
        // log + ignore/skip the corrupted message
        System.err.println("Could not deserialize record: " + e.getMessage());
      }
      return isValidRecord;
    }
);
KStream<String, Long> doubled = validRecordsOnly.map(
    (key, value) -> KeyValue.pair(
        // Must deserialize a second time unfortunately.
        stringSerde.deserializer().deserialize(inputTopic, key),
        2 * longSerde.deserializer().deserialize(inputTopic, value)));
Any help greatly appreciated.
I hope this helps. If so, I'd appreciate your feedback on how we could improve the Kafka Streams API to handle failures/exceptions in a better/more convenient way than today. :-)
Comment: 1> The quarantine topic approach seems risky, as a bad producer could result in high overhead, especially if multiple consumers of that topic keep busy pushing the same malformed message to that quarantine topic. 2> The flatMap approach sounds more intuitive, and the potential re-partitioning overhead could be minimized with KStream<byte[], Long> doubled = input.flatMap(..): validate deserialization of k and v, and accept the drawback of having to deserialize (safely this time) the key again, as the cost (of deserialization) of the key is much less than the cost for the value. – Tehee