Throttle amount of promises open at a given time
Asked Answered
L

7

14

The following TypeScript performs each call to doSomething(action) one at a time. (Meaning the second item in the list does not get a call made until the first one is done).

async performActionsOneAtATime() {
    for (let action of listOfActions) {
        const actionResult = await doSomethingOnServer(action);
        console.log(`Action Done: ${actionResult}`);
    }
 }

This one will send all the requests to the server right away (without waiting for any responses):

async performActionsInParallel() {
    for (let action of listOfActions) {
        const actionResultPromise = doSomething(action);
        actionResultPromise.then((actionResult) => {
            console.log(`Action Done: ${actionResult}`);
        });
    }
}

But what I really need is a way to throttle them. Maybe have 10 or 20 calls open at a time. (One at at a time is too slow, but all 600 will overload the server.)

But I am having a hard time figuring this out.

Any suggestions on how I can throttle the number of calls to X open at a time?

(This question uses TypeScript, but I would be fine with an ES6 JavaScript answer.)

Leland answered 14/7, 2016 at 22:48 Comment(3)
i use underscore.js (my go to js lib) for this kind of thing. Look at debounce and throttleConnelley
@Connelley - The debounce and throttle implementations I have seen are time based. (Wait x amount of time between calls.) Is underscores queue based? (X number of concurrent calls at a time.?Leland
You may want to take a look at my answer. It's somewhat queue based (assuming the list is an array with the pop() method). Once a promise returns, it notifies the list to start fetching the next one.Jedlicka
F
19

You can do this in one short function. (Returns values in order per Mulan's suggestion. Thanks!)

/**
 * Performs a list of callable actions (promise factories) so
 * that only a limited number of promises are pending at any
 * given time.
 *
 * @param listOfCallableActions An array of callable functions,
 *     which should return promises.
 * @param limit The maximum number of promises to have pending
 *     at once.
 * @returns A Promise that resolves to the full list of values
 *     when everything is done.
 */
function throttleActions(listOfCallableActions, limit) {
  // We'll need to store which is the next promise in the list.
  let i = 0;
  let resultArray = new Array(listOfCallableActions.length);

  // Now define what happens when any of the actions completes.
  // Javascript is (mostly) single-threaded, so only one
  // completion handler will call at a given time. Because we
  // return doNextAction, the Promise chain continues as long as
  // there's an action left in the list.
  function doNextAction() {
    if (i < listOfCallableActions.length) {
      // Save the current value of i, so we can put the result
      // in the right place
      let actionIndex = i++;
      let nextAction = listOfCallableActions[actionIndex];
      return Promise.resolve(nextAction()).then(result => {
        // Save results to the correct array index.
        resultArray[actionIndex] = result;
      }).then(doNextAction);
    }
  }

  // Now start up the original <limit> number of promises.
  // i advances in calls to doNextAction.
  let listOfPromises = [];
  while (i < limit && i < listOfCallableActions.length) {
    listOfPromises.push(doNextAction());
  }
  return Promise.all(listOfPromises).then(() => resultArray);
}

// Test harness:

function delay(name, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    console.log(name);
    resolve(name);
  }, ms));
}

var ps = [];
for (let i = 0; i < 10; i++) {
  ps.push(() => {
    console.log(`begin ${i}`);
    return delay(`complete ${i}`, Math.random() * 3000);
  });
}

throttleActions(ps, 3).then(result => console.log(result));
Fortuna answered 15/7, 2016 at 1:11 Comment(1)
Thank you so much! The one and only solution for throttling promises.Cence
C
3

EDIT

Jeff Bowman has vastly improved his answer to resolve meaningful values. Feel free to view the history of this answer to understand why the resolved values are so important/useful.


throttlep

This solution closely mimics the native Promise.all

How it's the same …

  • Resolves promises as quickly as possible
  • Resolves an array of values in the same order as the inputs
  • Rejects as soon as it encounters one reject

How it's different …

  • Number parameter limits the number of simultaneously-running Promises
  • Array input accepts promise creators (thunks); not actual Promises

// throttlep :: Number -> [(* -> Promise)]
const throttlep = n=> Ps=>
  new Promise ((pass, fail)=> {
    // r is the number of promises, xs is final resolved value
    let r = Ps.length, xs = []
    // decrement r, save the resolved value in position i, run the next promise
    let next = i=> x=> (r--, xs[i] = x, run(Ps[n], n++))
    // if r is 0, we can resolve the final value xs, otherwise chain next
    let run = (P,i)=> r === 0 ? pass(xs) : P().then(next(i), fail)
    // initialize by running the first n promises
    Ps.slice(0,n).forEach(run)
  })

// -----------------------------------------------------
// make sure it works

// delay :: (String, Number) -> (* -> Promise)
const delay = (id, ms)=>
  new Promise (pass=> {
    console.log (`running: ${id}`)
    setTimeout(pass, ms, id)
  })

// ps :: [(* -> Promise)]
let ps = new Array(10)
for (let i = 0; i < 10; i++) {
  ps[i] = () => delay(i, Math.random() * 3000)
}

// run a limit of 3 promises in parallel
// the first error will reject the entire pool
throttlep (3) (ps) .then (
  xs => console.log ('result:', xs),
  err=> console.log ('error:', err.message)
)

Console output

Inputs are run in order; Resolved results are in the same order as the inputs

running: 0
running: 1
running: 2
=> Promise {}
running: 3
running: 4
running: 5
running: 6
running: 7
running: 8
running: 9
result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Practical use

Let's look at a more practical code example. This code is tasked with fetching a set of images from a server. This is how we might use throttlep to throttle the amount of simultaneous requests to 3 at a time

// getImage :: String -> Promise<base64>
let getImage = url=> makeRequest(url).then(data => data.base64, reqErrorHandler)

// actions :: [(* -> Promise<base64>)]
let actions = [
  ()=> getImage('one.jpg'),
  ()=> getImage('two.jpg'),
  ()=> getImage('three.jpg'),
  ()=> getImage('four.jpg'),
  ()=> getImage('five.jpg')
]

// throttle the actions then do something...
throttlep (3) (actions) .then(results => {
  // results are guaranteed to be ordered the same as the input array
  console.log(results)
  // [<base64>, <base64>, <base64>, <base64>, <base64>]
})
Cyclostyle answered 17/7, 2016 at 21:47 Comment(4)
Thanks for the feedback! I was thinking that it'd be useful to have the answers like Promise.all, but didn't get a chance to in my first pass. Added 6 lines to my answer to update, and credited you.Fortuna
@JeffBowman happy to collaborate with you. I've edited my answer as well to prevent people from becoming confused ^_^Cyclostyle
Your style of testing provided a much cleaner output, so I borrowed that from you too ^_^Cyclostyle
You had a bug in your code (an exception would have raised many times before finishing). It can be solved by changing the run function like so: let run = (P,i)=> { if(r === 0) { pass(xs); } else if(P){ P().then(next(i), fail); } }; Thank you and @JeffBowmansupportsMonica for you solutions!Jolinejoliotcurie
B
2

There isn't anything built-in for this, so you'll have to build your own. AFAIK, there's no library for this yet, either.

First, start with a "deferral" - a promise that allows external code to resolve it:

class Deferral<T> {
    constructor() {
        this.promise = new Promise<T>((resolve, reject) => {
            this.resolve = resolve;
            this.reject = reject;
        });
    }

    promise: Promise<T>;
    resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
    reject: (error: any) => void;
}

Then you can define a "wait queue", which represents all the code blocks that are waiting to enter the critical section:

class WaitQueue<T> {
    private deferrals: Deferral<T>[];

    constructor() {
        this.deferrals = [];
    }

    get isEmpty(): boolean {
        return this.deferrals.length === 0;
    }

    enqueue(): Promise<T> {
        const deferral = new Deferral<T>();
        this.deferrals.push(deferral);
        return deferral.promise;
    }

    dequeue(result?: T) {
        const deferral = this.deferrals.shift();
        deferral.resolve(result);
    }
}

Finally you can define an async semaphore, as such:

export class AsyncSemaphore {
    private queue: WaitQueue<void>;
    private _count: number;

    constructor(count: number = 0) {
        this.queue = new WaitQueue<void>();
        this._count = count;
    }

    get count(): number { return this._count; }

    waitAsync(): Promise<void> {
        if (this._count !== 0) {
            --this._count;
            return Promise.resolve();
        }
        return this.queue.enqueue();
    }

    release(value: number = 1) {
        while (value !== 0 && !this.queue.isEmpty) {
            this.queue.dequeue();
            --value;
        }
        this._count += value;
    }
}

Example usage:

async function performActionsInParallel() {
    const semaphore = new AsyncSemaphore(10);
    const listOfActions = [...];
    const promises = listOfActions.map(async (action) => {
        await semaphore.waitAsync();
        try {
            await doSomething(action);
        }
        finally {
            semaphore.release();
        }
    });
    const results = await Promise.all(promises);
}

This method first creates a throttler and then immediately starts all the asynchronous operations. Each asynchronous operation will first (asynchronously) wait for the semaphore to be free, then perform the action, and finally release the semaphore (allowing another one in). When all asynchronous operations have completed, all the results are retrieved.

Warning: this code is 100% completely untested. I haven't even tried it once.

Booby answered 15/7, 2016 at 1:2 Comment(0)
J
0

You can do this with a pub-sub pattern. I too am not familiar with typescipt, and I don't know if this is happening in the browser or at the backend. I'll just write the pseudoCode for this (assuming it's backend):

//I'm assuming required packages are included e.g. events = require("events");
let limit = 10;
let emitter = new events.EventEmitter();

for(let i=0; i<limit; i++){
    fetchNext(listOfActions.pop());
}

function fetchNext(action){
    const actionResultPromise = doSomething(action);
    actionResultPromise.then((actionResult) => {
        console.log(`Action Done: ${actionResult}`);
        emitter.emit('grabTheNextOne', listOfActions.pop());
    });
}

emitter.on('grabTheNextOne', fetchNext);

EventEmitter is part of NodeJS, if you are working in Node. If in the browser, you can use the normal events model. The key idea here is the Publish-Subscribe pattern.

Jedlicka answered 14/7, 2016 at 23:22 Comment(0)
I
0

It is possible to throttle the Promises with generator. In the example below, we are throttling them so that

function asyncTask(duration = 1000) {
  return new Promise(resolve => {
    setTimeout(resolve, duration, duration)
  })
}


async function main() {
  const items = Array(10).fill(() => asyncTask()) {
    const generator = batchThrottle(3, ...items)
    console.log('batch', (await generator.next()).value)
    for await (let result of generator) {
      console.log('remaining batch', result)
    }
  }

  {
    const generator = streamThrottle(3, ...items)
    console.log('stream', await generator.next())
    for await (let result of generator) {
      console.log('remaining stream', result)
    }
  }

}

async function* batchThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield Promise.all(tasks)
  }
}

async function* streamThrottle(n = 5, ...items) {
  while (items.length) {
    const tasks = items.splice(0, n).map(fn => fn())
    yield* await Promise.all(tasks)
  }
}
main().catch()
Infamous answered 31/1, 2019 at 14:59 Comment(0)
B
0

Here is a version of a throttle function using async await syntax:

async function throttle(tasks, max) {
    async function run(_, i) {
        values[i] = await tasks[i]();
        if (max < tasks.length) return run(_, max++);
    };
    const values = [];
    try {
        await Promise.all(tasks.slice(0, max).map(run));
    } catch (error) {
        max = tasks.length; // don't allow new tasks to start
        throw error;
    }
    return values;
}

// Demo
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

const tasks = Array.from({length: 10}, (_, i) => 
    async () => {
        console.log(`task ${i} starts`);
        await delay((1 + i % 3)*1000);
        console.log(`task ${i} ends with ${i*10}`);
        return i*10;
    }
);

throttle(tasks, 4).then(console.log);
Betaine answered 20/6, 2021 at 9:45 Comment(0)
M
0

Here's my take on it using TypeScript:

function ParallelMap<T, U>(array: U[], callbackFn: (element: U, index?: number, array?: U[]) => Promise<T>, maxDegreeOfParallelism: number = -1) {
    if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);

    return new Promise<T[]>((resolve, reject) => {
        const inputArraySize = array.length;

        let indexTracker = 0;
        let completedTracker = 0;

        const output = new Array<T>(inputArraySize);
        const errors = new Array<{ index: number, error: any }>();

        const processNext = () => {
            const elementIndex = indexTracker++;
            const element = array[elementIndex];

            callbackFn(element, elementIndex, array).then(
                value => output[elementIndex] = value,
                reason => errors.push({ index: elementIndex, error: reason })
            ).finally(() => {
                ++completedTracker;

                if (completedTracker == inputArraySize) {
                    if (errors.length > 0) reject(errors);

                    else resolve(output);
                }

                else if (indexTracker < inputArraySize) processNext();
            });
        };

        for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
            processNext();
        }
    });
}

Usage:

const maxDegreeOfParallelism = 3; // Number of concurrent tasks
const result = await ParallelMap(
    inputArray,
    async (value, index, array) => { /* Do something */ }, // Some async function to process each element
    maxDegreeOfParallelism
);

And same in JavaScript:

function ParallelMap(array, callbackFn, maxDegreeOfParallelism = -1) {
  if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);

  return new Promise((resolve, reject) => {
    const inputArraySize = array.length;

    let indexTracker = 0;
    let completedTracker = 0;

    const output = new Array(inputArraySize);
    const errors = new Array();

    const processNext = () => {
      const elementIndex = indexTracker++;
      const element = array[elementIndex];

      callbackFn(element, elementIndex, array).then(
        value => output[elementIndex] = value,
        reason => errors.push({
          index: elementIndex,
          error: reason
        })
      ).finally(() => {
        ++completedTracker;

        if (completedTracker == inputArraySize) {
          if (errors.length > 0) reject(errors);

          else resolve(output);
        } else if (indexTracker < inputArraySize) processNext();
      });
    };

    for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
      processNext();
    }
  });
}



// Usage

(async() => {
  const input = new Array(10).fill(1); // Array containing 10 '1' values

  const oneSecondTask = (value, index) => {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(value + index); // Extremely complex calculation of adding index to value 1
      }, 1000);
    });
  };

  console.log(`const input = [${input.join(', ')}];`);
  console.log(`---------------------------------------------`);
  console.log(`... wait for 10s ...`);
  console.log(`---------------------------------------------`);

  let start = Date.now();
  let maxDegreeOfParallelism = 1;
  let result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) one at a time`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 2;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 5;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);

  console.log(`---------------------------------------------`);

  start = Date.now();
  maxDegreeOfParallelism = 10;
  result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
  console.log(`const result = [${result.join(', ')}];`);
  console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
})();
Methodize answered 4/2, 2022 at 21:16 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.