socket.io client automatically disconnecting in long Node.js function
Asked Answered
S

3

6

I am using socket.io to communicate the swift client of my app with the server. Essentially, the client joins a socket connection upon opening the app and a job is instantly added to a Redis queue (it's a job that takes anywhere from a few seconds to like 15ish seconds). There's a response from the server to the client of the job id. While this job is processing, SOMETIMES the client will disconnect. There doesn't seem to be a rhyme or reason behind this, as the time of disconnection is totally inconsistent and it's also not like the disconnection is happening at a specific point in the function. I thought maybe I was manually disconnecting from the client side so I set up socket emissions right before each disconnect on the client side (when these emissions were emitted to the server, the server prints something that tells me where the disconnect came from). This showed me that the disconnect is automatic, because the emission is never received by the client before ending the socket connection. This is running on Heroku. Here's my code:

//queue initialization
const queue = new Queue('queue', process.env.REDIS_URL)

//client pings this endpoint to get the job id in the queue
app.post('/process', async function(request, response) {
  let job = await queue({request: request.body});
  console.log("Logging job as " + job.id)
  response.json({ id: job.id });
});

queue.process(10, async (job) => { //10 is the max workers per job
    console.log("Started processing")
    const client = await pool.connect()
    let item = job.data.request
    let title = item.title
    let subtitle = item.subtitle
    let id = item.id
    io.to(id).emit("Processing1", ""); //added emissions like these because I thought maybe the socket was timing out, but this didn't help
    console.log("Processing1");

    try {
      await client.query('BEGIN')
        let geoData = await //promise of geocoding endpoint api function
        let lengthOfGeoData = geoData.context.length
        io.to(id).emit("Processing2", "");
        console.log("Processing2");
        var municipality = ""
        var area = ""
        var locality = ""
        var place = ""
        var district = ""
        var region = ""
        var country = ""
        //for loop to go through geoData and set the above values
      if (municipality != "") {
        console.log("Signing in from " + municipality + ", " + area);
      } else {
        console.log("Signing in from " + area)
      }
      await scrape(municipality, area, id);
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log(err)
    }
    try {
      await client.query('BEGIN')
      const array = await //a function that queries a Postgres db for some rows, makes json objects out of them, and pushes to the 'array' variable
      var array2 = []
      for (a of array) {
        let difference = getDifference(title, subtitle, a.title, a.subtitle) //math function
        if (difference <= 10) {
          array.push(a)
        }
      }
      io.to(id).emit("Processing9", "");
      console.log("Processing9");
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log("ERROR: Failed arrayHelperFunction")
      console.log(err)
    } finally {
      client.release()
      console.log("About to emit this ish to " + id) //should emit to socket here ideally to notify that the processing is done and results can be polled
      io.to(id).emit("finishedLoading", "")
      return array2;
    }
});

//when the client polls the queue after it's received the 'done' notifier from the server
app.post('/poll', async function(request, response) {
  console.log("Polling")
  let id = request.body.id
  const results = await queue(id);
  for (r of results.returnvalue) {
    console.log("Sending " + r.title);
  }
  response.send(results.returnvalue)
});

//scrape
async function scrape(municipality, area, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN')
    var location = ""
    if (municipality != "") {
      location = municipality + ", " + area
    } else {
      location = area
    }
    let inDatabase = await client.query('SQL statement AS it_does_exist', [params]);
    io.to(id).emit("Processing3", "");
    console.log("Processing3");
    if (inDatabase.rows[0].it_does_exist == false) { 
      let query = "book clubs near " + location
      var terminationTime = new Date()
      terminationTime.setHours(terminationTime.getHours() + 4);
      let date = ("0" + terminationTime.getDate()).slice(-2);
      let month = ("0" + (terminationTime.getMonth() + 1)).slice(-2);
      let year = terminationTime.getFullYear();
      let hours = terminationTime.getHours();
      let minutes = terminationTime.getMinutes();
      let seconds = terminationTime.getSeconds();
      let timestamp = year + "-" + month + "-" + date + " " + hours + ":" + minutes + ":" + seconds

      try {
        await client.query(`SQL statement`, [params]);
      } catch(err) {
        console.log("FAILURE: scrape() at 1.")
        console.log(err)
      }

      var queryLocation = "New York,New York,United States" //default search origination is here
      var queryGLCode = "US"
      io.to(id).emit("Processing4", "");
      console.log("Processing4");
      try {
        await fetch('https://serpapi.com/locations.json?q='+municipality+'&limit=10', { method : "GET" })
          .then(res => res.json())
          .then((json) => {
            for (let index = 0; index < 10; index++) {
              let locationAPIName = json[index].canonical_name
              let locationAPICode = json[index].country_code
              let resultLatitude = json[index].gps[1];
              let resultLongitude = json[index].gps[0];
            }
          });
      } catch(err) {
        console.log("FAILURE: scrape() at 2.")
        console.log(err)
      }
      io.to(id).emit("Processing5", "");
      console.log("Processing5");
      try {
        await Promise.all([
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode}).then(data => async function(){
            try {
              await client.query('BEGIN');
              let results = data.events_results
              if (results != null) {
                console.log("first HAD results")
                for (result of results) {
                  var fixedAddress = result.address[0]
                  let address = fixedAddress + ", " + result.address[1]
                      
                  let title = result.title + address

                  var description = result.description

                  let geoData = await geocode(address); //mapbox geocode the address
                  let latitude = Number(geoData.center[0]);
                  let longitude = Number(geoData.center[1]);
                  
                    await client.query(`SQL statement`, [params]);
                  
                }
                io.to(id).emit("Processing6", "");
                console.log("Processing6");
              } else {
                console.log("first DID NOT have results")
              }
              console.log("FIRST BLOCK")
              await client.query('COMMIT');
            } catch(err) {
              console.log("Results[0] not found.")
              console.log(err)
              await client.query('ROLLBACK');
            }
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "10"}).then(data => async function(){
            // same as the one above, just with an offset
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "20"}).then(data => async function(){
            // same as the one above, but with a different offset
          }())
        ])
      } catch(err) {
        console.log("FAILURE: scrape() at 3.")
        console.log(err)
      }

    } else {
      console.log("Location already in the database.")
    }
    await client.query('COMMIT')
  } catch(err) {
    await client.query('ROLLBACK')
    console.log(err)
  } finally {
    client.release()
    return "Resolved";
  }
}

//Client establish socket connection
func establishConnection(_ completion: (() -> Void)? = nil) {
    let socketUrlString: String = appState.server
    self.manager = SocketManager(socketURL: URL(string: socketUrlString)!, config: [.log(false), .reconnects(true), .extraHeaders(["header": "customheader"])])
    self.socket = manager?.defaultSocket
    self.socket?.connect()
    self.socket?.once(clientEvent: .connect, callback: { (data, emitter) in
        if completion != nil{
            completion!()
        }
    })
  //other socket functions
}

//Client initial post request
func process() {
    let server = "serverstring" + "process"
    let title = "title"
    let subtitle = "subtitle"
    let package = BookPackage(title: title, subtitle: subtitle, id: mySocketID) //this is after the initial connection
    print("package is \(package)")
            
    guard let url  = URL(string: server) else { return }

    var urlRequest = URLRequest(url: url)
    
    urlRequest.addValue("application/json", forHTTPHeaderField: "Content-Type")
    urlRequest.addValue("application/json", forHTTPHeaderField: "Accept")
    
    urlRequest.httpMethod = "POST"
    
    guard let data = try? JSONEncoder().encode(package) else { return }
            
    urlRequest.httpBody = data

    let task = URLSession.shared.dataTask(with: urlRequest) {
        (data, response, error) in
        if let error = error {
            print(error)
            return
        }
        guard let data = data else { return }
        guard let dataString = String(data: data, encoding: String.Encoding.utf8) else { return }
        let jsonData = Data(dataString.utf8)
        var decodedJob: Job? = nil
        do {
            decodedJob = try JSONDecoder().decode(Job.self, from: jsonData) //Job is just a struct in the same form as the json object sent back from the server
        } catch {
            print(error.localizedDescription)
        }
        DispatchQueue.main.async {
            self.appState.pendingJob = decodedJob
        }
    }
    // start the task
    task.resume()
}

The only consistent part of this bug is the logs right before the user disconnects (side note: 'reason of disconnect' and 'DISCONNECTED USER' fire on the socket.on('disconnect') event:

https://i.sstatic.net/7fjuU.png

https://i.sstatic.net/z5bmL.png

https://i.sstatic.net/aHNt3.png

https://i.sstatic.net/64WYI.png

Schmid answered 25/9, 2021 at 3:16 Comment(28)
Is the long function blocking the event queue?Fermium
Can you show how the client makes the request that starts all this off? Is it from Javascript or a browser form post?Fermium
@Fermium By 'long function' do you mean the scrap function? I know that function takes a long time to complete because of the serpapi calls, which is why I move it to the background processing queue. Also, I added the requested code, along with the socket connection code from the client side (this is the process() and establishConnection() functions, respectively).Schmid
You're the one who used the term "long function" in the title of your question., Please don't ask me what it is. I'm asking you.Fermium
Please provide a link to the module that provides the Queue class.Fermium
So, the client code runs on an iPhone (since that looks like Swift code)? Perhaps there are power management issues asserted by the iPhone that cause it to drop an inactive socket.io connection? This looks like it's probably a client-side issue. I'd suggest searching for iOS problems with socket.io connections dropping and see if any of the zillions of hits you find looks like your circumstance.Fermium
@Fermium Right, sorry, the long function is the 'scrape' function. The redis queue is npmjs.com/package/bull and yes, the client code is Swift. So does the server side code looks good to you?Schmid
@Fermium I was thinking your answer from 3 years ago here might be related? On my Heroku metrics though, I can see the CPU isn't approaching maximum for the current amount of dynos. Should I outsource the scraping function to a raspberry pi I have at my house? Could this also free up CPU (assuming that's the problem)? #51452437Schmid
The code you show for scrape() has all sorts of await statements in it that are awaiting actual asynchronous operations. That means it won't block for very long and other events (like socket.io ping events) will be able to be processed so I don't think that's an issue. With proper socket.io logging on client and server, you should be able to find out why it is disconnecting. See socket.io/docs/v3/logging-and-debugging for info on socket.io logging.Fermium
@Fermium Okay, I've never done debugging for socket.io before and the documentation isn't that great IMO (mainly some of the things don't work, such as setting environment variable 'DEBUG=*'...Using the 'debug' module, what are the kinds of lines I should be wrapping in debug()--or does setting the environment variable automatically print out all the debug lines w/o me needing to write any code? I set the DEBUG environment variable of my Heroku app to DEBUG=socket.io (this was one of the only ones that worked).Schmid
Setting the right environment variables will automatically enable debugging on the server. For the client, it appears you have to set a localStorage value. The idea is that the debugging code is already in socket.io, you just have to enable it with the right settings. You will probably have to restart the server with the right settings for anything to take effect.Fermium
Let us continue this discussion in chat.Schmid
What's the exact error message, and in what part of the scrape function does this disconnect occur?Stagg
@GandalfTheGrey There is no error message, that's the thing. I just get the debug line '2021-09-27T01:28:19.686Z socket.io:client client close with reason transport close'--after doing some research this doesn't seem to provide any additional information.Schmid
What version of socket.io are you using for both your frontend and backend? If they're different versions, that could be causing issues. Also check your version of Node.Stagg
@GandalfTheGrey Using 4.2.0 for the backend Node.js server and (github.com/socketio/socket.io-client-swift) for the front end swift version. Node version is 14.16.0Schmid
How is your server configuration? Are you reverse proxying your NodeJS server with NGINX? Or is it just Heroku > NodeJS API? You probably already tried this, but try increased the pingtimeout on your server. Perhaps Heroku is buffering the pong response of your client and you don't reach the server timeout threshold and it kills the socket connection. Maybe your job process is stalling and blocking the event loop so the server doesn't send the ping to your client within your clients timeout threshold , causing the client to close the connection.Stagg
I'd debug the way @Fermium said. And if scrape is somehow blocking your Node process, don't offset it to your PI. Are you listening for stalled events? If the socket disconnects occur when the scrape occurs, and you're saying the job process takes couple seconds to 15 seconds, maybe the process is somehow blocking the event loop and bypassing the timeout threshold. Check your timeout settings, and try and log the time, and if the socket disconnects occur when the time it takes the process to finish is past a couple seconds, setting the timeout higher could solve it, but isn't the best fix.Stagg
Server config is just Heroku > node.js--I actually haven't increased the ping timeout. When you say to increase it, do you mean the socket.io ping timeout? There doesn't seem to be any consistency in the amount of time it takes to disconnect (socket.connect event to socket.disconnect event). I'm not sure what listening for stalled events is or means but I'll do some research into it. And by timeout settings, are you again referring to socket connection timeout? Also, I said up the debug, just waiting on a user to encounter this error. I'll report back any new findings. ThanksSchmid
Yes, the socket.io ping timeout. Perhaps the ping/pongs are getting throttled by Heroku through its API gateway/load balancing. It could be the ping from the client to the server gets swallowed up by Heroku, so the server's pong response passes the client's socket pingtimeout and the client closes the connection, OR it could be the opposite. Network loadbalancing settings could also be a culprit, try and check/modify your idle timeout settings to ensure the load balancing isn't closing tcp connections.Stagg
Heroku by default uses dynos, containing your NodeJS process/instance, on the same EC2 instance. This can be causing your connection drops through two means: the load balancing sends a client request to the wrong dyno/node instance that didn't establish the connection so the connection drops OR the load balancing idle timeout settings are really low and throttles the ping/pongs and causes it to bypass the timeout threshold, causing the disconnects. If you aren't already, you can make sure the socket.io rooms are shared between instances with redis, and if you can check the dyno settings.Stagg
It could also just be the client device/browser that's throttling the ping requests somehow, someway like @Fermium mentioned. Also just to be sure the client and server completes the websocket handshake upgrade without dropping, you should enable stick sessions. Also if redis bull is handling the socket connection across your instances, you should be fine. Just increase the socket.io ping timeout on client + server as well as the idle timeout settings on your network load balancer(if you can do that in Heroku).Stagg
I think I would've seen the H12,H15, or H25 errors as described here (devcenter.heroku.com/articles/http-routing#timeouts) if the problem was server timeout settings, no? I'm operating on hobby tier so I think that might mean I just have one dyno allocated to my app. I enabled sticky sessions like you suggested. I'll increase the socket.io ping timeout--I believe default is 20 seconds so I'll try like 40. @GandalfTheGreySchmid
devcenter.heroku.com/articles/http-routing#request-distribution Also they do request and response buffering, so increasing ping timeouts can resolve the issue if this is the case: devcenter.heroku.com/articles/http-routing#request-bufferingStagg
Heroku contains a lot of abstractions over AWS. You can't really know for sure exactly what's going on, but you can get a rough approximation. Websocket libraries are the same but are a bit different in the sense that you can check what's going on under the hood, but are you really going to know all the implications of those abstractions? Socket.io + Heroku means you really have to know how each behave to avoid issues like this. Atleast with raw websockets + AWS, you're in control of what's going on aside from the baseline protocols you have to conform to.Stagg
It's not the http routing that's the issue here, I think. It's how the tcp connection behaves with heroku's load balancer that could be the issue. Although, you should still enable sticky sessions just in case for the http handshake. No, even with the free version, the instance you spin is a vm that uses multiple dynos hosting your node process. These abstractions require buffering and increased time for network packets to traverse. Also their load balancer does round robin handling of requests, which could be the issue here and increasing pingtimeouts won't solve.Stagg
You can find out how many dynos are running by typing commands in the heroku shell. Maybe try getting a connection to drop before running this in the heroku shell: heroku ps -- This will tell you how many dynos and what type are running If you want to specify how many dynos of each type you want running on your instance, you can type this heroku ps:scale web=1 queue=2 --This way you know there's only one dyno with one active instance of your nodejs server to handle network requests, ensuring no connections are dropped due to the load balancer serving requests to the wrong dyno.Stagg
Let us continue this discussion in chat.Schmid
B
2

The solution to your problem is to modify the pingTimeout when initiating the server.

From Socket.io:

The server sends a ping, and if the client does not answer with a pong within pingTimeout ms, the server considers that the connection is closed.

Similarly, if the client does not receive a ping from the server within pingInterval + pingTimeout ms, the client also considers that the connection is closed.

const io = new Server(httpServer, {
  pingTimeout: 30000
});
Burglar answered 30/9, 2021 at 8:25 Comment(1)
Hi, unfortunately I've tried this solution and it doesn't work (tried setting pingTimeout to 50000 milliseconds). Running with DEBUG on, I can see that the reason for the disconnection is 'client close with reason transport close', if this was the problem the reason would be 'ping timeout'.Schmid
T
2

You should be blocking the event loop with await. There is a heartbeat that the client sends every once in a while (which is defined with pingTimeout).

Since no ping is received by the server, it is disconnected.

You should isolate this process. Either find a way to use it with a worker/background process or async, additionally increasing pingTimeout on serverside might help you.

Transceiver answered 3/10, 2021 at 6:2 Comment(2)
I believe you have the most correct answer so far (which is why I gave you the bounty), but for me this is not the solution.Schmid
@Schmid I see, well thank you. But how did it go? Can you start a timer when code execution started? You can use console.time(label); console.timeEnd(); Also, you can console output each time heartbeat is being sent.Transceiver
N
1

You can change the transport from the default to:

 const io = new Server(httpServer, {
  transports: ['polling', 'websocket'],
});

This might resolve the issue, else you can also try canging the upgradeTimeout and pingTimeout

Nephralgia answered 3/2, 2022 at 5:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.