Should Spring SseEmitter.complete() trigger an EventSource reconnect - how to close connection server-side
Asked Answered
S

3

9

I'm trying to set up a Spring SseEmitter to send a sequence of updates of the status of a running job. It seems to be working but:

Whenever I call emitter.complete() in in my Java server code, the javascript EventSource client calls the registered onerror function and then calls my Java endpoint again with a new connection. This happens in both Firefox and Chrome.

I can probably send an explicit "end-of-data" message from Java and then detect that and call eventSource.close() on the client, but is there a better way?

What is the purpose of emitter.complete() in that case?

Also, if I always have to terminate the connection on the client end, then I guess every connection on the server side will be terminated by either a timeout or a write error, in which case I probably want to manually send back a heartbeat of some kind every few seconds?

It feels like I'm missing something if I'm having to do all this.

Shelah answered 21/3, 2019 at 18:55 Comment(2)
Facing the same problem, where a new request to endpoint is made after emitter::complete is called on the server-side. springBootVersion=2.0.5.RELEASEWomanish
I got something working, but I'm doing it by sending my own custom "stop" event and then closing it on the client side.Shelah
U
6

I have added the following to my Spring boot application to trigger the SSE connection close()

Server Side:

  1. Create a simple controller which returns SseEmitter.
  2. Wrap the backend logic in a single thread executor service.
  3. Send your events to the SseEmitter.
  4. On complete send an event of type complete via the SseEmitter.

    @RestController
    public class SearchController {
    
    @Autowired
    private SearchDelegate searchDelegate;
    
    @GetMapping(value = "/{customerId}/search")
    @ResponseStatus(HttpStatus.OK)
    @ApiOperation(value = "Search Sources", notes = "Search Sources")
    @ApiResponses(value = {
            @ApiResponse(code = 201, message = "OK"),
            @ApiResponse(code = 401, message = "Unauthorized")
    })
    @ResponseBody
    public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
        return searchDelegate.route(searchCriteriaDto);
      }
    }
    
    
    
    @Service
    public class SearchDelegate {
    public static final String SEARCH_EVENT_NAME = "SEARCH";
    public static final String COMPLETE_EVENT_NAME = "COMPLETE";
    public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";
    
    @Autowired
    private SearchService searchService;
    
    private ExecutorService executor = Executors.newCachedThreadPool();
    
    public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
        SseEmitter emitter = new SseEmitter();
        executor.execute(() -> {
            try {
                if(!searchCriteriaDto.getCustomerSources().isEmpty()) {
                    searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
                        try {
                            SearchResponse searchResponse = searchService.search(searchCriteriaDto);
                            emitter.send(SseEmitter.event()
                                    .id(customerSource.getSourceId())
                                    .name(SEARCH_EVENT_NAME)
                                    .data(searchResponse));
                        } catch (Exception e) {
                            log.error("Error while executing query for customer {} with source {}, Caused by {}",
                                    customerId, source.getType(), e.getMessage());
                        }
                    });
                }else {
                    log.debug("No available customerSources for the specified customer");
                }
                emitter.send(SseEmitter.event().
                        id(String.valueOf(System.currentTimeMillis()))
                        .name(COMPLETE_EVENT_NAME)
                        .data(COMPLETE_EVENT_DATA));
                emitter.complete();
            } catch (Exception ex) {
                emitter.completeWithError(ex);
            }
        });
        return emitter;
       }
    }
    

Client Side:

  1. Since we specified the name of event on our SseEmitter, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. (Notice: The onmessage handler is called if no event name is specified for a message)
  2. Call the EventSource on the COMPLETE event to release the client connection.

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');

sse.addEventListener("SEARCH", function(evt) {
   var data = JSON.parse(evt.data);
   console.log(data);
});

sse.addEventListener("COMPLETE", function(evt) {
   console.log(evt);
   sse.close();
});
Unmoor answered 21/11, 2019 at 18:4 Comment(3)
This is pretty much what I ended up doing.Shelah
I am still not sure about emitter.complete() though. I guess it just ends things on the server side? Because I think anything it transmits will encounter a closed sse on the client side.Shelah
I agree! Look at the doc docs.spring.io/spring/docs/current/javadoc-api/org/…Unmoor
Q
3

According to the HTML standard for Server-sent events

Clients will reconnect if the connection is closed; a client can be told to stop reconnecting using the HTTP 204 No Content response code.

So Spring's SseEmitter behaves as expected and the purpose of complete() is to make sure all the events were sent and then to close the connection.

You need to either implement server-side logic that would return 204 http code on subsequent requests (e.g. by checking session id) or to send a special event and close the connection from client side after receiving it as suggested by Ashraf Sarhan

Qualls answered 24/8, 2021 at 20:38 Comment(1)
Oh, ok a 204. That seems more natural to me than closing from the client side.Shelah
C
0

I'll continue sergei-kozelko answer.

You have to return

return new ResponseEntity(emitter, HttpStatus.NO_CONTENT);

Method signature should be:

public ResponseEntity<SseEmitter> route(SearchCriteriaDto searchCriteriaDto)
Clypeate answered 18/3, 2023 at 11:46 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.