Get which promise completed in Promise.race
J

7

30

Context: I need to make a large number of asynchronous calls (think around 300 to 3000 ajax calls) that are parallelizable. However, I do not want to strain the browser or server by calling them all at once. I also didn't want to run them sequentially because of the long time it would take to finish. I settled on running five or so at a time and derived this function to do so:

async function asyncLoop(asyncFns, concurrent = 5) {
    // queue up simultaneous calls 
    let queue = [];
    for (let fn of asyncFns) {
        // fire the async function and add its promise to the queue
        queue.push(fn());
        // if max concurrent, wait for the oldest one to finish
        if (queue.length >= concurrent) {
            await queue.shift();
        }
    }
    // wait for the rest of the calls to finish
    await Promise.all(queue);
};

Where asyncFns is an iterable of (not yet called) asynchronous functions.

Problem: This works; however, I found that the oldest call is not always the first to complete. I wanted to modify the function so that it uses Promise.race to wait until the first promise completes, then continue from there. Yet I don't know which promise to remove:

        // if max concurrent, wait for the first one to finish
        if (queue.length >= concurrent) {
            await Promise.race(queue);
            // ??? get race's completed promise
            // queue.splice(queue.indexOf(completed), 1);
        }

I could splice it out of the queue (which is now more of a set I guess) if I just knew the index of which one completed. It doesn't look like I can get the original promise from the derived one that race returns. Suggestions?

Jaclin answered 20/3, 2017 at 5:33 Comment(3)
Have a look at this answer. - Cretan
My take would be: const [ idx, result ] = await Promise.race(promisesArr.map((promise, idx) => promise.then((result) => [ idx, result ]))); This won't cover rejections, though. To handle them, I have a helper function (safelyExecuteAsync) that returns a promise of a tuple [ error, result ]. With it, the code becomes: const [ idx, [ error, result ] ] = await Promise.race(promisesArr.map((promise, idx) => safelyExecuteAsync(promise).then((tuple) => [ idx, tuple ]))); - Bozcaada
Promise.race returns a Promise object, but that returned Promise is not the same object as the one that resolved. It has the same value, but wrapped in a different Promise. Think of it this way: the Promise.race function (like any other async function) has to set up its own Promise in which to perform its asynchronous work. That's the only one the caller ever gets to see. Even if Promise.race is used as a waitable semaphore, the returned promise itself is of no use for identifying the winner (values can be duplicates). - Dogtooth
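
A small sketch of the point made in the last comment: the promise returned by Promise.race is a new object, so it cannot be found in the queue with indexOf, and awaiting it yields only the winning value. The delay helper and the raceIdentityDemo name are hypothetical, added only for illustration.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
function delay(ms, value) {
  return new Promise(resolve => setTimeout(resolve, ms, value));
}

async function raceIdentityDemo() {
  const queue = [delay(30, 'slow'), delay(10, 'fast')];
  const raced = Promise.race(queue);
  // `raced` is a fresh promise created by Promise.race,
  // so it is never one of the promises in the queue.
  const indexOfRaced = queue.indexOf(raced);
  // Awaiting it gives the winning value, not the winning promise.
  const value = await raced;
  return { indexOfRaced, value };
}
```

Calling raceIdentityDemo() should give an index of -1, which is why the splice in the question has nothing to work with.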
S
11

The "remove from queue" step should be performed by the completed promise itself (using then) rather than by relying on the promise returned from Promise.race. This seems to be the only way around it.

async function asyncLoop(asyncFns, concurrent = 5) {
    // queue up simultaneous calls 
    let queue = [];
    let ret = [];
    for (let fn of asyncFns) {
        // fire the async function, add its promise to the queue, and remove
        // it from queue when complete
        const p = fn().then(res => {
            queue.splice(queue.indexOf(p), 1);
            return res;
        });
        queue.push(p);
        ret.push(p);
        // if max concurrent, wait for one to finish
        if (queue.length >= concurrent) {
            await Promise.race(queue);
        }
    }
    // wait for the rest of the calls to finish
    await Promise.all(queue);
};

Npm module: https://github.com/rxaviers/async-pool
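
A possible variation on the function above, as a sketch rather than the answer's own code: it returns the collected results and also removes a call from the in-flight set when it rejects, so a failed call does not keep occupying a slot. The name asyncLoopWithResults and the use of a Set are assumptions made for this example.

```javascript
async function asyncLoopWithResults(asyncFns, concurrent = 5) {
  const inFlight = new Set(); // trackers for calls that have not settled yet
  const all = [];             // every original promise, in call order
  for (const fn of asyncFns) {
    const p = fn();
    all.push(p);
    // The tracker never rejects and removes itself once p settles,
    // whether p fulfilled or rejected.
    const tracker = p.then(
      () => { inFlight.delete(tracker); },
      () => { inFlight.delete(tracker); }
    );
    inFlight.add(tracker);
    // If at max concurrency, wait for any one call to settle.
    if (inFlight.size >= concurrent) {
      await Promise.race(inFlight);
    }
  }
  // Resolves with the results in call order, or rejects with the first failure.
  return Promise.all(all);
}
```

Note that with this sketch a rejection does not stop the loop early: the remaining functions are still started, and the rejection surfaces only from the final Promise.all.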

Sequacious answered 28/12, 2017 at 12:7 Comment(2)
While the original accepted answer (community wiki) works, this one has the added bonus of efficiency (you don't remap every promise in the queue) and is simpler and more intuitive. - Jaclin
Shouldn't the function end with return Promise.all(ret)? - Warnock
V
16

Credits to @Dan D. who deleted their answer shortly after posting:

let [completed] = await Promise.race(queue.map(p => p.then(res => [p])));

This creates, for each promise in the queue, a new promise that resolves to the original promise (wrapped in an array) once the original fulfills. Racing those gives you the promise that completed first.

Originally there were no brackets around completed or p. Since p is a promise and has a then method, the returned promise was chained again, yielding the promise's resolved value rather than the promise itself (thus it didn't work). I assume that's why the answer was deleted. Wrapping the promise in an array and then using array destructuring assignment prevents it from being chained again, so you get the promise itself.
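
For context, here is a sketch of how that one-liner might slot into the loop from the question, together with the splice. The function name asyncLoopRace is made up for this example. A rejected call makes the race (and therefore the whole function) reject.

```javascript
async function asyncLoopRace(asyncFns, concurrent = 5) {
  const queue = [];
  for (const fn of asyncFns) {
    // fire the async function and add its promise to the queue
    queue.push(fn());
    // if max concurrent, wait for whichever call finishes first
    if (queue.length >= concurrent) {
      // Wrapping p in an array stops the promise from being unwrapped,
      // so the race resolves to the winning promise itself.
      const [completed] = await Promise.race(
        queue.map(p => p.then(() => [p]))
      );
      queue.splice(queue.indexOf(completed), 1);
    }
  }
  // wait for the rest of the calls to finish
  await Promise.all(queue);
}
```

Each pass through the if block remaps every promise still in the queue, which is the extra cost of this approach.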

Volley answered 20/3, 2017 at 5:33 Comment(2)
I can't see how this is an answer. One would expect 25 requests that each take 1 second, with a max concurrency of 5, to take 5 seconds minimum. Yet using that fragment of code alone in place of await Promise.race(queue);, 25 requests take 2 seconds to complete, which is clearly not possible. - Kania
Ahh, I didn't see how that code relates to the splicing step, queue.splice(queue.indexOf(completed), 1); - Kania
E
3

Here's a minimalistic implementation that returns the promise that wins the Promise.race. It uses JavaScript iterators, so it doesn't create new arrays/maps:

/**
 * When any promise is resolved or rejected, 
 * returns that promise as the result.
 * @param  {Iterable.<Promise>} iterablePromises An iterable of promises.
 * @return {{winner: Promise}} The winner promise.
 */
async function whenAny(iterablePromises) {
  let winner;

  await Promise.race(function* getRacers() {
    for (const p of iterablePromises) {
      if (!p?.then) throw new TypeError();
      const settle = () => winner = winner ?? p;
      yield p.then(settle, settle);
    }
  }());

  // return the winner promise as an object property, 
  // to prevent automatic promise "unwrapping"
  return { winner }; 
}

// test it

function createTimeout(ms) {
  return new Promise(resolve => 
    setTimeout(() => resolve(ms), ms));
}

async function main() {
  const p = createTimeout(500);
  const result = await whenAny([
    createTimeout(1000),
    createTimeout(1500),
    p
  ]);

  console.assert(result.winner === p);
  console.log(await result.winner);
}

main().catch(e => console.warn(`caught on main: ${e.message}`));
Earing answered 7/11, 2020 at 2:31 Comment(0)
K
2

Rather than a single queue, why not have 5 "serial" queues?

async function asyncLoop(asyncFns, concurrent = 5) {
    const queues = new Array(concurrent).fill(0).map(() => Promise.resolve());
    let index = 0;
    const add = cb => {
        index = (index + 1) % concurrent;
        return queues[index] = queues[index].then(() => cb());
    };
    let results = [];
    for (let fn of asyncFns) {
        results.push(add(fn));
    }
    await Promise.all(results);
};

OK, firstly, it's not pretty, but it seems to work. However, it assumes asyncFns is an Array; that is probably simple to "fix" for an Object using Object.values:

const asyncLoop = (asyncFns, concurrent = 5) => {
    let inFlight = 0;
    let pending = [];
    const end = result => {
        inFlight--;
        var job = pending.shift();
        job && job();
        return result;
    };
    const begin = (fn) => {
        if (inFlight < concurrent) {
            inFlight++;
            return fn();
        }
        let resolver;
        const promise = new Promise(resolve => {
            resolver = () => {
                inFlight ++;
                resolve(fn());
            }
        });
        pending.push(resolver);
        return promise;
    }
    return Promise.all(asyncFns.map(fn => begin(fn).then(end)));
};

const fns = new Array(25).fill(0).map((v, index) => () => new Promise(resolve => {
    let timeout = 1000;
    if (index == 6  || index == 11) {
        timeout = 2000;
    }
    setTimeout(resolve, timeout, index);
}));
console.time('timeToComplete');
asyncLoop(fns, 5).then(result => {
    console.timeEnd('timeToComplete');
    console.log(JSON.stringify(result));
});
Kania answered 20/3, 2017 at 5:55 Comment(4)
Interesting solution! I wouldn't have thought of making 5 strings of promise chains. Unfortunately, not all the calls take a uniform time (which is why I tried to use race). Imagine 25 async functions, 24 that take 1s and one that takes 5s. My original solution would take 5s to do five calls, then 4s to do the other 20, totaling 9s. This solution would have one queue that would take 9s. If the queue were truly rolling, it would take 6s. However, this definitely improves on my original: if two calls were 5s, spaced 6 calls apart, my original would take 13s and this solution would take 9s. - Jaclin
Yeah, I know there's a flaw in the answer. However, you do know that browsers limit the number of simultaneous requests anyway, right? - Kania
@JonathanGawrych - I think I've come up with a solution. - Kania
I get the feeling Bluebird's Promise.map may be a solution; I believe you can pass it a concurrency parameter!!! (damn my feeble mind) - Kania
R
2

I wanted something similar, but I wasn't satisfied with any of these answers.

Here is what I came up with. It doesn't quite answer your question, but may help you get part of the way there.

It uses something similar to Jonathan Gawrych's answer.

Maybe this will help someone else:

/**
 * Used like:
 * dealWithPromisesAsTheyResolve([
 *   new Promise((res, rej) => setTimeout(res, 2000, 2000)),
 *   new Promise((res, rej) => setTimeout(res, 1000, 1000)),
 *   new Promise((res, rej) => setTimeout(res, 4000, 4000)),
 *   new Promise((res, rej) => setTimeout(res,    0,    0)),
 *   new Promise((res, rej) => setTimeout(rej, 3000, 3000)),
 * ], num => console.log(num), err => console.log(`error: ${err}`));
 *
 * Will output:
 *   0
 *   1000
 *   2000
 *   error: 3000
 *   4000
 */

async function dealWithPromisesAsTheyResolve(promises, resolveCallback, rejectCallback) {
  var _promises = new Map();
  promises.forEach(promise => _promises.set(
    promise,
    promise
      .then(value => [null, value, promise])
      .catch(error => [error, null, promise])
  ));

  while (_promises.size > 0) {
    let [error, value, promise] = await Promise.race(_promises.values());
    _promises.delete(promise);
    if (error) {
      rejectCallback(error);
    } else {
      resolveCallback(value);
    }
  }
}

You could modify it to accept a limit and add a new promise each time one completes.
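
One way that modification might look, as a rough sketch only. It assumes the input is an iterable of not-yet-called async functions (as in the question) instead of already-started promises, and the name dealWithTasksAsTheySettle is made up for this example. It tags each result with an id and an ok flag rather than testing the error for truthiness, so a rejection with a falsy reason is still routed to the reject callback.

```javascript
async function dealWithTasksAsTheySettle(taskFns, limit, resolveCallback, rejectCallback) {
  const inFlight = new Map(); // id -> promise that always fulfills with a tagged result
  const iterator = taskFns[Symbol.iterator]();
  let nextId = 0;

  // Start the next task, if there is one, and track it under a fresh id.
  const startNext = () => {
    const next = iterator.next();
    if (next.done) return;
    const id = nextId++;
    inFlight.set(id, next.value().then(
      value => ({ id, ok: true, value }),
      error => ({ id, ok: false, error })
    ));
  };

  // Fill up to the limit, then top up each time a task settles.
  for (let i = 0; i < limit; i++) startNext();

  while (inFlight.size > 0) {
    const settled = await Promise.race(inFlight.values());
    inFlight.delete(settled.id);
    startNext();
    if (settled.ok) {
      resolveCallback(settled.value);
    } else {
      rejectCallback(settled.error);
    }
  }
}
```

If a task function throws synchronously instead of returning a rejected promise, the error propagates out of this sketch rather than reaching rejectCallback.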

Risarise answered 7/3, 2019 at 6:43 Comment(0)
P
2

Totally inspired by Jonathan Gawrych, here's what I did to deal with an infinite stream of promises, of which I wanted to have 10 running in parallel at all times.

async function* concurrentResolver(promisesIter, numInParallel) {
  const pending = [];

  for (let i = 0; i < numInParallel; i++) {
    const next = promisesIter.next();
    if (next.done) {
      break;
    }
    pending.push(next.value);
  }

  while (pending.length) {
    const darkMagic = pending.map((p) => p.then((_) => [p]));
    const [promise] = await Promise.race(darkMagic);
    pending.splice(pending.indexOf(promise), 1);

    const next = promisesIter.next();
    if (!next.done) {
      pending.push(next.value);
    }

    // the following `await` is instantaneous, since
    // the promise has already been resolved.
    yield await promise;
  }
}

And here's some code to test it:

function* promisesGenerator({ howMany, timeEachResolves }) {
  for (let i = 0; i < howMany; i++) {
    yield new Promise((res) =>
      setTimeout(res, timeEachResolves, "fake server res")
    );
  }
}

const promisesIter = promisesGenerator({ howMany: 30, timeEachResolves: 3000 });
const numInParallel = 10;

for await (const res of concurrentResolver(promisesIter, numInParallel)) {
  console.log(`at ${new Date().toLocaleTimeString()}: ${res}`);
}

/*
Output from Chrome's console:
(10) at 7:06:44 PM: fake server res
(10) at 7:06:47 PM: fake server res
(10) at 7:06:50 PM: fake server res
*/
Pentastich answered 3/8, 2021 at 16:13 Comment(0)
Q
0

To see which promise completed first in Promise.race you can use this:

// There is no way to know which promise is resolved/rejected.
// So we map it to a new promise to return the index whether it fails or succeeds.
let indexPromises = promises.map((p, index) => p.then(() => index, () => {throw index;}));

I took it from here: https://blog.jdriven.com/2016/12/promise-me-you-wont-use-promise-race/

An example:

const pInstances1 = ['a', 'b', 'c', 'd']
const pInstances2 = ['a', 'b', 'c', 'd', 'e', 'f'];

const prom1 = [];
for (let index of pInstances1.keys()) {
    prom1.push(new Promise(res => setTimeout(()=>res(index), index * 1000)));
}

const prom2 = [];
for (let index of pInstances2.keys()) {
    prom2.push(new Promise(res => setTimeout(()=>res(index), index * 1000)));
}

(async () => {
    // Race the two Promise.all groups; the result is the index of the group that settled first.
    const winnerIndex = await Promise.race(
        [Promise.all(prom1), Promise.all(prom2)]
            .map((p, index) => p.then(() => index, () => { throw index; }))
    );
    console.log(winnerIndex);
})();
Quasar answered 18/4 at 11:58 Comment(0)
