Simple task runner in javascript with waiting

I want to implement something like a task runner to which new tasks will be pushed. Each of those tasks could be some async operation, like waiting on the user, making API calls, or something else. The task runner makes sure that only the allowed number of tasks execute at any one time, while the other tasks wait until their turn comes.

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; //this should have "concurrent" number of tasks running at any given time

  }

  push(task) {
    /* pushes to the queue and then runs the whole queue */
  }
}

The calling pattern would be

let runner = new Runner(3);
runner.push(task1);
runner.push(task2);
runner.push(task3);
runner.push(task4);

where each task is a function that calls a callback at the end, so that we know it has finished. So it should look like:

let task = function(callback) {
  /* does something which is waiting on IO or network or something else*/
  callback(); 
}

So I am pushing a closure to runner like

runner.push(function(){return task(callback);});

I think I might need to add a waitList queue as well. But the tasks are not promises themselves, so I don't know how to check whether they have finished.

Anyway, what is the right approach?

Affirm answered 13/4, 2018 at 23:24 Comment(9)
But I have to resolve that promise only when the task is finished. And I don't have control over the task, only over the callback. Then again, the callback is defined outside the scope of the task runner. Update: this was a reply to a comment that got deleted! The original comment suggested using async/await internally.Affirm
Node? Browser? NativeScript? Electron? etc?Larocca
actually does not matter as long as there is no dependency on a specific third party architecture, so vanilla JS is preferred, no matter in Node or Browser.Affirm
It does matter Roy - implementations are different between those two platforms. Anyway. Good luck then.Larocca
@Affirm I posted an answer instead. In Node realm, we've got async/await and promisification utils, so the solution may vary. Since you don't want to resolve a promise prematurely, a task could be promise-returning function (basically, async function).Teammate
@estus Thanks, I understand. However vanilla JS with ES6 does have Promise, async await.. developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/…. So I'm good with that.Affirm
@Affirm async is ES2017, there's still not enough support in browsers to use it natively.Teammate
Why is this tagged async/await and es6-promise if you want to use callbacks? Or the other way round: if you want to use promises, just convert your existing callback API to promises and don't put the irrelevant detail about callbacks in your question.Valetudinary
Have a look at https://mcmap.net/q/88671/-limit-concurrency-of-pending-promises and https://mcmap.net/q/137144/-limited-parallelism-with-async-await-in-typescript-es7.Valetudinary

So I am pushing a closure to runner like

runner.push(function(){return task(callback);});

This looks like missing pieces of the runner are being added to the calling syntax. A more complete runner might look like:

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; // run at most "concurrent" number of tasks at once
    this.runCount = 0;
    this.maxCount = concurrent;
    this.notifyEnd = this.notifyEnd.bind(this);
  }
  
  notifyEnd() {
    --this.runCount;
    this.run();
  }

  run() {
    while (this.runCount < this.maxCount && this.taskQueue.length) {
      ++this.runCount;
      // call the next task with the callback bound to this instance (in the constructor)
      this.taskQueue.shift()(this.notifyEnd);
    }
  }

  push(task) {
    this.taskQueue.push(task);
    this.run();
  }
}

Now the runner's push method is called with a function taking a callback parameter. Runner state is contained in the value of runCount, 0 for idle or positive integer for tasks running.

There remain a few issues:

  1. The task may be called synchronously from the code that adds it to the runner. This lacks the strict guarantee of Promises, which always call a then callback asynchronously, from the job (microtask) queue.

  2. The task code must return normally without throwing. This is not unheard of in JavaScript, where the host tracker for uncaught promise rejection errors must do the same thing, but it is fairly unusual in application script. The runner's call to the task could be placed in a try/catch block to catch synchronous errors, but it should also ignore the error if the callback was already received before the task threw; otherwise the running task count could go wrong.

  3. If the task calls the callback multiple times, the running task count in the runner above will be thrown off.
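One way to address the three issues above while staying with callbacks is sketched below. It is not the answerer's code: the SafeRunner name and the onError parameter are hypothetical, and it assumes queueMicrotask is available (modern browsers, Node 11+). Tasks are never started synchronously from push() or from a callback, a synchronous throw counts as completion, and only the first callback per task is honoured.

```javascript
// Sketch: a callback-style runner hardened against issues 1-3 above.
class SafeRunner {
  constructor(concurrent = 1, onError = console.error) {
    this.taskQueue = [];
    this.runCount = 0;
    this.maxCount = concurrent;
    this.onError = onError; // hypothetical hook for synchronous task errors
  }

  push(task) {
    this.taskQueue.push(task);
    // Issue 1: never start a task synchronously inside push()
    queueMicrotask(() => this.run());
  }

  run() {
    while (this.runCount < this.maxCount && this.taskQueue.length) {
      const task = this.taskQueue.shift();
      ++this.runCount;
      let finished = false; // per-task flag (new binding each iteration)
      // Issue 3: only the first completion releases the slot
      const done = () => {
        if (finished) return;
        finished = true;
        --this.runCount;
        queueMicrotask(() => this.run());
      };
      try {
        task(done);
      } catch (err) {
        // Issue 2: a synchronous throw ends the task; done() ignores it
        // if the task had already called back before throwing
        this.onError(err);
        done();
      }
    }
  }
}

// Usage: at most two at a time; the second task throws, the third calls back twice
const log = [];
const errors = [];
const runner = new SafeRunner(2, err => errors.push(err));
runner.push(done => setTimeout(() => { log.push("a"); done(); }, 20));
runner.push(() => { throw new Error("boom"); });
runner.push(done => { log.push("c"); done(); done(); });
```

Deferring run() with queueMicrotask also keeps synchronous callbacks from re-entering the while loop.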

Considerations similar to these were behind the development and standardization of the Promise interface. After weighing these potential drawbacks, if a simple task runner meets all requirements, use one. If additional robustness is required, then promisifying tasks and writing a more promise-centric runner could prove a better alternative.
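A minimal sketch of that promise-centric alternative, under the assumption that tasks keep the task(callback) shape from the question; promisify and PromiseRunner are hypothetical names, not from any library. push() returns a promise for that task's outcome, and the slot is released in finally, so a throwing task cannot leak the running count, and a repeated callback is harmless because a promise settles only once.

```javascript
// Wrap a callback-style task in a promise-returning function.
// If the task throws synchronously, the Promise constructor rejects the promise.
const promisify = task => () => new Promise(resolve => task(resolve));

class PromiseRunner {
  constructor(concurrent = 1) {
    this.maxCount = concurrent;
    this.runCount = 0;
    this.waiting = []; // entries of { start, resolve, reject }
  }

  // Queue a promise-returning function; the returned promise mirrors its outcome
  push(start) {
    return new Promise((resolve, reject) => {
      this.waiting.push({ start, resolve, reject });
      this.next();
    });
  }

  next() {
    while (this.runCount < this.maxCount && this.waiting.length) {
      const { start, resolve, reject } = this.waiting.shift();
      this.runCount++;
      // .then(start) defers the start and turns a synchronous throw into a rejection
      Promise.resolve()
        .then(start)
        .then(resolve, reject)
        .finally(() => {
          this.runCount--;
          this.next();
        });
    }
  }
}

// Usage: five 30 ms tasks, at most two at a time, plus one that throws
const runner = new PromiseRunner(2);
let active = 0;
let peak = 0;
// Hypothetical demo task: tracks overlap, then calls back with its id
const makeTask = id => callback => {
  active++;
  peak = Math.max(peak, active);
  setTimeout(() => {
    active--;
    callback(id);
  }, 30);
};
const all = Promise.all([1, 2, 3, 4, 5].map(id => runner.push(promisify(makeTask(id)))));
const failed = runner
  .push(promisify(() => { throw new Error("boom"); }))
  .catch(err => err.message);
```

Unlike batching, a new task starts as soon as any running task finishes.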

Clavier answered 14/4, 2018 at 3:7 Comment(4)
Do we need the while loop? I'm guessing it can work with just "if", as "run" will be called both on entry into and exit from the queue.Affirm
I agree it's probably not needed - I wrote run simply to handle the current task queue state, whatever that might be, without additional consideration that only one task can be added at a time. If you wrote push to handle multiple parameters you could re-investigate the best way of doing it.Clavier
inside the while loop, you are dequeueing right? that should be taskQueue.shift() instead of unshift(), right?Slivovitz
@Slivovitz my bad, you are correct. I've fixed implementing the run count in run to use ++this.runCount as well. My thanks.Clavier

A simple demo of the concept. I changed the variable names to make it easier to follow.

class Runner {
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.waitList = [];
    this.count = 0;
    this.currentQ = [];
  }

  push(task) {
    this.waitList.push(task);
    this.run();
  }

  run() {
    // only take a slot when there is actually a task waiting;
    // otherwise count would grow every time a task finishes on an empty wait list
    if (this.count < this.concurrency && this.waitList.length > 0) {
      this.count++;
      let task = this.waitList.shift();
      let id = task.id;
      this.currentQ.push(id);
      this.showQ();
      // the arrow function keeps "this" bound to the runner
      task.task(() => {
        this.currentQ.splice(this.currentQ.indexOf(id), 1);
        this.showQ();
        this.count--;
        this.run();
      });
    }
  }

  showQ() {
    let q = "";
    q = this.currentQ.join(', ');
    document.getElementById("running").innerHTML = q;
  }
}

let output = document.getElementById("output");

// Appends one line of text to the output div
function appendLine(text) {
  let div = document.createElement("div");
  div.appendChild(document.createTextNode(text));
  output.appendChild(div);
}

// Builds a demo task: logs its start, waits `delay` ms, logs its end, then calls done()
function makeTask(id, delay) {
  return {
    id: id,
    task: function(done) {
      appendLine("Picking up Task" + id);
      setTimeout(function() {
        appendLine("Finished Task" + id);
        done();
      }, delay);
    }
  };
}

let task1 = makeTask(1, 3000);
let task2 = makeTask(2, 6000);
let task3 = makeTask(3, 10000);
let task4 = makeTask(4, 5000);
let task5 = makeTask(5, 6000);
let task6 = makeTask(6, 4000);
let task7 = makeTask(7, 5000);

let r = new Runner(3);
r.push(task1);
r.push(task2);
r.push(task3);
r.push(task4);
r.push(task5);
r.push(task6);
r.push(task7);
Currently running
<div id="running">

</div>
<hr>
<div id="output">

</div>
Affirm answered 16/4, 2018 at 0:57 Comment(1)
Man, this is gold!!!! Thanks for such a beautiful answer. I used this on my server to queue tasks; I didn't want to use a heavy library, and this solved my problem. Once again, thanks mate!! Also, I had to make a minor change: this.count++; should come after the waitList condition. Only then does it work perfectly.Crowboot
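Along the lines of the comment above (server-side use, no DOM), here is a sketch of the same runner that reports through a callback instead of document. It is a variant, not the answerer's code: the running count is taken as currentQ.length, so a slot is only occupied when a task is actually dequeued, and the onChange parameter is a hypothetical stand-in for showQ().

```javascript
// DOM-free variant of the runner above, usable in Node or the browser.
class Runner {
  constructor(concurrency = 1, onChange = () => {}) {
    this.concurrency = concurrency;
    this.waitList = [];
    this.currentQ = [];       // ids of the tasks currently running
    this.onChange = onChange; // hypothetical hook: receives a copy of currentQ
  }

  push(task) {
    this.waitList.push(task);
    this.run();
  }

  run() {
    // currentQ.length is the running count, so it can never drift from reality
    while (this.currentQ.length < this.concurrency && this.waitList.length > 0) {
      const { id, task } = this.waitList.shift();
      this.currentQ.push(id);
      this.onChange([...this.currentQ]);
      task(() => {
        this.currentQ.splice(this.currentQ.indexOf(id), 1);
        this.onChange([...this.currentQ]);
        this.run();
      });
    }
  }
}

// Usage: five timer tasks, at most three at a time
let peak = 0;
const finished = [];
const r = new Runner(3, q => { peak = Math.max(peak, q.length); });
const makeTask = (id, delay) => ({
  id,
  task: done => setTimeout(() => { finished.push(id); done(); }, delay),
});
[[1, 30], [2, 60], [3, 100], [4, 50], [5, 60]].forEach(([id, delay]) => r.push(makeTask(id, delay)));
```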

It's reasonable for tasks to be defined as promises (more specifically, promise-returning functions) because this is a good use case for them: currently, errors cannot be handled (without promises, they could conventionally be handled with Node-style error-first callbacks). Even if the tasks aren't promises, promises can be used internally:

class Runner {
  constructor(concurrent = 1) {
    this.concurrent = concurrent;
    this.taskQueue = [];
  }

  push(task) {
    this.taskQueue.push(task);
  }

  run() {
    let tasksPromise = Promise.resolve();
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent);
      // start each chunk only after the previous chunk has finished
      tasksPromise = tasksPromise.then(() =>
        Promise.all(taskChunk.map(task => new Promise(resolve => task(resolve))))
      );
    }

    return tasksPromise;
  }
}

async/await makes the sequencing simpler:

  async run() {
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent);
      const taskChunkPromises = taskChunk.map(task => new Promise(resolve => task(resolve)));
      await Promise.all(taskChunkPromises);
    }
  }
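To make errors handleable, tasks could follow the Node-style error-first convention mentioned above, calling callback(err, result). A minimal sketch under that assumption; fromNodeStyle and runInChunks are hypothetical names, and Promise.allSettled (ES2020) is swapped in for Promise.all so that one failure does not hide the other results. Note that the Promise constructor rejects its promise if the executor throws synchronously; only an asynchronous error that never reaches the callback leaves a promise pending.

```javascript
// Wrap an error-first task: task((err, result) => ...) becomes a promise
const fromNodeStyle = task =>
  new Promise((resolve, reject) =>
    task((err, result) => (err ? reject(err) : resolve(result)))
  );

// Chunked like the answer above: each chunk starts after the previous one settles
async function runInChunks(tasks, concurrent) {
  const outcomes = [];
  for (let i = 0; i < tasks.length; i += concurrent) {
    const chunk = tasks.slice(i, i + concurrent).map(fromNodeStyle);
    outcomes.push(...(await Promise.allSettled(chunk)));
  }
  return outcomes;
}

// Usage: the second task reports an error, the third throws synchronously
const tasks = [
  cb => setTimeout(() => cb(null, "one"), 10),
  cb => setTimeout(() => cb(new Error("two failed")), 10),
  () => { throw new Error("three threw"); },
  cb => cb(null, "four"),
];
const done = runInChunks(tasks, 2);
```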
Teammate answered 13/4, 2018 at 23:58 Comment(6)
I'm not sure what you mean by "errors cannot be handled." Are you suggesting that if the underlying task doesn't "callback" there'd be no way to trap the error? Or, are you suggesting that promises cannot handle error?Malaria
@KevinPeno Task functions in the OP don't handle errors, they are expected to call callback function in case of success. If there's synchronous error, this will result in uncaught error (Promise constructor doesn't catch them), and if there's async error, this will result in pending promise.Teammate
Makes sense. I just wanted to be sure it was clear we're talking about uncaught errors due to promisifying things vs. Promises not being able to catch errors.Malaria
I don't think this does what the OP wants. Why do you start the same task multiple times?Valetudinary
@Valetudinary Is there a mistake in code? I posted roughly same code I use for chunked processing. It's expected to run all tasks on each run call.Teammate
Oops, I thought the loop was i++ not i+=concurrent. Yeah, it's chunked processing, taking batches of concurrent tasks. Though that's probably still not what I'd want, to start the next task as soon as any of them finishes not the whole batch.Valetudinary

So I am pushing a closure to runner like

runner.push(function(){return task(callback);});

Would you be able to specify the task and callback functions as separate parameters in the push function instead? If yes, you can probably do something like this.

class Runner {
  constructor(maxCount = 1) {
    this.taskQueue = [];
    this.maxCount = maxCount;
    this.currentCount = 0;
  }

  run() {
    if (this.taskQueue.length && this.currentCount < this.maxCount) {
      const task = this.taskQueue.shift();
      task();
    }
  }

  push(task, callback) {
    this.taskQueue.push(() => {
      this.currentCount++;
      task((...args) => {
        this.currentCount--;
        callback(...args);
        this.run();
      })
    })
    this.run();
  }
}

// Example usage
const myCallback = (caller) => {
  console.log(`myCallback called by ${caller} ${new Date()}`);
};

const task1 = (callback) => {
  console.log(`task1 started ${new Date()}`);
  setTimeout(() => {
    callback('task1');
  }, 3000);
};

const task2 = (callback) => {
  console.log(`task2 started ${new Date()}`);
  setTimeout(() => {
    callback('task2');
  }, 3000);
};

const task3 = (callback) => {
  console.log(`task3 started ${new Date()}`);
  setTimeout(() => {
    callback('task3');
  }, 3000);
};

const task4 = (callback) => {
  console.log(`task4 started ${new Date()}`);
  setTimeout(() => {
    callback('task4');
  }, 3000);
};

const runner = new Runner(2);
runner.push(task1, myCallback);
runner.push(task2, myCallback);
runner.push(task3, myCallback);
runner.push(task4, myCallback);
Tristan answered 14/4, 2018 at 10:22 Comment(0)

Callback style:

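run () {
    // shift() takes the first task off the queue (unshift() would add to it)
    var task = this.taskQueue.shift();
    if (!task) return; // queue drained
    task(() => this.run());
}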
Sporophore answered 14/4, 2018 at 0:46 Comment(1)
This looks like endless recursion; doesn't that result in a stack overflow after a few thousand tasks?Pshaw
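On the recursion question: a this.run() call made from a callback that fires later (for example from setTimeout) starts on a fresh call stack, so asynchronous tasks do not grow the stack; only tasks that call back synchronously nest run() calls. A sketch of a sequential runner that also covers that case by deferring each step with queueMicrotask (assumed available, as in modern browsers and Node 11+); SequentialRunner is a hypothetical name.

```javascript
class SequentialRunner {
  constructor() {
    this.taskQueue = [];
    this.busy = false; // true while a task is in flight
  }

  push(task) {
    this.taskQueue.push(task);
    if (!this.busy) this.run();
  }

  run() {
    const task = this.taskQueue.shift();
    if (!task) {
      this.busy = false; // queue drained; the next push() restarts the loop
      return;
    }
    this.busy = true;
    // Defer the next step so even synchronous callbacks never nest run() calls
    task(() => queueMicrotask(() => this.run()));
  }
}

// Usage: 100000 tasks that call back synchronously
const runner = new SequentialRunner();
let count = 0;
for (let i = 0; i < 100000; i++) {
  runner.push(done => { count++; done(); });
}
```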

Interesting question. So I tried to implement a very simple asynchronous task runner for JS. I believe these are essential when sending out bulk emails and such. This one demonstrates the job with the Fetch API, but I am sure it can easily be adapted to any asynchronous job.

Here I have a constructor that gives us an instance of an async task runner. It runs a given number of async tasks concurrently as a chunk, then waits for a given period of time before continuing with the next chunk, until we run out of tasks in the taskQueue. Meanwhile, we can still push new tasks in, and it will continue invoking the tasks in chunks, including the newly added ones. Throughout this process we are also free to modify the interval between chunks.

What I haven't implemented here is proper error handling (other than .catch(console.log)) or a try-n-times-then-fail mechanism, which can simply be implemented from one of my previous answers.

When we feed in an asynchronous task, we of course need the consecutive .then() stages to complete the job. In my abstraction they are provided in an array as todo functions. So let's say you have to do 20 fetches in total, like:

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`));

then you may provide a todo array as:

var todos   = [resp => resp.json(), json => console.log(json)];

where each item is a callback for a consecutive .then() stage, as mentioned earlier. The following code runs tasks in chunks of 2, initially at 1000 ms intervals, but after 2 seconds it switches to 500 ms intervals.

function TaskRunner(tasks = [],
                    todos = [],
                    group = 1,
                    interval = 1000){

  this.interval   = interval;
  this.concurrent = group;
  this.taskQueue  = tasks;
  this.todos      = todos;
}

TaskRunner.prototype.enqueue = function(ts = []){
  this.taskQueue = this.taskQueue.concat(ts);
  // start the next chunk; each task's promise is chained through the todo stages
  this.taskQueue.splice(0, this.concurrent)
                .forEach(t => this.todos.reduce((p, td) => p.then(td), t())
                                        .catch(console.log));
  // schedule the next chunk while tasks remain
  this.taskQueue.length && setTimeout(this.enqueue.bind(this), this.interval);
};

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`)),
    todos   = [resp => resp.json(), json => console.log(json)],
    goFetch = new TaskRunner();

goFetch.todos.push(...todos);
goFetch.concurrent = 2;
goFetch.enqueue(fetches);
setTimeout(() => goFetch.interval = 500, 2000);
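The core of this answer is the reduce that turns a task and the todo array into one promise chain, t().then(todos[0]).then(todos[1]) and so on. A sketch of that step alone, using stub tasks instead of fetch so it runs without network access; runPipeline and fakeFetch are hypothetical names.

```javascript
// Chain each todo as a consecutive .then() stage after the task's promise
const runPipeline = (task, todos) =>
  todos.reduce((p, todo) => p.then(todo), task());

// Hypothetical stand-in for fetch: resolves to an object with a json() method
const fakeFetch = id => () =>
  Promise.resolve({ json: () => Promise.resolve({ id, title: `post ${id}` }) });

const todos = [resp => resp.json(), json => json.title];

// One chunk of three tasks run concurrently; collect the last stage's values
const chunk = [1, 2, 3].map(fakeFetch);
const titles = Promise.all(chunk.map(t => runPipeline(t, todos)));
```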
Burlington answered 19/4, 2018 at 21:45 Comment(0)

class TaskRunner {
  tasks = [];
  isThreadFree = true;

  enqueue(task, cb) {
    this.tasks.push({
      task: task,
      callBack: cb,
    });
    this.checkIfTaskCanbeRun();
  }

  checkIfTaskCanbeRun() {
    if (this.isThreadFree && this.tasks.length != 0) {
      // take the first task off the queue
      const taskToRun = this.tasks.shift();
      this.runTask(taskToRun);
    } else if (!this.isThreadFree) {
      // only warn when a task is actually waiting on a busy runner
      console.warn("wait for thread to get free");
    }
  }

  runTask(taskToRun) {
    this.isThreadFree = false;
    const p = new Promise(taskToRun.task);

    p.then((data) => {
        taskToRun.callBack({
          data
        });
      })
      .catch((error) => {
        taskToRun.callBack({
          error
        });
      })
      .finally(() => {
        this.isThreadFree = true;
        this.checkIfTaskCanbeRun();
      });
  }
}
const runner = new TaskRunner();

const task1 = (pass) => {
  setTimeout(() => {
    pass("task 1 is successfully completed.");
  }, 1000);
};
const task2 = (pass, fail) => {
  setTimeout(() => {
    fail("task 2 is failed.");
  }, 3000);
};

const callback = ({
  data,
  error
}) => {
  console.log({
    data,
    error
  });
};

runner.enqueue(task1, callback);
runner.enqueue(task2, callback);
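The runner above allows only one task at a time (isThreadFree). A sketch of the same design, with executor-style tasks (pass, fail) => ... wrapped in new Promise, generalized to a configurable number of concurrent slots as the question asks; PoolRunner and limit are hypothetical names.

```javascript
class PoolRunner {
  constructor(limit = 1) {
    this.limit = limit; // maximum number of tasks running at once
    this.running = 0;
    this.tasks = [];
  }

  enqueue(task, cb) {
    this.tasks.push({ task, cb });
    this.checkIfTaskCanBeRun();
  }

  checkIfTaskCanBeRun() {
    while (this.running < this.limit && this.tasks.length) {
      this.runTask(this.tasks.shift());
    }
  }

  runTask({ task, cb }) {
    this.running++;
    new Promise(task)
      .then(data => cb({ data }), error => cb({ error }))
      .finally(() => {
        this.running--;
        this.checkIfTaskCanBeRun();
      });
  }
}

// Usage: two slots; task 3 waits until task 2 fails
const runner = new PoolRunner(2);
const results = [];
const cb = r => results.push(r.data || r.error);
runner.enqueue(pass => setTimeout(() => pass("task 1 done"), 30), cb);
runner.enqueue((pass, fail) => setTimeout(() => fail("task 2 failed"), 10), cb);
runner.enqueue(pass => pass("task 3 done"), cb);
```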
Serdab answered 17/4 at 10:25 Comment(1)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center.Motile
