How to use server-sent-events in express.js
D

6

23

I set up my REST server with express.js. Now I want to add SSE to this server. After I implemented this sse package, I get an error. I know this error occurs when you try to use res.send twice, but I am not doing that.

ERROR: Error: Can't set headers after they are sent.                                            
    at ServerResponse.OutgoingMessage.setHeader (http.js:690:11)                            
    at ServerResponse.header (/home/root/node_modules/express/lib/response.js:718:10)         
    at ServerResponse.send (/home/root/node_modules/express/lib/response.js:163:12)            
    at app.get.str (/home/root/.node_app_slot/main.js:1330:25)                                 
    at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)    
    at next (/home/root/node_modules/express/lib/router/route.js:131:13)                       
    at sse (/home/root/node_modules/server-sent-events/index.js:35:2)                          
    at Layer.handle [as handle_request] (/home/root/node_modules/express/lib/router/layer.js:95:5)  
    at next (/home/root/node_modules/express/lib/router/route.js:131:13)                          
    at Route.dispatch (/home/root/node_modules/express/lib/router/route.js:112:3)

Is it possible that I can't use the express methods anymore within the sse function? For example:

app.get('/events', sse, function(req, res) {
    res.send('...');
});

Furthermore, I found this solution and this. Is it possible to make sse with the res.write function or in another way without using another package?

Donitadonjon answered 7/1, 2016 at 14:9 Comment(0)
A
8

You can definitely achieve this without other packages.

I wrote a blog post about this, part 1 sets out the basics.

You mustn't close the SSE as that breaks the functionality. The whole point is that it is an open HTTP connection. This allows for new events to be pushed to the client at any point.

Avar answered 11/1, 2016 at 7:27 Comment(0)
B
64

I disagree with using Socket.IO to implement basic Server-Sent Events. The browser API is dead simple and the implementation in Express requires only a couple of changes from a normal response route:

app.get('/streaming', (req, res) => {

    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // flush the headers to establish SSE with client

    let counter = 0;
    let interValID = setInterval(() => {
        counter++;
        if (counter >= 10) {
            clearInterval(interValID);
            res.end(); // terminates SSE session
            return;
        }
        res.write(`data: ${JSON.stringify({num: counter})}\n\n`); // res.write() instead of res.send()
    }, 1000);

    // If client closes connection, stop sending events
    res.on('close', () => {
        console.log('client dropped me');
        clearInterval(interValID);
        res.end();
    });
});
  • Set the appropriate headers as per the spec
  • Use res.flushHeaders() to establish SSE connection
  • Use res.write() instead of res.send() to send data
  • To end stream from the server, use res.end()

The snippet above uses setInterval() to simulate sending data to the client for about 10 seconds, then it ends the connection. The client will receive an error for the lost connection and automatically try to re-establish it. To avoid this, you can close the EventSource on the client in its error handler, or have the server send a specific event message that the client understands to mean it should close gracefully. If the client closes the connection, the server can catch the 'close' event to end the response and stop sending events.
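The two client-side options described above could be sketched roughly as follows. The helper name listenUntilDone and the "done" event name are assumptions made for illustration. The server would have to send an event named "done" (with a data line) before calling res.end() for the second handler to fire. The function takes any object with addEventListener() and close(), which is what the browser's EventSource provides.

```javascript
// Sketch: wire handlers onto an EventSource-like object.
// The "done" event name is an assumption, not part of the answer above.
function listenUntilDone(source, onData) {
  // Default (unnamed) events arrive as "message".
  source.addEventListener("message", (event) => {
    onData(JSON.parse(event.data));
  });

  // The server announced the end of the stream: close so the browser
  // does not try to reconnect.
  source.addEventListener("done", () => source.close());

  // The connection was lost or ended without notice: stop reconnecting too.
  source.addEventListener("error", () => source.close());

  return source;
}

// In a browser:
// listenUntilDone(new EventSource("/streaming"), (msg) => console.log(msg.num));
```

Closing on every error also disables the automatic reconnect after a transient network failure, so whether that trade-off is acceptable depends on the application.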

express: 4.17.1 node: 10.16.3

Bequeath answered 25/11, 2019 at 23:42 Comment(8)
Great answer. My only suggestion is that you're usually supposed to also add a Connection: keep-alive header (masteringjs.io/tutorials/express/server-sent-events). Definitely don't use socket.io, because that uses websockets, which is NOT the same thing as server-sent events. SSE is over HTTP, websockets are a distinct protocol. - Favoritism
Heroku is unable to detect req.on('close') events though, they result in a H27 error, read more here. Any solution to this issue? - Cipolin
@Cipolin Did you mean res.on('close')? The node docs don't indicate the req emits a 'close' event. - Ia
In my case, the browser did not receive the streamed data after res.write(...). I had to explicitly call res.end() after res.write(...) to receive data. - Footstalk
I want to keep that route open until the user disconnects. Is there any time limit that we need to set? Similar to PHP max execution time? Will "keep-alive" handle that thing? - Cothurnus
What's the significance of the \n\n? When I just used one of them, it didn't work. - Quadrat
It is part of the event stream format standard to separate messages (developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/…) - Bequeath
Very good answer. Also note that if you are using a compression middleware (like the "compression" package), it may prevent your messages from being sent when calling res.write(data), because it wants more data to compress before sending. A workaround is to call res.write(data) followed by res.flush(). - Southland
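To make the role of the \n\n discussed in the comments concrete, here is a small sketch of a message formatter. The function name formatSseMessage and its option shape are made up for illustration. The field names (event, id, data) and the blank-line terminator come from the event stream format.

```javascript
// Build one SSE message as a string.
function formatSseMessage({ data, event, id }) {
  const lines = [];
  if (event !== undefined) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);

  // A payload containing newlines has to be sent as several "data:" lines;
  // the client joins them back together with "\n".
  for (const line of String(data).split("\n")) {
    lines.push(`data: ${line}`);
  }

  // Every field line ends with "\n". The extra "\n" produces the blank
  // line that tells the client the message is complete.
  return lines.join("\n") + "\n\n";
}

// Usage inside a route:
// res.write(formatSseMessage({ data: JSON.stringify({ num: 1 }) }));
```

With only a single \n, the client keeps waiting for more fields of the same message, which matches the behavior described in the comment above.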
G
6

This adds a complete, runnable example to John's excellent answer and makes a tweak, adding the Connection: keep-alive header. Also included is a client to read the stream and handle the possibility of multiple chunks arriving at once, which seems to be a characteristic of fetch.

JSON isn't strictly necessary but is useful to separate the data payload from the SSE metadata.

server.js:

const express = require("express");
const app = express();

app.use(express.static("public"));

app.get("/stream", (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let counter = 0;
  const interval = setInterval(() => {
    const chunk = JSON.stringify({chunk: counter++});
    res.write(`data: ${chunk}\n\n`);
  }, 100);

  res.on("close", () => {
    clearInterval(interval);
    res.end();
  });
});

const listener = app.listen(process.env.PORT || 3000, () =>
  console.log(`Your app is listening on port ${listener.address().port}`)
);

public/index.html:

<!DOCTYPE html>
<html lang="en">
<head><title>SSE POC</title></head>
<body>
<script>
(async () => {
  const response = await fetch("/stream", {
    headers: {
      "Accept": "text/event-stream",
    },
  });

  if (!response.ok) {
    throw Error(response.statusText); // statusText is a property, not a method
  }

  for (const reader = response.body.getReader(); ; ) {
    const {value, done} = await reader.read();

    if (done) {
      break;
    }

    const chunk = new TextDecoder().decode(value);
    const subChunks = chunk.split(/(?<=})\n\ndata: (?={)/);

    for (const subChunk of subChunks) {
      const payload = subChunk.replace(/^data: /, "");
      document.body.innerText = JSON.parse(payload).chunk;
    }
  }
})();
</script>
</body>
</html>

After node server.js, navigate your browser to localhost:3000. You can also test the stream directly with curl localhost:3000/stream.

I won't repeat the notes from John's answer, but, in short we set the necessary headers and flush them to begin the connection, then use res.write to send a chunk of data. Call res.end() to terminate the connection on the server or listen for res.on("close", ...) for the client closing the connection.

The client uses fetch and response.body.getReader() which can be read with const {value, done} = await reader.read() and decoded with TextDecoder().decode(value).
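The regex split in the client above assumes that every chunk contains only whole messages. A chunk may also end in the middle of a message, so a buffering parser is a possible alternative. The sketch below uses a made-up helper name, createSseParser, handles only data: fields, and assumes \n line endings.

```javascript
// Incremental parser: feed() accepts decoded text in arbitrary pieces and
// calls onMessage once per complete message (terminated by a blank line).
// "event:", "id:", "retry:" and comment lines are ignored in this sketch.
function createSseParser(onMessage) {
  let buffer = "";

  return function feed(text) {
    buffer += text;

    let end;
    while ((end = buffer.indexOf("\n\n")) !== -1) {
      const rawMessage = buffer.slice(0, end);
      buffer = buffer.slice(end + 2); // keep any incomplete tail for later

      const data = rawMessage
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).replace(/^ /, ""))
        .join("\n");

      if (data !== "") onMessage(data);
    }
  };
}

// Usage with the reader loop (one decoder for the whole stream, so that
// multi-byte characters split across chunks still decode correctly):
// const decoder = new TextDecoder();
// const feed = createSseParser((d) => console.log(JSON.parse(d).chunk));
// feed(decoder.decode(value, { stream: true }));
```

If custom request headers are not needed, the browser's built-in EventSource does this parsing already.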

See also https://masteringjs.io/tutorials/express/server-sent-events

Express 4.18.2, Node 18.16.0, Chrome Version 114.0.5735.110 (Official Build) (64-bit)

Grindery answered 20/4, 2021 at 19:1 Comment(0)
C
2

Self-promotion: I wrote the ExpreSSE package that provides middlewares for working with SSE in express, you can find it on npm: @toverux/expresse.

A simple example:

router.get('/events', sse(/* options */), (req, res) => {
    let messageId = parseInt(req.header('Last-Event-ID'), 10) || 0;

    someModule.on('someEvent', (event) => {
        //=> Data messages (no event name, but defaults to 'message' in the browser).
        res.sse.data(event);
        //=> Named event + data (data is mandatory)
        res.sse.event('someEvent', event);
        //=> Comment, not interpreted by EventSource on the browser - useful for debugging/self-documenting purposes.
        res.sse.comment('debug: someModule emitted someEvent!');
        //=> In data() and event() you can also pass an ID - useful for replay with Last-Event-ID header.
        res.sse.data(event, (messageId++).toString());
    });
});

There is also another middleware to push the same events to multiple clients.

Comedy answered 29/5, 2017 at 15:49 Comment(0)
B
1

According to the documentation of the library you're using, you should call res.sse rather than res.send when the package is used as middleware on a route. See: https://www.npmjs.com/package/server-sent-events

Judging from its code, though, all res.sse does is wrap res.write, as you mentioned. See: https://github.com/zacbarton/node-server-sent-events/blob/master/index.js#L11
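To illustrate the idea, here is a hypothetical middleware written in the same spirit. It is not the package's actual source, and sseMiddleware is an assumed name. It also shows why res.send() fails in the question: once the middleware has started the response, the headers can no longer be changed.

```javascript
// Hypothetical middleware, NOT the real package code. It opens the
// stream and exposes res.sse() as a thin wrapper around res.write().
function sseMiddleware(req, res, next) {
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
  });

  // Headers are committed from here on, so res.send(), res.json() or
  // res.setHeader() in a later handler would throw
  // "Can't set headers after they are sent".
  res.sse = (text) => res.write(text);

  next();
}

// Usage with Express (assumed to be installed):
// app.get("/events", sseMiddleware, (req, res) => {
//   res.sse("data: hello\n\n");
// });
```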

Buchan answered 7/1, 2016 at 14:16 Comment(0)
D
0

New Answer:

Just use socket.io, it's so much easier and better! https://www.npmjs.com/package/socket.io#in-conjunction-with-express

basic setup:

const express = require('express');
const PORT = process.env.PORT || 5000;
const app = express();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
// listen to socket connections
io.on('connection', function(socket){
  // get that socket and listen to events
  socket.on('chat message', function(msg){
    // emit data from the server
    io.emit('chat message', msg);
  });
});
// Tip: add the `io` reference to the request object through a middleware like so:
app.use(function(request, response, next){
  request.io = io;
  next();
});
server.listen(PORT);
console.log(`Listening on port ${PORT}...`);

and in any route handler, you can use socket.io:

app.post('/post/:post_id/like/:user_id', function likePost(request, response) {
  //...
  request.io.emit('action', 'user liked your post');
})

client side:

<script src="/socket.io/socket.io.js"></script>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
  $(function () {
    var socket = io();
    $('form').submit(function(e){
      e.preventDefault(); // prevents page reloading
      socket.emit('chat message', $('#m').val());
      $('#m').val('');
      return false;
    });
    socket.on('chat message', function(msg){
      $('#messages').append($('<li>').text(msg));
    });
  });
</script>

full example: https://socket.io/get-started/chat/

Original Answer:

Someone (user: https://stackoverflow.com/users/451634/benny-neugebauer | from this article: addEventListener on custom object) literally gave me a hint on how to implement this without any other package except express! I have it working!

First, import Node's EventEmitter:

const EventEmitter = require('events');

Then create an instance:

const Stream = new EventEmitter();

Then create a GET route for event streaming:

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

In this GET route, you are writing back that the request is 200 OK, content-type is text/event-stream, no cache, and to keep-alive.

You are also going to call the .on method of your EventEmitter instance, which takes 2 parameters: a string naming the event to listen for and a function to handle that event (that function receives whatever arguments were passed to .emit).

Now.... all you have to do to send a server event is to call the .emit method of your EventEmitter instance:

Stream.emit("push", "test", { msg: "admit one" });

The first parameter is a string of the event you want to trigger (make sure that it is the same as the one in the GET route). Every subsequent parameter to the .emit method will be passed to the listener's callback!

That is it!

Since your instance was defined in a scope above your route definitions, you can call the .emit method from any other route:

app.get('/', function(request, response){
  Stream.emit("push", "test", { msg: "admit one" });
  response.render("welcome.html", {});
});

Because the EventEmitter instance is an ordinary object, you can even pass it around to other functions, including functions in other modules:

const someModule = require('./someModule');

app.get('/', function(request, response){
  someModule.someMethod(request, Stream)
  .then(obj => { return response.json({}) });
});

In someModule:

function someMethod(request, Stream) { 
  return new Promise((resolve, reject) => { 
    Stream.emit("push", "test", { data: 'some data' });
    return resolve();
  }) 
}

That easy! No other package needed!

Here is a link to Node's EventEmitter Class: https://nodejs.org/api/events.html#events_class_eventemitter
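One detail the route above leaves open is cleanup: every request to /stream registers another "push" listener, and nothing removes it when the client disconnects. A possible way to handle that is sketched below. The helper name attachClient is an assumption, and the response object in the demo is a stand-in rather than a real Express response.

```javascript
const { EventEmitter } = require("node:events");

// Forward "push" events to one response and unsubscribe when the
// connection closes.
function attachClient(stream, response) {
  const onPush = (event, data) => {
    response.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  };

  stream.on("push", onPush);

  // Without this, every past connection would keep a listener registered.
  response.on("close", () => stream.off("push", onPush));
}

// Demo with a stand-in for the Express response object.
const Stream = new EventEmitter();
const fakeResponse = Object.assign(new EventEmitter(), {
  written: [],
  write(text) { this.written.push(text); },
});

attachClient(Stream, fakeResponse);
Stream.emit("push", "test", { msg: "admit one" });
fakeResponse.emit("close");
Stream.emit("push", "test", { msg: "never delivered" });

console.log(fakeResponse.written.length); // 1
console.log(Stream.listenerCount("push")); // 0
```

In the real route, attachClient(Stream, response) would be called right after response.writeHead(...).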

My example:

const EventEmitter = require('events');
const express = require('express');
const app = express();

const Stream = new EventEmitter(); // my event emitter instance

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

setInterval(function(){
  Stream.emit("push", "test", { msg: "admit one" });
}, 10000)
Delanadelancey answered 5/3, 2018 at 2:25 Comment(13)
Your example needs to end the current response after each "push" event in order to be fully functional. (response.end() is missing in the push event listener.) - Telstar
Also, in the client browser, make sure the event handler matches the event name. In the example code above, you need to call addEventListener for an event named "test". If you want to use "onmessage", the event name needs to be "message" instead of "test"... - Holter
@Nicolas just tried it and it didn't work. I think that defeats the purpose of SSE. There needs to be an open connection so the server can push the events. Calling end will close the connection. - Delanadelancey
@Delanadelancey Yes, you're right, since the stream between the server and the client never ends (in theory). I also got some disconnect issues due to a load balancer proxy in the middle, so I use a simple ping at a regular interval to avoid that behavior. - Telstar
@Nicolas I did see the same thing too. This was before I learned about the socket.io npm package. That is what I use now! - Delanadelancey
Yes, I used sockets before too, but I wanted to use SSE for a specific one-shot project for a donation event. I think it all depends on your project's needs. If you're interested in such technologies, you can also give a Node.js framework like Meteor a try: meteor.com - Telstar
The "Original Answer" worked for me and was more in line with my use case. Ty ty - Straightforward
@Delanadelancey This does not authenticate the client's websocket connection though. Anyone can connect to the websocket url since it uses the wss protocol rather than http. The advantage of server-sent events is that since it uses http, you can use your already built http authentication mechanisms to authenticate the client and push events. - Cipolin
@Cipolin see this - socket.io/docs/v3/server-api/#socket-handshake - Delanadelancey
This is not easier and requires a client-side library that may be on the heavy side depending on what you're trying to do. - Gessner
Not sure how it would be heavy on the client; no extra library is needed to listen to SSE, just create an event source - developer.mozilla.org/en-US/docs/Web/API/EventSource @Gessner - Delanadelancey
I am able to capture the events and broadcast them to an endpoint. My only concern is whether every client that listens to the endpoint will get the same events, or whether a different event stream is opened for each client. I am using it as part of an account provisioning request, so I only need to send the relevant events for the specific account. - Absorptance
@Happy Coder there are many ways of going about it. For one, you can make this an authentication-protected route where the user has to be authenticated. From there, you could create a map of all requests/stream responses by user id in some service class. That way, you could write to all the response event streams for a particular user by their id. - Delanadelancey
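The "map of stream responses by user id" idea from the last comment, together with the regular ping mentioned earlier in the thread, might look roughly like this. The class and method names are illustrative, and userId is assumed to come from whatever authentication the route already uses.

```javascript
// Keeps track of the open SSE responses of each user.
class ClientRegistry {
  constructor() {
    this.byUser = new Map(); // userId -> Set of response objects
  }

  add(userId, res) {
    if (!this.byUser.has(userId)) this.byUser.set(userId, new Set());
    this.byUser.get(userId).add(res);
  }

  remove(userId, res) {
    const set = this.byUser.get(userId);
    if (!set) return;
    set.delete(res);
    if (set.size === 0) this.byUser.delete(userId);
  }

  // Write one event to every stream the given user has open.
  sendTo(userId, data) {
    const set = this.byUser.get(userId) || [];
    for (const res of set) res.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  // A line starting with ":" is a comment that EventSource ignores. Sending
  // one periodically can keep proxies from dropping an idle connection.
  pingAll() {
    for (const set of this.byUser.values()) {
      for (const res of set) res.write(": ping\n\n");
    }
  }
}

// In the route:  registry.add(userId, res);
//                res.on("close", () => registry.remove(userId, res));
// Elsewhere:     registry.sendTo(userId, { status: "provisioned" });
//                setInterval(() => registry.pingAll(), 30000);
```

Each client still gets its own HTTP response; the registry only decides which of those responses an event is written to.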
