Scaling Socket.IO to multiple Node.js processes using cluster
Tearing my hair out with this one... has anyone managed to scale Socket.IO to multiple "worker" processes spawned by Node.js's cluster module?

Let's say I have the following on four worker processes (pseudo):

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

And on the browser...

// on the client
var socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

The problem: Every second, I'm receiving four messages, due to four separate worker processes sending the messages.

How do I ensure the message is only sent once?

Uranometry answered 19/8, 2013 at 9:35 Comment(3)
Which version of socket.io are you using? Socket.IO 0.6 is designed as a single-process server. See 3rdEden's answer in this Stack Overflow post: https://mcmap.net/q/276091/-how-can-i-scale-socket-io - Dahle
0.9.16 using RedisStore - Uranometry
You can use SocketCluster (its socket interface is compatible with Socket.IO): github.com/topcloud/socketcluster - Hals
M
114

Edit: In Socket.IO 1.0+, rather than setting a store with multiple Redis clients, a simpler Redis adapter module can now be used.

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

The example shown below would look more like this:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

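  // listen on `server`, the instance Socket.IO is attached to;
  // app.listen() would create a separate HTTP server without Socket.IO
  server.listen(80);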
}
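
A note on which object should call listen() in the worker branch: Socket.IO is attached to `server`, so `server` is the one that has to listen. Express's app.listen() creates its own http.Server internally, which leaves the attached one idle. The sketch below imitates that with plain `http` only (no Express or Socket.IO); `handler` and the use of port 0 are placeholders for the demo.

```javascript
var http = require('http');

function handler(req, res) {
  res.end('ok');
}

// The server Socket.IO would be attached to.
var server = http.createServer(handler);

// Roughly what Express's app.listen(port) does: build a second, separate server.
var other = http.createServer(handler);

// Port 0 asks the OS for any free port; used here only so the demo cannot collide.
other.listen(0, function () {
  console.log('attached server listening:', server.listening);
  console.log('separate server listening:', other.listening);
  other.close();
});
```

If the attached server never listens, requests to /socket.io/ have nothing to answer them, which would be consistent with the 404 reported in the comments.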

If you have a master node that needs to publish to other Socket.IO processes, but doesn't accept socket connections itself, use socket.io-emitter instead of socket.io-redis.

If you are having trouble scaling, run your Node applications with DEBUG=*. Socket.IO now implements debug which will also print out Redis adapter debug messages. Example output:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

If your master and child processes both display the same parser messages, then your application is scaling properly.


There shouldn't be a problem with your setup if you are emitting from a single worker. What you're doing is emitting from all four workers, and because of Redis publish/subscribe, each of those emits is relayed to every worker. The messages aren't being duplicated by mistake; they are sent four times because that is what you asked the application to do. Here's a simple diagram of what Redis does:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

As you can see, when you emit from a worker, it publishes the emit to Redis, and the emit is mirrored by the other workers, which have subscribed to the Redis database. This also means you can run multiple socket servers connected to the same Redis instance, and an emit on one server will be fired on all connected servers.
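
To make the fan-out concrete, here is a small simulation that uses Node's built-in EventEmitter as a stand-in for the Redis channel. It is only a sketch: `createWorker`, `bus`, and the client objects are made up for illustration, and it simplifies the real store (where the emitting worker delivers to its own clients directly and ignores its own message coming back from Redis).

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for the Redis channel: every subscribed worker sees every publish.
var bus = new EventEmitter();

// Hypothetical worker: keeps its own local clients and relays bus messages to them.
function createWorker(id) {
  var worker = { id: id, clients: [] };
  bus.on('emit', function (event, payload) {
    worker.clients.forEach(function (client) {
      client.received.push({ event: event, payload: payload, via: id });
    });
  });
  // Emitting from any worker publishes to the shared bus.
  worker.emit = function (event, payload) {
    bus.emit('emit', event, payload);
  };
  return worker;
}

var workers = [1, 2, 3, 4].map(createWorker);

// One client per worker: cluster hands each connection to a single worker.
var clients = workers.map(function (worker) {
  var client = { received: [] };
  worker.clients.push(client);
  return client;
});

// A single emit from worker 1 reaches every client exactly once.
workers[0].emit('data', 'howdy');
var afterOneEmit = clients.map(function (c) { return c.received.length; });

// Emitting from all four workers delivers four copies to every client.
clients.forEach(function (c) { c.received = []; });
workers.forEach(function (w) { w.emit('data', 'howdy'); });
var afterFourEmits = clients.map(function (c) { return c.received.length; });

console.log(afterOneEmit, afterFourEmits);
```

The second run is the situation in the question: every worker runs the same setInterval, so every client sees one copy per emitting worker.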

With cluster, when a client connects, it will connect to one of your four workers, not all four. That also means anything you emit from that worker will only be shown once to the client. So yes, the application is scaling, but the way you're doing it, you're emitting from all four workers, and the Redis database is making it as if you were calling it four times on a single worker. If a client actually connected to all four of your socket instances, they'd be receiving sixteen messages a second, not four.
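
The arithmetic behind those numbers, as a tiny sketch. `copiesPerTick` is a hypothetical helper, and it assumes every emitting process broadcasts through the same shared Redis store:

```javascript
// Copies of one periodic message that a client sees per tick:
// each emitting process causes one delivery on each connection the client holds.
function copiesPerTick(emittingProcesses, connectionsPerClient) {
  return emittingProcesses * connectionsPerClient;
}

var asInQuestion = copiesPerTick(4, 1);    // four workers emit, one connection
var connectedToAll = copiesPerTick(4, 4);  // client connected to all four instances
var singleEmitter = copiesPerTick(1, 1);   // only one process emits

console.log(asInQuestion, connectedToAll, singleEmitter);
```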

The type of socket handling depends on the type of application you're going to have. If you're going to handle clients individually, then you should have no problem, because the connection event will only fire on one worker per client. If you need a global "heartbeat", then you could have a socket handler in your master process. Since workers die when the master process dies, you should offload the connection load from the master process and let the children handle connections. Here's an example:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

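  // listen on `server`, the instance Socket.IO is attached to;
  // app.listen() would create a separate HTTP server without Socket.IO
  server.listen(80);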
}

In the example, there are five Socket.IO instances: one is the master and four are the children. The master server never calls listen(), so there is no connection overhead on that process. However, if you call an emit on the master process, it will be published to Redis, and the four worker processes will perform the emit on their clients. This offloads the connection load to the workers, and if a worker were to die, your main application logic in the master would be untouched.

Note that with Redis, all emits, even those in a namespace or room, will be processed by the other worker processes as if you had triggered the emit from each of them. In other words, if you have two Socket.IO instances sharing one Redis instance, calling emit() on a socket in the first worker will send the data to its clients, while the second worker will do the same as if you had called the emit from that worker.

Moten answered 6/9, 2013 at 4:57 Comment(10)
Good answer, thanks! It worked to some extent. When I emit io.sockets.emit('userstreamssock', postid); from the master, I'm not getting it in the workers. Not sure why. - Eating
Just for info: it doesn't work anymore with socket.io > 1.0. The Redis adapter has to be used: socket.io/docs/using-multiple-nodes. I haven't yet managed to get an example running with cluster and socket.io 1.1.0. - Marc
@Marc Me neither. Running socket.io 1.3.5, I haven't found anything that works. Adding sticky sessions, changing the HAProxy config... none of that gets sockets to work with cluster. - Strychnine
I have added an example for Socket.IO 1.0+ and have tested it on 1.3.5. Do note that for the master node, socket.io-emitter should be used, since it is a non-listening process, but I omitted it to make the answer simpler. - Moten
Do you really need Express in every child process? Or a server, for that matter? Can you also create a Socket.IO instance by just giving it a port to listen on? - Quickstep
I'm getting an error on the frontend... socket.io.min.js:2 GET http://localhost:3000/socket.io/?EIO=3&transport=polling&t=LYqSrsK 404 (Not Found) - Edaedacious
Doesn't this have to be server.listen(80)? - Misgovern
Hi, I've set the app.listen port to 8080, but socket.io doesn't listen there for no apparent reason. - Radcliff
@Moten, you are using the io object in the same file. I tried to use it in a different file via require, but io gets passed four times as a different object. - Wordless
Can you update this for 2022 stuff? - Amagasaki
H
2

Let the master handle your heartbeat (example below), or start multiple processes on different ports internally and load-balance them with nginx (which also supports WebSockets from version 1.3 onward).

Cluster with Master

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {
    socket.on('join', function(rooms) {
        rooms.forEach(function(room) {
            socket.join(room);
        });
    });

    socket.on('leave', function(rooms) {
        rooms.forEach(function(room) {
            socket.leave(room);
        });
    });

});

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Emit a message every second
    function send() {
        console.log('howdy');
        io.sockets.in('room').emit('data', 'howdy');
    }

    setInterval(send, 1000);


    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    }); 
}
Henshaw answered 1/9, 2013 at 16:41 Comment(3)
Not a bad suggestion, but then it's still just one master process in charge of potentially 500,000 websocket connections... it doesn't really tackle the 'scalability' issue across multiple servers/processes per server. - Uranometry
How about this: use two layers of load balancers. AWS example: the first layer distributes the workload between multiple machines with an Elastic Load Balancer. The second layer distributes the workload between multiple instances on the machine. You could run cpu.count node instances and distribute the workload to them via nginx, or use the node cluster (in which case there is no need for nginx). I'd prefer the nginx version. For automatic scaling, use OpsWorks and let it handle your scaling based on CPU load. It will add and remove machines automatically and is quite easy to set up. - Henshaw
When I use var socket = require('socket.io')(1338); I get this error: Error: listen EADDRINUSE :::1338. How can I implement this on the same port? - Dekker
C
1

This actually looks like Socket.IO succeeding at scaling. You would expect a message from one server to go to all sockets in that room, regardless of which server they happen to be connected to.

Your best bet is to have one master process that sends a message each second. You can do this by only running it if cluster.isMaster, for example.
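
A minimal sketch of that guard, using only Node's cluster module. `startHeartbeat` is a hypothetical helper, and the `emit` callback stands in for something like io.sockets.in('room').emit; no Socket.IO instance is created here.

```javascript
var cluster = require('cluster');

// cluster.isPrimary is the newer name (Node 16+); cluster.isMaster is the older alias.
var isPrimary = cluster.isPrimary !== undefined ? cluster.isPrimary : cluster.isMaster;

// Start the periodic emit only when `shouldRun` is true.
function startHeartbeat(shouldRun, emit, intervalMs) {
  if (!shouldRun) {
    return null; // workers skip the timer entirely
  }
  return setInterval(function () {
    emit('data', 'howdy');
  }, intervalMs);
}

var sent = [];
var timer = startHeartbeat(isPrimary, function (event, payload) {
  sent.push([event, payload]);
}, 1000);

// Stop immediately so this sketch exits; a real server would leave the timer running.
if (timer) {
  clearInterval(timer);
}
```

With this in place the interval exists in exactly one process, no matter how many workers are forked.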

Channel answered 1/9, 2013 at 2:15 Comment(6)
It's succeeding at 'sharing' the sockets, but not succeeding at figuring out which messages not to duplicate. Cluster is a great idea, but then it's not really 'scaling'... it's one process managing the work of 4. - Uranometry
@Lee What logic do you expect it to use for deciding whether to "duplicate" messages? When you send a message to a room, it goes to everyone in the room - this is the expected behavior. You could have a room for each process if you want each one to send messages on an interval. - Channel
I guess better logic would be for socket.emit to somehow be in sync across processes. Not sure how to achieve that. The 'one room per process' approach doesn't solve scalability when it's 10 different servers with 4 cores each... but it could be a good idea when there's just one server involved. - Uranometry
@Lee The way Socket.IO is usually used is that some event that happens on one server (an HTTP request, for example) triggers a message to a room. You would expect this message to go to everyone in the room, not just people who happen to be connected to the same server. "One process managing the work of 4" - I'm not sure what your actual logic is, but sending a message every second is not going to be taxing. - Channel
My aim was really to figure out how to do exactly that, but at scale. Right now it's not taxing at all for, say, 10,000 clients... but what about when it's a million? The app I'm building has a ton of web socket connections for quite a high-demand stats application, and the API could easily hit 10 million+ socket transactions/day in short order. I just want to be ready to scale this out as necessary - still not sure how to do that beyond a 1 server, 1 process model. - Uranometry
@Lee Without more information on what you're actually doing, it's going to be difficult to help you. I assume your server doesn't just ping all of the clients every second? - Channel
B
0

Inter-process communication is not enough to make socket.io 1.4.5 work with cluster; forcing websocket mode is also a must. See "WebSocket handshake in Node.JS, Socket.IO and Clusters not working".
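
On the client, forcing websocket mode can be done with the `transports` option, roughly as sketched here. The likely reason it helps is that long-polling spreads one session over several HTTP requests, which cluster may hand to different workers. `connectWebSocketOnly`, the stub, and the URL are placeholders; `io` is the Socket.IO client function, passed in so the snippet stands alone.

```javascript
// `io` is the Socket.IO client function (window.io in the browser,
// or require('socket.io-client') in Node).
function connectWebSocketOnly(io, url) {
  // transports: ['websocket'] skips the HTTP long-polling phase
  return io(url, { transports: ['websocket'] });
}

// Usage with a stub in place of the real client, just to show the options passed.
var calls = [];
function stubIo(url, options) {
  calls.push({ url: url, options: options });
  return { connected: false };
}

connectWebSocketOnly(stubIo, 'http://localhost:3000');
console.log(calls[0].options.transports);
```

The trade-off is that clients which cannot open a WebSocket no longer have polling to fall back on.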

Ballarat answered 9/4, 2016 at 4:57 Comment(0)
