limit concurrency and wait until all promises complete even if some reject

My question is basically a combination of

I'm aware of Promise.allSettled, but I'm failing to find a good way to also limit concurrency.

What I have so far:

Idea 1 using p-limit:

const pLimit = require('p-limit');
const limit = pLimit(10);

let promises = files.map(pair => {
    var formData = {
        'file1': fs.createReadStream(pair[0]),
        'file2': fs.createReadStream(pair[1])
    };
        
    return limit(() => uploadForm(formData));
});
    
(async () => {
    const result = await Promise.allSettled(promises).then(body => {
        body.forEach(value => {
            if(value.status == "rejected")
                file.write(value.reason + '\n---\n');
        });
    });
})();

My problem with this solution is that I have to create all the promises up front. Doing so opens two file streams per promise, so I hit the limit on open files.

Idea 2 using p-queue: I experimented with generator functions to create and add new promises in the queue's 'next' event handler, but I couldn't get it to work properly, and this is probably not the right tool for the job.

Idea 3 using a PromisePool: This looked very promising at first. Some implementations support a generator function to create the promises for the pool, but I couldn't find one that explicitly states it behaves like Promise.allSettled.

I implemented es6-promise-pool, only to find out that it stops after the first promise rejection.

Cheri answered 20/11, 2020 at 14:52 Comment(0)

It's simple enough to implement it yourself - make an array of functions that, when called, return the Promise. Then implement a limiter function that takes functions from that array and calls them, and once finished, recursively calls the limiter again until the array is empty:

const request = (file) => new Promise((res, rej) => {
  console.log('requesting', file);
  setTimeout(() => {
    if (Math.random() < 0.5) {
      console.log('resolving', file);
      res(file);
    } else {
      console.log('rejecting', file);
      rej(file);
    }
  }, 1000 + Math.random() * 1000);
});
const files = [1, 2, 3, 4, 5, 6];

const makeRequests = files.map(file => () => request(file));
const results = [];
let started = 0;
const recurse = () => {
  const i = started++;
  const makeRequest = makeRequests.shift();
  return !makeRequest ? null : Promise.allSettled([makeRequest()])
    .then(result => {
      results[i] = result[0];
      return recurse();
    })
};
const limit = 2;
Promise.all(Array.from({ length: limit }, recurse))
  .then(() => {
    console.log(results);
  });

If the order of the results doesn't matter, it can be simplified by removing the started and i variables.
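The same idea can be wrapped in a reusable helper. The following is a minimal sketch, not part of the answer above: the name `allSettledLimit` and its parameters are made up for illustration. It assumes `tasks` is an array of functions that each return a promise, runs at most `limit` of them at once, and resolves with `Promise.allSettled`-style records in input order.

```javascript
// Hypothetical helper (name and signature are illustrative assumptions).
// tasks: array of functions that each return a promise when called.
// limit: maximum number of tasks in flight at once.
async function allSettledLimit(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0; // index of the next task to start

  // Each worker repeatedly claims the next unstarted task until none remain.
  const worker = async () => {
    while (next < tasks.length) {
      const i = next++;
      // The async wrapper turns a synchronous throw into a rejection, and
      // allSettled turns any rejection into a { status, reason } record.
      const [outcome] = await Promise.allSettled([(async () => tasks[i]())()]);
      results[i] = outcome;
    }
  };

  // Never start more workers than there are tasks, and always at least one.
  const workerCount = Math.max(1, Math.min(limit, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, worker));
  return results;
}
```

Because each task is only called when a worker claims it, anything the task opens (such as the two read streams in the question) is opened at that point rather than up front.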

Ingeminate answered 20/11, 2020 at 15:18 Comment(1)
I love this solution. And I learned a few things from it. Thanks a lot. - Cheri

The accepted answer works more or less like p-limit. You were having issues with p-limit because the streams were created outside the limit callback, so every stream was opened immediately, before any upload was allowed to start.

This would have solved your problem:

let promises = files.map(pair => {  
    return limit(() => uploadForm({
        'file1': fs.createReadStream(pair[0]),
        'file2': fs.createReadStream(pair[1])
    }));
});
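
To see why moving the stream creation inside the callback matters, here is a small simulation. It is only a sketch and does not use p-limit or real files: `makeLimiter` is a hypothetical stand-in that mimics the `limit(fn)` call shape, and `openFake` / `fakeUpload` are made-up counters standing in for `fs.createReadStream` and `uploadForm`.

```javascript
// Hypothetical stand-in for p-limit: returns limit(fn), which calls fn
// only once fewer than `max` earlier calls are still pending.
function makeLimiter(max) {
  let active = 0;
  const waiting = [];
  const release = () => {
    active--;
    if (waiting.length > 0) waiting.shift()();
  };
  return (fn) =>
    new Promise((resolve, reject) => {
      const run = () => {
        active++;
        // fn is only called here, so anything it opens is opened lazily.
        Promise.resolve().then(fn).then(resolve, reject).finally(release);
      };
      if (active < max) run();
      else waiting.push(run);
    });
}

// Fake resource bookkeeping standing in for fs.createReadStream.
let open = 0;
let maxOpen = 0;
const openFake = () => {
  open++;
  maxOpen = Math.max(maxOpen, open);
};
// Fake upload: "closes" both resources after a short delay.
const fakeUpload = () =>
  new Promise((resolve) => setTimeout(() => { open -= 2; resolve(); }, 10));

// Runs 6 fake uploads with a limit of 2 and returns the peak open count.
async function run(lazy) {
  open = 0;
  maxOpen = 0;
  const limit = makeLimiter(2);
  const pairs = Array.from({ length: 6 }, (_, i) => i);
  const promises = pairs.map(() => {
    if (lazy) {
      // Resources are opened inside the callback, when the slot is free.
      return limit(() => { openFake(); openFake(); return fakeUpload(); });
    }
    // Eager: resources are opened for every pair before any upload starts.
    openFake();
    openFake();
    return limit(() => fakeUpload());
  });
  await Promise.allSettled(promises);
  return maxOpen;
}
```

With these assumptions, `await run(false)` reports a peak of 12 open resources (two for each of the six pairs), while `await run(true)` reports a peak of 4 (two for each of the two running uploads).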
Tangency answered 30/8, 2021 at 7:27 Comment(2)
I wanted to do this, but I can't figure out how to use p-limit without changing my entire application to an ES module and changing all my requires to imports. :( - Aveyron
@JasonC Try an older version of p-limit. The switch to ESM usually comes with a new major version, so try the previous one. - Tangency

Here is a modification of jcouyang's gist.

It supports stopping the queue while it is in progress.

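function promiseAllStepN(n, list) {
  // One promise per started task; each fulfills with an allSettled-style
  // outcome ({ status, value } or { status, reason }), so none ever rejects.
  const resolved = [];
  let processed = 0; // index of the next task to start
  let stop = false;

  return {
    start: () =>
      new Promise((resolve) => {
        function runNext() {
          if (processed === list.length || stop) {
            // Nothing left to start: resolve once every started task has settled.
            resolve(Promise.all(resolved));
            return;
          }
          const index = processed++;
          // Wrapping the task in allSettled keeps a rejection from stalling the queue.
          resolved.push(
            Promise.allSettled([list[index](index)]).then(([outcome]) => {
              runNext();
              return outcome;
            })
          );
        }
        // Start up to n tasks; running at least once also resolves an empty list.
        const initial = Math.max(1, Math.min(n, list.length));
        for (let i = 0; i < initial; i++) runNext();
      }),
    stop: () => {
      stop = true;
    },
  };
}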

How to use (`promises` must be an array of functions that each return a promise):

const { start, stop } = promiseAllStepN(10, promises);

// Start running the tasks in the pool
start();

// Stop before all promises have finished
stop();
Cowage answered 29/7, 2024 at 7:50 Comment(0)
