I am using socket.io for communication between the Swift client of my app and the server. Essentially, the client joins a socket connection upon opening the app and a job is instantly added to a Redis queue (it's a job that takes anywhere from a few seconds to about 15 seconds). The server responds to the client with the job id. While this job is processing, SOMETIMES the client will disconnect. There doesn't seem to be any rhyme or reason behind this: the time of disconnection is totally inconsistent, and the disconnection does not happen at a specific point in the function either. I thought maybe I was manually disconnecting from the client side, so I set up socket emissions right before each disconnect call on the client side (when these emissions reach the server, the server prints something that tells me where the disconnect came from). This showed me that the disconnect is automatic, because no such emission is ever received by the server before the socket connection ends. This is running on Heroku. Here's my code:
//queue initialization
// Creates the job queue named 'queue'; the second constructor argument is read from the
// REDIS_URL environment variable (presumably the Redis connection string — confirm in deployment config).
const queue = new Queue('queue', process.env.REDIS_URL)
//client pings this endpoint to get the job id in the queue
// POST /process: enqueues the request body as a job and replies with `{ id }`.
// Replies with HTTP 500 and `{ error }` if the job cannot be enqueued, so the
// client is never left waiting on a request that will not be answered.
app.post('/process', async function(request, response) {
  try {
    // `queue` is a queue instance, not a function: jobs are enqueued with add().
    const job = await queue.add({request: request.body});
    console.log("Logging job as " + job.id)
    response.json({ id: job.id });
  } catch (err) {
    console.log(err)
    response.status(500).json({ error: "Failed to queue job" });
  }
});
queue.process(10, async (job) => { //10 is the max workers per job
console.log("Started processing")
const client = await pool.connect()
let item = job.data.request
let title = item.title
let subtitle = item.subtitle
let id = item.id
io.to(id).emit("Processing1", ""); //added emissions like these because I thought maybe the socket was timing out, but this didn't help
console.log("Processing1");
try {
await client.query('BEGIN')
let geoData = await //promise of geocoding endpoint api function
let lengthOfGeoData = geoData.context.length
io.to(id).emit("Processing2", "");
console.log("Processing2");
var municipality = ""
var area = ""
var locality = ""
var place = ""
var district = ""
var region = ""
var country = ""
//for loop to go through geoData and set the above values
if (municipality != "") {
console.log("Signing in from " + municipality + ", " + area);
} else {
console.log("Signing in from " + area)
}
await scrape(municipality, area, id);
await client.query('COMMIT')
} catch(err) {
await client.query('ROLLBACK')
console.log(err)
}
try {
await client.query('BEGIN')
const array = await //a function that queries a Postgres db for some rows, makes json objects out of them, and pushes to the 'array' variable
var array2 = []
for (a of array) {
let difference = getDifference(title, subtitle, a.title, a.subtitle) //math function
if (difference <= 10) {
array.push(a)
}
}
io.to(id).emit("Processing9", "");
console.log("Processing9");
await client.query('COMMIT')
} catch(err) {
await client.query('ROLLBACK')
console.log("ERROR: Failed arrayHelperFunction")
console.log(err)
} finally {
client.release()
console.log("About to emit this ish to " + id) //should emit to socket here ideally to notify that the processing is done and results can be polled
io.to(id).emit("finishedLoading", "")
return array2;
}
});
//when the client polls the queue after it's received the 'done' notifier from the server
// POST /poll: looks up the job whose id is in `request.body.id` and sends back its
// stored return value. Replies 404 when no such job exists and 500 when the lookup fails.
app.post('/poll', async function(request, response) {
  console.log("Polling")
  const id = request.body.id
  try {
    // `queue` is a queue instance, not a function: jobs are looked up with getJob().
    const job = await queue.getJob(id);
    if (job == null) {
      response.status(404).json({ error: "Job not found" });
      return;
    }
    const results = job.returnvalue
    // The return value is only an array once the processor has finished;
    // skip the logging loop otherwise instead of throwing.
    if (Array.isArray(results)) {
      for (const r of results) {
        console.log("Sending " + r.title);
      }
    }
    response.send(results)
  } catch (err) {
    console.log(err)
    response.status(500).json({ error: "Failed to poll job" });
  }
});
//scrape
/**
 * Scrapes events for a location into the database unless the location is already recorded.
 *
 * Flow: check the database for the location; if absent, record it, look the
 * municipality up via the SerpApi locations endpoint, then run three
 * `searchEvents` queries in parallel and store the geocoded results.
 * Progress events "Processing3" .. "Processing6" are emitted to socket room `id`.
 *
 * Failures of the individual steps are logged and do not abort the remaining steps.
 *
 * @param {string} municipality - Municipality name; may be "" when unknown.
 * @param {string} area - Area name, used alone when `municipality` is "".
 * @param {string} id - Socket room to emit progress events to.
 * @returns {Promise<string>} Always resolves to "Resolved" (returned from `finally`).
 */
async function scrape(municipality, area, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN')
    const location = municipality != "" ? municipality + ", " + area : area
    const inDatabase = await client.query('SQL statement AS it_does_exist', [params]);
    io.to(id).emit("Processing3", "");
    console.log("Processing3");
    if (inDatabase.rows[0].it_does_exist == false) {
      const query = "book clubs near " + location
      // Timestamp four hours from now, formatted "YYYY-MM-DD H:M:S".
      const terminationTime = new Date()
      terminationTime.setHours(terminationTime.getHours() + 4);
      const date = ("0" + terminationTime.getDate()).slice(-2);
      const month = ("0" + (terminationTime.getMonth() + 1)).slice(-2);
      const year = terminationTime.getFullYear();
      const hours = terminationTime.getHours();
      const minutes = terminationTime.getMinutes();
      const seconds = terminationTime.getSeconds();
      const timestamp = year + "-" + month + "-" + date + " " + hours + ":" + minutes + ":" + seconds
      try {
        await client.query(`SQL statement`, [params]);
      } catch(err) {
        console.log("FAILURE: scrape() at 1.")
        console.log(err)
      }
      let queryLocation = "New York,New York,United States" //default search origination is here
      let queryGLCode = "US"
      io.to(id).emit("Processing4", "");
      console.log("Processing4");
      try {
        // Encode the municipality so spaces, '&', '#', etc. cannot corrupt the query string.
        const res = await fetch('https://serpapi.com/locations.json?q=' + encodeURIComponent(municipality) + '&limit=10', { method : "GET" })
        const json = await res.json()
        if (!Array.isArray(json)) {
          throw new Error("Unexpected locations response")
        }
        // The API may return fewer than the 10 requested entries; never index past the end.
        const limit = Math.min(json.length, 10)
        for (let index = 0; index < limit; index++) {
          let locationAPIName = json[index].canonical_name
          let locationAPICode = json[index].country_code
          let resultLatitude = json[index].gps[1];
          let resultLongitude = json[index].gps[0];
        }
      } catch(err) {
        console.log("FAILURE: scrape() at 2.")
        console.log(err)
      }
      io.to(id).emit("Processing5", "");
      console.log("Processing5");
      try {
        // NOTE(review): the callbacks below issue their own BEGIN/COMMIT/ROLLBACK on the
        // same `client` that already has the transaction opened at the top of this
        // function, and they run concurrently — confirm this nesting is intended.
        await Promise.all([
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode}).then(async (data) => {
            try {
              await client.query('BEGIN');
              const results = data.events_results
              if (results != null) {
                console.log("first HAD results")
                // `const` keeps the loop variable local; an undeclared `result` throws a
                // ReferenceError in strict/module code and is a shared global otherwise.
                for (const result of results) {
                  const fixedAddress = result.address[0]
                  const address = fixedAddress + ", " + result.address[1]
                  const title = result.title + address
                  const description = result.description
                  const geoData = await geocode(address); //mapbox geocode the address
                  const latitude = Number(geoData.center[0]);
                  const longitude = Number(geoData.center[1]);
                  await client.query(`SQL statement`, [params]);
                }
                io.to(id).emit("Processing6", "");
                console.log("Processing6");
              } else {
                console.log("first DID NOT have results")
              }
              console.log("FIRST BLOCK")
              await client.query('COMMIT');
            } catch(err) {
              console.log("Results[0] not found.")
              console.log(err)
              await client.query('ROLLBACK');
            }
          }),
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "10"}).then(async (data) => {
            // same as the one above, just with an offset
          }),
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "20"}).then(async (data) => {
            // same as the one above, but with a different offset
          })
        ])
      } catch(err) {
        console.log("FAILURE: scrape() at 3.")
        console.log(err)
      }
    } else {
      console.log("Location already in the database.")
    }
    await client.query('COMMIT')
  } catch(err) {
    await client.query('ROLLBACK')
    console.log(err)
  } finally {
    client.release()
    // Returning from `finally` means the caller always receives "Resolved",
    // even if the catch block above threw.
    return "Resolved";
  }
}
//Client establish socket connection
/// Creates the socket manager for `appState.server`, connects its default socket and
/// calls `completion` once, the first time the socket reports `.connect`.
///
/// - Parameter completion: Optional callback invoked after the first successful connect.
///   It is not invoked when `appState.server` is not a valid URL.
func establishConnection(_ completion: (() -> Void)? = nil) {
    let socketUrlString: String = appState.server
    // Bail out instead of force-unwrapping: a malformed server string must not crash the app.
    guard let socketUrl = URL(string: socketUrlString) else {
        print("Invalid socket URL: \(socketUrlString)")
        return
    }
    self.manager = SocketManager(socketURL: socketUrl, config: [.log(false), .reconnects(true), .extraHeaders(["header": "customheader"])])
    self.socket = manager?.defaultSocket
    // Register the one-shot connect handler before connecting so the event cannot be missed.
    self.socket?.once(clientEvent: .connect, callback: { (data, emitter) in
        completion?()
    })
    self.socket?.connect()
    //other socket functions
}
//Client initial post request
/// POSTs a JSON-encoded `BookPackage` to the server's `process` endpoint and stores the
/// decoded `Job` in `appState.pendingJob` on the main queue.
///
/// If the response body cannot be decoded, the error is printed and `pendingJob` is set to `nil`.
/// Transport errors and invalid inputs are printed and leave `pendingJob` untouched.
func process() {
    let server = "serverstring" + "process"
    let title = "title"
    let subtitle = "subtitle"
    let package = BookPackage(title: title, subtitle: subtitle, id: mySocketID) //this is after the initial connection
    print("package is \(package)")
    guard let url = URL(string: server) else {
        print("Invalid process URL: \(server)")
        return
    }
    guard let body = try? JSONEncoder().encode(package) else {
        print("Failed to encode package")
        return
    }
    var urlRequest = URLRequest(url: url)
    urlRequest.addValue("application/json", forHTTPHeaderField: "Content-Type")
    urlRequest.addValue("application/json", forHTTPHeaderField: "Accept")
    urlRequest.httpMethod = "POST"
    urlRequest.httpBody = body
    let task = URLSession.shared.dataTask(with: urlRequest) {
        (data, response, error) in
        if let error = error {
            print(error)
            return
        }
        guard let data = data else {
            print("Process response contained no data")
            return
        }
        var decodedJob: Job? = nil
        do {
            // Decode the response bytes directly; converting them to a String and
            // back to Data first adds nothing.
            decodedJob = try JSONDecoder().decode(Job.self, from: data) //Job is just a struct in the same form as the json object sent back from the server
        } catch {
            print(error.localizedDescription)
        }
        // Publish the result on the main queue.
        DispatchQueue.main.async {
            self.appState.pendingJob = decodedJob
        }
    }
    // start the task
    task.resume()
}
The only consistent part of this bug is the logs right before the user disconnects (side note: 'reason of disconnect' and 'DISCONNECTED USER' fire on the socket.on('disconnect') event):
https://i.sstatic.net/7fjuU.png
https://i.sstatic.net/z5bmL.png
`scrape()` has all sorts of `await` statements in it that are awaiting actual asynchronous operations. That means it won't block for very long and other events (like socket.io ping events) will be able to be processed, so I don't think that's an issue. With proper socket.io logging on client and server, you should be able to find out why it is disconnecting. See socket.io/docs/v3/logging-and-debugging for info on socket.io logging. – Fermium

… `scrape` occurs, and you're saying the job process takes a couple of seconds to 15 seconds, maybe the process is somehow blocking the event loop and exceeding the timeout threshold. Check your timeout settings and try logging the time; if the socket disconnects occur when the process takes more than a couple of seconds to finish, setting the timeout higher could solve it, but it isn't the best fix. – Stagg

`heroku ps` -- This will tell you how many dynos are running and of what type. If you want to specify how many dynos of each type you want running on your instance, you can type this: `heroku ps:scale web=1 queue=2` -- This way you know there's only one dyno with one active instance of your Node.js server to handle network requests, ensuring no connections are dropped due to the load balancer serving requests to the wrong dyno. – Stagg