Execute batches of promises in series: once Promise.all is done, go to the next batch

I have an array that contains arrays of promises, and each inner array can hold 4k, 2k, or 500 promises. In total there are around 60k promises, and I may test with other sizes as well.

I need to execute the first batch of calls/promises, Promise.all(BigArray[0]), in parallel.

Once the first inner array's batch is done, I need to execute the next batch, Promise.all(BigArray[1]), and so on. The batches themselves must be processed serially.

If I try to execute a Promise.all(BigArray) it throws:

fatal error call_and_retry_2 allocation failed - process out of memory

So, to avoid running out of memory, I need to run the contents of each batch in parallel and the batches themselves in series.

Here is an example piece of code:

function getInfoForEveryInnerArgument(InnerArray) {
    const CPTPromises = _.map(InnerArray, (argument) => getDBInfo(argument));
    return Promise.all(CPTPromises)
        .then((results) => {
            return doSomethingWithResults(results);
        });
}
function mainFunction() {
    const BigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8] /* , ... */];
    // the sum of all arguments is over 60k...
    // _.map calls getInfoForEveryInnerArgument for every inner array right away,
    // so every database call is started at the same time.
    const promiseArrayCombination = _.map(BigArray, (InnerArray) => getInfoForEveryInnerArgument(InnerArray));

    return Promise.all(promiseArrayCombination).then((fullResults) => {
        console.log(fullResults);
        return fullResults;
    });
}
Triglyph answered 13/5, 2016 at 14:57 Comment(3)
Are you the same user who posted this prior question, which contains some relevant context: Execute Promise.all in series? – Donnelldonnelly
Are you OK with all 60k operations being run in parallel, but you just want to process the results serially (one sub-array at a time)? Or do you actually need the operations that each sub-array represents to be run serially and processed serially? The latter would be much safer from a memory and resource point of view in node.js. – Donnelldonnelly
Yes, I am. I need them to be run serially; I'm OK with only the innerArray promises running in parallel. Once innerArray 1 is done, move on to the next 500-4k promises in innerArray 2 and run those in parallel; once innerArray 2's 500-4k promises are done, move on to innerArray 3 and do those 500-4k; once innerArray 3 is done, move on to innerArray 4 and do those 500-4k… and so on. – Triglyph

Your question is a bit misnamed, which may have confused some folks in this question and in the previous version of it. You are trying to execute async operations in batches, in series: run one batch of operations, then, when that is done, run the next batch. The results of those async operations are tracked with promises. Promises themselves represent async operations that have already been started; promises aren't "executed". So technically, you don't "execute a batch of promises in series". You execute a set of operations, track their results with promises, and then execute the next batch when the first batch is all done.
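The point that promises track operations which are already running can be seen in a small sketch. `startOperation` and the `started` log are hypothetical, for illustration only: mapping over an array with a promise-returning function starts every operation immediately, while mapping to functions (thunks) defers each operation until its function is called. That deferral is what lets a batching loop decide when the next batch begins.

```javascript
// Log of which operations have been started, in order.
const started = [];

// Hypothetical async operation: records that it started, then resolves after a tick.
function startOperation(id) {
  started.push(id);
  return new Promise((resolve) => setTimeout(() => resolve(id * 10), 0));
}

// Eager: map() calls startOperation right away, so all three operations are
// already running before Promise.all is even called.
const eager = [1, 2, 3].map((id) => startOperation(id));
console.log(started); // [ 1, 2, 3 ]

// Lazy: map() only builds functions; nothing new has started yet.
const lazy = [4, 5, 6].map((id) => () => startOperation(id));
console.log(started); // [ 1, 2, 3 ]

// The deferred operations start only when each thunk is invoked.
const lazyResults = Promise.all(lazy.map((thunk) => thunk()));
console.log(started); // [ 1, 2, 3, 4, 5, 6 ]
```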

Anyway, here's a solution to serializing each batch of operations.

You can create an inner function which I usually call next() that lets you process each iteration. When the promise resolves from processing one innerArray, you call next() again:

function mainFunction() {
    return new Promise(function(resolve, reject) {
        var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8] /* , ... */];
        // the sum of all arguments is over 60k...
        var results = [];

        var index = 0;
        function next() {
            if (index < bigArray.length) {
                getInfoForEveryInnerArgument(bigArray[index++]).then(function(data) {
                    results.push(data);
                    next();
                }, reject);
            } else {
                resolve(results);
            }
        }
        // start first iteration
        next();
    });
}

This also collects all the sub-results into a results array and returns a master promise whose resolved value is this results array. So, you could use it like this:

mainFunction().then(function(results) {
    // final results array here and everything done
}, function(err) {
    // some error here
});
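The `next()` pattern can be tried out in isolation. In this sketch, `getInfoForEveryInnerArgument` is replaced by a hypothetical stub that runs each inner array's operations in parallel with `Promise.all` and records how many operations are in flight, so you can check that batches never overlap. `fakeDbCall`, `inFlight` and `maxInFlight` are illustrative names, not part of the original code, and `bigArray` is passed in as a parameter.

```javascript
let inFlight = 0;     // operations currently running
let maxInFlight = 0;  // highest concurrency observed

// Hypothetical stand-in for a real database call.
function fakeDbCall(arg) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise((resolve) => {
    setTimeout(() => {
      inFlight--;
      resolve(arg * 2);
    }, 5);
  });
}

// Runs one inner array in parallel, like the question's helper.
function getInfoForEveryInnerArgument(innerArray) {
  return Promise.all(innerArray.map(fakeDbCall));
}

// Same next() pattern as above, with bigArray passed in.
function mainFunction(bigArray) {
  return new Promise(function (resolve, reject) {
    const results = [];
    let index = 0;
    function next() {
      if (index < bigArray.length) {
        getInfoForEveryInnerArgument(bigArray[index++]).then(function (data) {
          results.push(data);
          next();
        }, reject);
      } else {
        resolve(results);
      }
    }
    next();
  });
}

const done = mainFunction([[1, 2, 3], [4, 5], [6]]).then((results) => {
  console.log(JSON.stringify(results)); // [[2,4,6],[8,10],[12]]
  console.log(maxInFlight);             // 3 (the size of the largest batch)
  return results;
});
```

Because the largest batch has three items, a peak of 3 operations in flight confirms that the next batch only starts after the previous `Promise.all` resolves.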

You could also use the .reduce() design pattern for iterating an array serially:

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8] /* , ... */];
    // Each step waits for the previous batch (p) before starting the next one.
    return bigArray.reduce(function(p, item) {
        return p.then(function(results) {
            return getInfoForEveryInnerArgument(item).then(function(data) {
                results.push(data);
                return results;
            });
        });
    }, Promise.resolve([]));
}

This builds the whole chain of .then() promises up front (one per inner array) rather than one at a time as the first option does; the operations themselves still run one batch at a time. I don't know whether that is an issue for such a large set of promises (which is why I offered the original option first), but this code is cleaner and the pattern is convenient in other situations too.
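If your environment supports async/await (Node.js 7.6 and later), a plain `for...of` loop is another way to get the same serial-batch behaviour. Unlike the `.reduce()` version, which builds its whole chain of promises up front, the loop only calls the helper for the next inner array after the previous batch has settled. This is a swapped-in technique, not the answer's own code, and `getInfoForEveryInnerArgument` is a hypothetical stub here.

```javascript
// Hypothetical stand-in: processes one inner array in parallel.
function getInfoForEveryInnerArgument(innerArray) {
  return Promise.all(
    innerArray.map((arg) => new Promise((resolve) => setTimeout(() => resolve(arg + 1), 1)))
  );
}

// Process each inner array in turn; each `await` waits for a whole batch.
// A rejection in any batch stops the loop and rejects mainFunction's promise.
async function mainFunction(bigArray) {
  const results = [];
  for (const innerArray of bigArray) {
    results.push(await getInfoForEveryInnerArgument(innerArray));
  }
  return results;
}

mainFunction([[1, 2], [3], [4, 5, 6]]).then((results) => {
  console.log(JSON.stringify(results)); // [[2,3],[4],[5,6,7]]
});
```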


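FYI, some promise libraries have add-on features built to do this for you. The Bluebird promise library (a great library for developing with promises) has Promise.map(), which is made for this. Pass {concurrency: 1} so that only one inner array is processed at a time; without that option, Bluebird calls the mapper for every element right away:

const Promise = require('bluebird');

function mainFunction() {
    var bigArray = [[argument1, argument2, argument3, argument4], [argument5, argument6, argument7, argument8] /* , ... */];
    // concurrency: 1 waits for each inner array's Promise.all before starting the next one
    return Promise.map(bigArray, getInfoForEveryInnerArgument, {concurrency: 1});
}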
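Bluebird's Promise.map() accepts a `concurrency` option. If you would rather not add the dependency, the idea can be approximated in plain JavaScript. The sketch below is an illustration under assumptions, not Bluebird's implementation: `mapWithConcurrency` is a hypothetical helper that keeps at most `limit` mapper calls running and preserves result order. With `limit` set to 1 it processes the outer array serially, which is what the question needs when the mapper is `getInfoForEveryInnerArgument`.

```javascript
// Hypothetical helper: like Promise.all(items.map(mapper)), but never runs
// more than `limit` mapper calls at once (limit must be a positive integer).
// Results keep the input order; the first rejection rejects the whole call.
function mapWithConcurrency(items, mapper, limit) {
  return new Promise((resolve, reject) => {
    const results = new Array(items.length);
    let nextIndex = 0; // next item to start
    let finished = 0;  // items that have resolved
    let failed = false;

    if (items.length === 0) return resolve(results);

    function startNext() {
      if (failed || nextIndex >= items.length) return;
      const i = nextIndex++;
      Promise.resolve()
        .then(() => mapper(items[i], i)) // also turns a sync throw into a rejection
        .then((value) => {
          results[i] = value;
          finished++;
          if (finished === items.length) resolve(results);
          else startNext(); // a slot is free: start the next item
        }, (err) => {
          failed = true;
          reject(err);
        });
    }

    // Start up to `limit` workers; each one starts a new item when it finishes.
    for (let w = 0; w < Math.min(limit, items.length); w++) startNext();
  });
}

// Example: three "batches" processed one after another (limit = 1).
let running = 0;
let peak = 0;
const processBatch = (batch) => {
  running++;
  peak = Math.max(peak, running);
  return new Promise((resolve) => setTimeout(() => { running--; resolve(batch.length); }, 5));
};
const example = mapWithConcurrency([[1, 2], [3], [4, 5, 6]], processBatch, 1);
example.then((lengths) => console.log(JSON.stringify(lengths), peak)); // [2,1,3] 1
```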
Donnelldonnelly answered 13/5, 2016 at 16:3 Comment(0)

An answer from October 2020: async/await makes it short, only 10 lines of code plus JSDoc.

/**
 * Same as Promise.all(items.map(item => task(item))), but it waits for
 * the first {batchSize} promises to finish before starting the next batch.
 *
 * @template A
 * @template B
 * @param {function(A): Promise<B>} task The async task to run for each item.
 * @param {A[]} items Arguments to pass to the task for each call.
 * @param {number} batchSize How many tasks to run in parallel per batch.
 * @returns {Promise<B[]>}
 */
async function promiseAllInBatches(task, items, batchSize) {
    let position = 0;
    let results = [];
    while (position < items.length) {
        const itemsForBatch = items.slice(position, position + batchSize);
        results = [...results, ...await Promise.all(itemsForBatch.map(item => task(item)))];
        position += batchSize;
    }
    return results;
}
Factotum answered 26/10, 2020 at 18:37 Comment(8)
This looks "promising" (pardon the pun), but Promise.all takes an array of promises, so your solution is not "Same as Promise.all": it takes a single task and calls it with an array of different arguments. – Heavily
Thanks for the comment @Marc. I agree. I've modified the code comment (first line) to make it more specific. – Factotum
This is a great solution! The only adjustment I had to make in my code was to use Promise.allSettled() instead of Promise.all() so the batch execution wouldn't fail if a single promise rejected. – Overmeasure
Thank you for this. I modified it to allow the task function to have a signature like foo(item, idx) by keeping track of an iterations variable, incremented in the while loop. The line that calls task becomes Promise.all(itemsForBatch.map((item, idx) => task(item, iterations * batchSize + idx))). – Rivers
Do you have an example of a usage for the task function? I fail to see what kind of task I would run on an array of promises, since this takes place before the promises are executed. – Ripplet
@ThomasFournet the task can be any resource-intensive async function that you want to run on each item of the array. The promises are for each task. Promise.all() then waits for each batch of these. The tasks in each batch run in parallel. Does this answer your question? – Factotum
@DavidVeszelovszki Alright, that was because I was already passing an array of promises as the items parameter. Now I get it. Thanks for your function, it works really well. I personally made task the last parameter and check whether it is a function; if it isn't, Promise.all takes the items directly. – Ripplet
Do you think we can create a version like Promise.all(iterable, batchSize)? – Gaye
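Two suggestions from these comments can be combined: use `Promise.allSettled()` so one rejection doesn't abort a batch, and pass each item's overall index to `task`. A variant along those lines might look like the sketch below. `promiseAllSettledInBatches` is a hypothetical name, `Promise.allSettled()` needs Node.js 12.9 or later, and pushing each batch's outcomes into `results` avoids re-copying the accumulated array on every iteration.

```javascript
/**
 * Runs task(item, index) for every item, batchSize at a time, waiting for
 * each batch to settle before starting the next one. Resolves with
 * Promise.allSettled-style outcomes ({status, value} or {status, reason}).
 */
async function promiseAllSettledInBatches(task, items, batchSize) {
  const results = [];
  for (let position = 0; position < items.length; position += batchSize) {
    const batch = items.slice(position, position + batchSize);
    // position + idx is the item's index in the full `items` array.
    // The async wrapper turns a synchronous throw in task into a rejection.
    const settled = await Promise.allSettled(
      batch.map(async (item, idx) => task(item, position + idx))
    );
    for (const outcome of settled) results.push(outcome);
  }
  return results;
}

// Example: odd items fail, but every batch still runs.
const example = promiseAllSettledInBatches(
  (n, i) => (n % 2 ? Promise.reject(new Error('odd ' + i)) : Promise.resolve(n * 10)),
  [2, 3, 4, 5, 6],
  2
);
example.then((res) => console.log(res.map((r) => r.status).join(',')));
// fulfilled,rejected,fulfilled,rejected,fulfilled
```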

@jfriend00, just adding to your answer: here is the reduce version using async/await:

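function runPromisesInSeries(bigArray, getInfoForEveryInnerArgument) {
  // Each async reducer call first awaits the accumulated promise, so the
  // inner arrays are processed one after another. A rejection from any step
  // propagates to the promise returned here, so no try/catch is needed.
  return bigArray.reduce(async (acc, cItem) => {
    const results = await acc;
    const data = await getInfoForEveryInnerArgument(cItem);
    results.push(data);
    return results;
  }, Promise.resolve([]));
}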
Rilda answered 19/10, 2018 at 9:19 Comment(0)

In addition, if the original array contains objects to be processed rather than promises, batch processing can be done without an external dependency by combining Array.prototype.map(), Array.prototype.slice() and Promise.all():

// Main batch parallelization function.
function batch(tasks, pstart, atonce, runner, pos) {
  if (!pos) pos = 0;
  if (pos >= tasks.length) return pstart;
  var p = pstart.then(function() {
    output('Batch:', pos / atonce + 1);
    return Promise.all(tasks.slice(pos, pos + atonce).map(function(task) {
      return runner(task);
    }));
  });
  return batch(tasks, p, atonce, runner, pos + atonce);
}

// Output function for the example
function output() {
  document.getElementById("result").innerHTML += Array.prototype.slice.call(arguments).join(' ') + "<br />";
  window.scrollTo(0, document.body.scrollHeight);
}

/*
 * Example code.
 * Note: Task runner should return Promise.
 */
function taskrunner(task) {
  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      output('Processed:', task.text, 'Delay:', task.delay);
      resolve();
    }, task.delay);
  });
}

var taskarray = [];
function populatetasks(size) {
  taskarray = [];
  for (var i = 0; i < size; i++) {
    taskarray.push({
      delay: 500 + Math.ceil(Math.random() * 50) * 10,
      text: 'Item ' + (i + 1)
    });
  }
}

function clean() {
  document.getElementById("result").innerHTML = '';
}

var init = Promise.resolve();
function start() {
  var bsize = parseInt(document.getElementById("batchsize").value, 10),
    tsize = parseInt(document.getElementById("taskssize").value, 10);
  populatetasks(tsize);
  init = batch(taskarray.slice() /*tasks array*/ , init /*starting promise*/ , bsize /*batch size*/ , taskrunner /*task runner*/ );
}
<input type="button" onclick="start()" value="Start" />
<input type="button" onclick="clean()" value="Clear" />&nbsp;Batch size:&nbsp;
<input id="batchsize" value="4" size="2"/>&nbsp;Tasks:&nbsp;
<input id="taskssize" value="10" size="2"/>
<pre id="result"></pre>
Customhouse answered 28/11, 2016 at 18:16 Comment(1)
See the first edit for a non-recursive version of batch. – Customhouse
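That edit isn't reproduced here, but a loop-based equivalent of `batch()` could look like the following sketch (an assumption, not the author's revision). It chains one `.then()` per batch onto the starting promise, so each batch begins only after the previous `Promise.all()` resolves, and, like the recursive version, it resolves with the last batch's results. `batchIterative` is a hypothetical name and the `output()` logging is left out.

```javascript
// Loop-based variant: builds the same promise chain as the recursive batch().
function batchIterative(tasks, pstart, atonce, runner) {
  let p = pstart;
  for (let pos = 0; pos < tasks.length; pos += atonce) {
    const slice = tasks.slice(pos, pos + atonce); // tasks for this batch
    p = p.then(() => Promise.all(slice.map((task) => runner(task))));
  }
  return p;
}

// Example runner that tracks how many tasks run at the same time.
let running = 0;
let peak = 0;
const runner = (task) => {
  running++;
  peak = Math.max(peak, running);
  return new Promise((resolve) => setTimeout(() => { running--; resolve(task * 2); }, 5));
};
const example = batchIterative([1, 2, 3, 4, 5], Promise.resolve(), 2, runner);
example.then((last) => console.log(JSON.stringify(last), peak)); // [10] 2
```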

You can do it recursively. For example, I needed to put about 60k documents into MongoDB, but that was too much to do in one step, so I take 1k documents, send them to Mongo, and after that finishes I take another 1k documents, and so on.

exports.rawRecursive = (arr, start) => {
    // ending condition: every document has already been sent
    if (start >= arr.length) {
        return;
    }

    // insert the next 1000 documents, then recurse once the insert finishes
    Rawmedicament.insertManyAsync(_.slice(arr, start, start + 1000)).then(() => {
        exports.rawRecursive(arr, start + 1000);
    });
};

If you want to be notified when everything is done, you can call a callback in the ending condition, or, if you prefer promises, call resolve() there.
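For the promise route, one option is to return the chain from every call, so the outermost call's promise resolves only after the last chunk is inserted and any insert error reaches the caller. In this sketch, `insertMany` is a hypothetical parameter standing in for `Rawmedicament.insertManyAsync`, `chunkSize` replaces the hard-coded 1000, and native `Array.prototype.slice` replaces lodash; none of these come from the original answer.

```javascript
// Inserts `arr` in chunks of `chunkSize`, one chunk at a time.
// Returns a promise that resolves once every chunk has been inserted.
function insertInChunks(insertMany, arr, chunkSize = 1000, start = 0) {
  // Ending condition: nothing left to insert.
  if (start >= arr.length) {
    return Promise.resolve();
  }
  // Insert this chunk, then recurse; returning the chain propagates completion and errors.
  return insertMany(arr.slice(start, start + chunkSize)).then(() =>
    insertInChunks(insertMany, arr, chunkSize, start + chunkSize)
  );
}

// Example with a fake insert that records the size of each chunk.
const chunkSizes = [];
const fakeInsertMany = (docs) => {
  chunkSizes.push(docs.length);
  return new Promise((resolve) => setTimeout(resolve, 1));
};
const docs = Array.from({ length: 2500 }, (_, i) => ({ n: i }));
const done = insertInChunks(fakeInsertMany, docs).then(() => {
  console.log(JSON.stringify(chunkSizes)); // [1000,1000,500]
});
```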

Huntress answered 13/5, 2016 at 15:7 Comment(0)

Dynamically batching more promises

A simple implementation where you can have a queue of tasks batched to run in parallel and add more dynamically:

class TaskQueue {
  constructor ({
    makeTask,
    initialData = [],
    getId = data => data.id,
    batchSize = 15,
    onComplete = () => {},
  }) {
    if (!makeTask) throw new Error('The "makeTask" parameter is required');

    this.makeTask = makeTask;
    this.getId = getId;
    this.batchSize = batchSize;
    this.onComplete = onComplete;
    this.queue = new Map();
    this.queueStarted = false;

    // spread the array so each element is queued as its own item
    this.add(...initialData);
  }

  add(...data) {
    data.forEach(item => {
      const id = this.getId(item);
      if (this.queue.has(id)) return;

      this.queue.set(id, item);
    });

    // running automatically on create or additional items added
    this.runNextBatch();
  }

  runNextBatch () {
    // only one batch runs at a time; items added meanwhile wait for the next one
    if (this.queueStarted) return;
    if (this.queue.size === 0) return;

    this.queueStarted = true;
    const currentBatchData = Array.from(this.queue.values()).slice(0, this.batchSize);

    const tasks = currentBatchData.map(data => {
      const id = this.getId(data);

      // Have some error handling implemented in `makeTask`.
      // Promise.resolve() also accepts a makeTask that returns a plain value.
      return Promise.resolve(this.makeTask(data))
        .finally(() => this.queue.delete(id));
    });

    return Promise.all(tasks)
      .finally(() => {
        // release the lock only after the whole batch has settled
        this.queueStarted = false;
      })
      .then(() => {
        if (this.queue.size === 0) this.onComplete();
        else this.runNextBatch();
      });
  }
}

// Usage
const lotOfFilesForUpload = [{ uri: 'file://some-path' }, { uri: 'file://some-other-path' }];

const upload = async (file) => console.log('fake uploading file: ', file);

const taskQueue = new TaskQueue({
  initialData: lotOfFilesForUpload,
  getId: file => file.uri,
  makeTask: file => upload(file),
  onComplete: () => console.log('Queue completed'),
});

// You can add more tasks dynamically
taskQueue.add({ uri: 'file://yet-another-file' });

Elijaheliminate answered 12/5, 2020 at 16:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.