Correct way to restart or shutdown the stream using UncaughtExceptionHandler
I have a stream app with the following driver code for real-time message transformation.

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

After a few minutes of execution, the app throws the exception below and then stops progressing through the stream.

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

I tried flushing out the /tmp/kafka-streams/TRANSFORMATION-APP directory and restarted the app, but it throws the same exception again. One thing I noticed is that the app works fine until it has transformed all backlog messages, but it throws the exception after processing some of the new messages!

Sometimes it also throws one of the uncaught exceptions below.

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed to rebalance

After throwing (one of) these exceptions, the app is still running but no longer progressing through the stream.

What is the correct way to handle these errors? Is it possible to restart the stream programmatically, without killing the app? This app runs under monit. In the worst case, I would prefer to terminate the app properly (without any message loss), so that monit can restart it.

The input topic has 100 partitions and I have set num.stream.threads to 100 in the app configuration. The app is on Kafka 0.10.1.1-cp1.

Skunk answered 22/2, 2017 at 17:24 Comment(0)
Kafka 0.10.1.x has some bugs with regard to multi-threading. You can either upgrade to 0.10.2 (Apache Kafka released it today; CP 3.2 should follow shortly) or apply the following workaround:

  • use single-thread execution only
  • if you need more threads, start more instances
  • for each instance, configure a different state directory
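
The per-instance configuration from the last two bullets can be sketched as plain properties. This is a minimal sketch: the application id and broker address are assumptions for illustration, and the `instanceId` parameter is a hypothetical identifier that keeps state directories distinct on the same host.

```java
import java.util.Properties;

public class StreamsInstanceConfig {

    // Builds the config for one single-threaded Streams instance.
    // instanceId (e.g. "instance-1") is hypothetical; it only serves to
    // give every instance on the same host its own state directory.
    static Properties buildConfig(String instanceId) {
        Properties props = new Properties();
        props.put("application.id", "TRANSFORMATION-APP");           // same app id for all instances
        props.put("bootstrap.servers", "localhost:9092");            // assumption: local broker
        props.put("num.stream.threads", "1");                        // workaround: one thread per instance
        props.put("state.dir", "/tmp/kafka-streams-" + instanceId);  // distinct state dir per instance
        return props;
    }
}
```

Each JVM instance would then be started with its own `instanceId`; Kafka's consumer group management still spreads the 100 input partitions across all running instances.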

You might also need to delete your local state directory (only once) before restarting, to get into an overall consistent application state.

In any case, there will be no data loss. Kafka Streams guarantees at-least-once processing semantics even in case of failure. This applies to your local stores, too -- after you delete the local state dir, on startup the state will be recreated from the underlying Kafka changelog topics (it is an expensive operation, though).

The UncaughtExceptionHandler only provides a way to figure out that a thread died. It does not (directly) help to restart your application. To recover dead threads, you need to close the KafkaStreams instance completely and create/start a new one. We hope to add better support for this in the future.
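
The close-and-recreate pattern this implies can be sketched with a plain Java thread standing in for the KafkaStreams instance (an assumption for illustration only; with real streams, a factory would rebuild the topology and the supervisor loop would call `close()` before starting a fresh instance):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class SupervisorSketch {
    static final AtomicInteger starts = new AtomicInteger();

    // runOnce() stands in for creating and starting one KafkaStreams instance;
    // here the "instance" is a plain thread that crashes immediately.
    static void runOnce() throws InterruptedException {
        CountDownLatch died = new CountDownLatch(1);
        Thread worker = new Thread(() -> { throw new RuntimeException("boom"); });
        worker.setUncaughtExceptionHandler((t, e) -> died.countDown()); // handler only signals; it cannot restart
        starts.incrementAndGet();
        worker.start();
        died.await(); // supervisor blocks until the worker dies
    }

    public static void main(String[] args) throws InterruptedException {
        for (int attempt = 0; attempt < 3; attempt++) {
            runOnce(); // with KafkaStreams: close() the dead instance, then create/start a new one
        }
        System.out.println("restarts=" + starts.get()); // prints restarts=3
    }
}
```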

Arteritis answered 22/2, 2017 at 21:12 Comment(5)
Thanks. I restarted the app with a single thread. It was running fine for a while but then threw a "failed to rebalance" exception, which was caught in the UncaughtExceptionHandler. To resolve that, I increased the rebalance.backoff.ms and zookeeper.session.timeout.ms parameters for the internal producer/consumer. Now it seems to be running fine! – Skunk
As a follow-up, are there any best practices for closing the stream instance completely? I call streams::close before starting the stream in my UncaughtExceptionHandler. I don't know the consequences. – Skunk
That's ok. You should just not call streams::close from two threads at the same time -- this might deadlock. Otherwise, it's fine to close in the exception handler. – Arteritis
@MatthiasJ.Sax If it should not be called by more than one thread at a time, then why not make it synchronized in the KafkaStreams class (just asking!) – Pogrom
There is an update to this question at: kafka-tutorials.confluent.io/error-handling/kstreams.html – Cultivator
I understand that this question was asked long ago, but I will post an update about a newer Kafka Streams feature. Since Kafka Streams 2.8.0, you can automatically replace a stream thread that failed with an uncaught exception: register a handler via the KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); and return StreamThreadExceptionResponse.REPLACE_THREAD from it. With that, the failed message will be reprocessed on the new replacement thread. For more details, please take a look at Kafka Streams Specific Uncaught Exception Handler.

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Prior to Kafka Streams 2.8.0, you can implement the logic of restarting a failed KafkaStreams instance on your own. The idea is as follows:

KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));

private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
    return (newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
            replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
        }
    };
}

// invoke this method either right after stream died, or by scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
    kafkaStreams.close();
    KafkaStreams newKafkaStreams = createYourKafkaStreams();
    newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
    newKafkaStreams.start();
}
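
The "by scheduling" option in the comment above can be sketched by handing the close-and-recreate work off to a dedicated thread, so the state-listener callback returns immediately instead of blocking in close(). This is a sketch under that assumption; `restartStream()` stands in for the `replaceFailedKafkaStream()` method above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RestartHandoff {
    // Single dedicated thread that performs restarts, so the streams
    // callback thread never blocks inside close().
    static final ExecutorService restarter = Executors.newSingleThreadExecutor();
    static final CountDownLatch restarted = new CountDownLatch(1);

    static void onErrorState() {               // called from the state-listener callback thread
        restarter.submit(RestartHandoff::restartStream);
    }

    static void restartStream() {              // runs on the restarter thread
        // here: kafkaStreams.close(); then create and start a new instance
        restarted.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        onErrorState();
        restarted.await();                     // wait until the hand-off has completed
        restarter.shutdown();
    }
}
```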
Addend answered 20/6, 2021 at 17:28 Comment(0)