How to track async threads and handle broken pipe errors with SSE emitter?

I'm looking to catch IOExceptions caused by a broken pipe before the async threads dispatch the result to Tomcat. Essentially the client disconnects and the error bubbles up to Tomcat before I can catch the issue. I have no control over the client(s).

I have an Executors.newCachedThreadPool() in my controller, and my endpoint looks like this:

    @GetMapping("/mySSEStream")
    public SseEmitter sseEmitter() {
        SseEmitter emitter = new SseEmitter(-1L);
        MyRunner streamingRunner = new MyRunner(emitter);
        cachedThreadPool.execute(streamingRunner);
        return emitter;
    }

MyRunner:

public class MyRunner implements Runnable {
    private final SseEmitter sseEmitter;

    public MyRunner(SseEmitter sseEmitter) {
        this.sseEmitter = sseEmitter;
    }

    @Override
    public void run() {
        try {
            sendData(); // implementation not shown
        } catch (IOException ioException) {
            sseEmitter.completeWithError(ioException);
        } finally {
            try {
                sseEmitter.complete();
            } catch (IllegalStateException illegalStateException) {
                log.debug("SSE Emitter already closed...");
            }
        }
    }
}

The sequence is as follows. The controller thread creates the SSE emitter; a second thread (named http-nio-9998-exec-1) accepts the request, returns a 200, and begins emitting data. I then get a debug log saying that the DispatcherServlet is "Exiting but response remains open for further handling". The client disconnects, and a third thread (named http-nio-9998-exec-2) begins processing the broken pipe error, logging "WebAsyncManager - Async error, dispatch to /mySSEStream". No matter how I try to catch the error, it bubbles up to Tomcat, which responds with a 500 error; I can see this in the debugger. Obviously the 500 never arrives at the client, but our endpoint metrics are littered with 500 errors.
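
To make the "track async threads" part concrete, here is a minimal JDK-only sketch of a registry that remembers each streaming task so it can be cancelled once the client is gone. StreamTaskRegistry and its method names are hypothetical, not part of Spring or Tomcat, and the sketch assumes the task's work (sendData() in MyRunner) reacts to thread interruption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: tracks running streaming tasks by id so they can be cancelled. */
public class StreamTaskRegistry {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Map<Long, Future<?>> running = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    /** Starts the task on the pool and returns the id under which it is tracked. */
    public long start(Runnable task) {
        long id = ids.incrementAndGet();
        FutureTask<Void> future = new FutureTask<>(() -> {
            try {
                task.run();
            } finally {
                running.remove(id); // stop tracking once the task ends for any reason
            }
        }, null);
        running.put(id, future); // register before executing so a fast task cannot leak an entry
        pool.execute(future);
        return id;
    }

    /** Interrupts the task if it is still tracked; returns true if a cancel was issued. */
    public boolean cancel(long id) {
        Future<?> future = running.remove(id);
        return future != null && future.cancel(true);
    }

    /** Number of tasks currently tracked. */
    public int activeCount() {
        return running.size();
    }

    /** Interrupts all workers and stops the pool. */
    public void shutdown() {
        pool.shutdownNow();
    }
}
```

In the controller this could stand in for cachedThreadPool.execute(streamingRunner): keep the id returned by start(...) and call cancel(id) from callbacks registered with emitter.onCompletion(...), emitter.onTimeout(...), and emitter.onError(...). This only stops the worker thread; it is not expected to change the status Tomcat records for the request.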

I can't catch this error anywhere. I've tried using the AsyncHandlerInterceptor to no avail. Its documentation recommends using the WebAsyncManager to register a deferred result, but I couldn't get that working. I've tried @ControllerAdvice and @ExceptionHandler, but that thread talks directly to the Tomcat container. I've also tried a Filter, but that doesn't work for outbound messages; it only filters what comes in from Tomcat.

I'm sure I'm missing something here and any help is greatly appreciated.

Thanks.

Sandry answered 10/3, 2023 at 20:30 Comment(1)
Did you find a solution? I've got the same issue. - Autograft

Probably not the answer you're looking for, but after many fruitless hours, this is the solution I came up with. I extended my logback configuration to not log these errors. I found the logger name in the ApplicationDispatcher where the error itself is logged.

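import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

public class SseBrokenPipeLogFilter extends Filter<ILoggingEvent> {

    @Override
    public FilterReply decide(ILoggingEvent iLoggingEvent) {
        var throwable = iLoggingEvent.getThrowableProxy();
        // Drop only "Broken pipe" IOExceptions logged by Tomcat's container logger.
        // Constant-first equals() avoids a NullPointerException when the message is null.
        if (throwable != null
                && "java.io.IOException".equals(throwable.getClassName())
                && "Broken pipe".equals(throwable.getMessage())
                && iLoggingEvent.getLoggerName().contains("org.apache.catalina.core.ContainerBase")) {
            return FilterReply.DENY;
        }
        return FilterReply.NEUTRAL;
    }
}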

This is the logback-spring.xml file, it's just Spring Boot defaults with the extra filter.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>${CONSOLE_LOG_THRESHOLD}</level>
        </filter>
        <filter class="cz.okd.okdbe.configuration.SseBrokenPipeLogFilter" />
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>${CONSOLE_LOG_CHARSET}</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE" />
    </root>
</configuration>
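
The filter above matches the top-level exception's class name and message exactly. If the same check is needed on a plain Throwable, for example in the runner's catch block, it could be sketched as a small helper that also walks the cause chain. BrokenPipe and isBrokenPipe are hypothetical names, not part of Spring, Tomcat, or Logback, and matching on the text "broken pipe" assumes the platform reports the disconnect with that wording.

```java
import java.io.IOException;
import java.util.Locale;

/** Hypothetical helper: recognises a "Broken pipe" IOException anywhere in a cause chain. */
public final class BrokenPipe {
    private static final int MAX_DEPTH = 20; // guards against cyclic cause chains

    private BrokenPipe() {
    }

    /** Returns true if t or one of its causes is an IOException whose message mentions "broken pipe". */
    public static boolean isBrokenPipe(Throwable t) {
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < MAX_DEPTH; cur = cur.getCause(), depth++) {
            String message = cur.getMessage();
            if (cur instanceof IOException
                    && message != null
                    && message.toLowerCase(Locale.ROOT).contains("broken pipe")) {
                return true;
            }
        }
        return false;
    }
}
```

A substring match is used rather than equals() so that a wrapping IOException whose message embeds the original one is still recognised; the trade-off is that it is slightly more permissive than the exact match in the filter.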
Referential answered 28/10, 2024 at 19:58 Comment(0)
