@CuiPengFei,
While trying to find an answer to this myself, I stumbled upon a repository that shows how to gracefully clean up the connections left behind by disconnected clients.
They encapsulate all of the SSE EventOutput logic in a Service/Manager. In it, they spin up a thread that checks whether each EventOutput has been closed by the client. If so, they formally close the connection (EventOutput#close()). If not, they try to write to the stream. If the write throws an Exception, the client has disconnected without closing, and the manager closes the connection itself. If the write is successful, the EventOutput is returned to the pool, as it is still an active connection.
The repo (and the actual class) is available here. I've also included the class, without its imports, below in case the repo is ever removed.
Note that they bind this to a Singleton. The store should be globally unique.
/**
 * Keeps a registry of open SSE connections and periodically prunes the ones
 * that are no longer usable.
 *
 * <p>A single scheduled task runs every 5 seconds (fixed delay, starting
 * immediately). For each registered {@link EventOutput} it either notices that
 * the output is already closed, or probes it by writing a small
 * {@code custom-message} event carrying {@code "EOM"}. Outputs that are closed,
 * or whose probe write throws, are closed (best effort) and removed from the
 * registry; outputs whose probe succeeds stay registered.
 *
 * <p>The registry is a {@link ConcurrentHashMap}, so {@link #addSseConnection}
 * may be called while the prune task is running.
 */
public class SseWriteManager {

    private static final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

    /** Open connections keyed by caller-supplied id. */
    private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

    /** Single-threaded scheduler that drives the periodic prune task. */
    private final ScheduledExecutorService messageExecutorService;

    /**
     * Creates the manager and immediately schedules the prune task
     * (initial delay 0, then 5 seconds after each run completes).
     */
    public SseWriteManager() {
        messageExecutorService = Executors.newScheduledThreadPool(1);
        messageExecutorService.scheduleWithFixedDelay(new MessageProcessor(), 0, 5, TimeUnit.SECONDS);
    }

    /**
     * Registers a connection under the given id.
     *
     * <p>If a different connection was already registered under the same id it
     * is replaced and closed, so it cannot linger open outside the registry.
     *
     * @param id          key identifying the connection; must not be {@code null}
     * @param eventOutput the SSE output to track; must not be {@code null}
     * @throws NullPointerException if {@code id} or {@code eventOutput} is {@code null}
     *                              (thrown by the underlying {@link ConcurrentHashMap})
     */
    public void addSseConnection(String id, EventOutput eventOutput) {
        logger.info("adding connection for id={}.", id);
        EventOutput previous = connectionMap.put(id, eventOutput);
        // A replaced output is no longer reachable by the prune task, so close it here.
        if (previous != null && previous != eventOutput) {
            closeQuietly(id, previous);
        }
    }

    /**
     * Stops scheduling further prune runs. Connections still registered are
     * left as they are; this method does not close them.
     */
    public void shutdown() {
        if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
            logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
            messageExecutorService.shutdown();
        } else {
            logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
        }
    }

    /** Closes the output if it is still open; a failure to close is logged and otherwise ignored. */
    private void closeQuietly(String id, EventOutput eventOutput) {
        if (eventOutput.isClosed()) {
            return;
        }
        try {
            eventOutput.close();
        } catch (Exception ex) {
            // Best effort: the connection is being discarded either way.
            logger.debug("ignoring failure while closing connection for id={}.", id, ex);
        }
    }

    /**
     * Returns {@code true} when the output is open and accepted a probe event,
     * {@code false} when it is already closed or the probe write threw.
     */
    private boolean isAlive(String id, EventOutput eventOutput) {
        if (eventOutput.isClosed()) {
            return false;
        }
        try {
            logger.info("writing to id={}.", id);
            eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
            return true;
        } catch (Exception ex) {
            logger.info("write failed to id={}.", id, ex);
            return false;
        }
    }

    /** Periodic task that probes every registered connection and drops the dead ones. */
    private class MessageProcessor implements Runnable {
        @Override
        public void run() {
            // Catch everything: an exception escaping a scheduleWithFixedDelay task
            // suppresses all subsequent executions of that task.
            try {
                for (Map.Entry<String, EventOutput> entry : connectionMap.entrySet()) {
                    String id = entry.getKey();
                    EventOutput eventOutput = entry.getValue();
                    if (isAlive(id, eventOutput)) {
                        continue;
                    }
                    // Remove only if the id still maps to THIS output. A plain
                    // remove-by-key would also discard a fresh connection that was
                    // registered under the same id while this one was being probed.
                    connectionMap.remove(id, eventOutput);
                    closeQuietly(id, eventOutput);
                }
            } catch (Exception ex) {
                logger.error("messageProcessor.run threw exception.", ex);
            }
        }
    }
}