Async.js - ETIMEDOUT and Callback was already called
Asked Answered
D

1

6

I keep getting an ETIMEDOUT or ECONNRESET error followed by a Callback was already called error when I run index.js.

At first I thought it was because I was not including return prior to calling the onEachLimitItem callback. So I included it per the async multiple callbacks documentation. Still not solving it. I've also tried removing the error event and removing the callback to onEachLimit in the error event, but neither has worked. I've looked at the other SO questions around the issue of Callback already called, but because they aren't concerned with streams, I didn't find a solution.

My understanding is that if the stream encounters an error like ECONNRESET, it will return the callback in the error event and move on to the next stream, but this doesn't seem to be the case. It almost seems if the error resolves itself i.e. it re-connects and tries sending the errored steam up to Azure again and it works, then it triggers the 'finish' event, and we get the Callback already called.

Am I handling the callbacks within the stream events correctly?

var Q = require('q');
var async = require('async');
var webshot = require('webshot');
var Readable = require('stream').Readable;
var azure = require('azure-storage');

var blob = azure.createBlobService('123', '112244');
var container = 'awesome';

var countries = [
    'en-us', 'es-us', 'en-au', 'de-at', 'pt-br', 'en-ca', 'fr-ca', 'cs-cz', 'ar-ly', 'es-ve',
    'da-dk', 'fi-fi', 'de-de', 'hu-hu', 'ko-kr', 'es-xl', 'en-my', 'nl-nl', 'en-nz', 'nb-no',
    'nn-no', 'pl-pl', 'ro-ro', 'ru-ru', 'ca-es', 'es-es', 'eu-es', 'gl-es', 'en-gb', 'es-ar',
    'nl-be', 'bg-bg', 'es-cl', 'zh-cn', 'es-co', 'es-cr', 'es-ec', 'et-ee', 'fr-fr', 'el-gr',
    'zh-hk', 'en-in', 'id-id', 'en-ie', 'he-il', 'it-it', 'ja-jp', 'es-mx', 'es-pe', 'en-ph'
];

var uploadStreamToStorage = function (fileName, stream, onEachLimitItem) {
    var readable = new Readable().wrap(stream);
    var writeable = blob.createWriteStreamToBlockBlob(container, fileName);

    readable.pipe(writeable);

    writeable.on('error', function (error) {
        return onEachLimitItem.call(error);
    });

    writeable.on('finish', function () {
        onEachLimitItem.call(null);
    });
};

var takeIndividualScreenshot = function (ID, country, onEachLimitItem) {
    var fileName = ID + '-' + country + '.jpg';
    var url = 'https://example.com/' + country + '/' + ID;

    webshot(url, function (error, stream) {
        if (error) { throw 'Screenshot not taken'; }

        uploadStreamToStorage(fileName, stream, onEachLimitItem);

    });
};

var getAllCountriesOfId = function (ID) {
    var deferred = Q.defer();
    var limit = 5;

    function onEachCountry(country, onEachLimitItem) {
        takeIndividualScreenshot(ID, country, onEachLimitItem);
    }

    async.eachLimit(countries, limit, onEachCountry, function (error) {
        if (error) { deferred.reject(error); }
        deferred.resolve();
    });

    return deferred.promise;
};

var createContainer = function () {
    var df = Q.defer();
    var self = this;

    blob.createContainerIfNotExists(this.container, this.containerOptions, function (error) {

        if (error) { df.reject(error); }

        df.resolve(self.container);
    });

    return df.promise;
};

createContainer()
    .then(function () {
        return getAllCountriesOfId('211007');
    })
    .then(function () {
        return getAllCountriesOfId('123456');
    })
    .fail(function (error) {
        log.info(error);
    });

enter image description here

Deform answered 23/10, 2015 at 19:30 Comment(1)
One thing I'm noticing is that when I run this on my VM (3gb of RAM) vs locally (8gb of RAM), is that the script continues to throw the ECONNRESET error when I run it on my VM.Deform
D
4

You are letting your callback get called twice, as you already know. The question is; do you want to stop on all errors as you are iterating the stream or do you want to accumulate all errors from the stream?

There are multiple ways to catch and handle the errors which you are already doing, but because you aren't throwing the error object leading to additional calls from your data stream to fatally error.

The actual problem in your code is due to the scope of your return. When you are handling the error and trying to return the callback and halt script execution the scope of hour return is local to the streams error handler, not the global script hence the script continuing and catching moving on to the next valid stream.

writeable.on('error', function (error) {
   // This 'return' is in the local scope of 'writable.on('error')'
  return onEachLimitItem.call(error);
});

It could perhaps set an array, then handle the error outside of that functions local scope. i.e.

// Set the array's scope as global to the writable.on() error 
var errResults = [];
writeable.on('error', function (error) {
  // Push the local scoped 'error' into the global scoped 'errResults' array
  errResults.push(error);
});

writeable.on('finish', function () {
  // Are there any errors?
  return (errResults.length > 0) ?
    onEachLimitItem.call(errors) : onEachLimitItem.call(null);
});

The above is just one way you could tackle the problem.

I am not sure if you have read the error handling help provided from Joyent (original node.js language backers) but it should give you a good idea of your options when handling the error(s).

https://www.joyent.com/developers/node/design/errors

Darsie answered 26/10, 2015 at 0:37 Comment(4)
Hi @jas-, I am using Q's .fail error listener, but just sending the error to the logger that I'm using (I've added that to the above script). I'd like to accumulate all errors from the stream, and not have the script come crashing down - I'm just not sure how to do this. Reading over Joyent link now.Deform
I am not familiar with Q, but as I mentioned your script is not halting on errors and getting called twice. This can occur (and is in your case) because of the scope with which you are returning the callback error handler is local to the 'writeable.on()' error handling function.Darsie
Could you provide an example of how I would handle the error? Thanks, this is all new for me.Deform
The documentation should provide you with the options available. You must decide which method is going to work best for what you are trying to accomplish.Darsie

© 2022 - 2024 — McMap. All rights reserved.