async.queue with async/await style functions
Asked Answered
D

4

5

I'm trying to create a function that builds a queue from an array of objects and then processes each object by calling a number of functions.

The processing functions are asynchronous functions which, prior to needing to queue, I'd implemented using the async/await pattern. I think this is necessary as each relies on the output of the previous and I don't want to have a tonne of nested promise.then's

i.e. previously I had:

await Promise.all(messages.map(async(message) => {
    let activity = await activityController.getActivity(message.activityId);
    let url = await SMSController.getUrl(message.Token);
    let smsSendResult = await SMSController.sendSMS(messageString, activity.mobileNo);
    // etc...
}

Now what I want to be able to do is:

let queue = async.queue((message, done) => {
     let activity = await activityController.getActivity(message.activityId);
     let smsSendResult = await SMSController.sendSMS(messageString, activity.mobileNo);
    // etc...
}

messages.forEach((message) => {
    queue.push(message);
})

I have the problem though that this results in

SyntaxError: await is only valid in async function

And I can't seem to quite get my head around how to get past this.

Deary answered 9/2, 2018 at 11:42 Comment(1)
Try github.com/baryon/named-promise-taskTruthful
P
1

You're looking for async.series, not async.queue:

series(tasks, callbackopt)

Run the functions in the tasks collection in series, each one running once the previous function has completed.

So just following the docs:

const messageCallbacks = messages.map(function(msg) {
    return async function(callback) {callback(await handleMessage(msg));
});


async.series(messageCallbacks,
// optional callback
function(err, results) {
    // results is now equal to whatever handleMessage resolves to
});

Without async:

async function asyncMessageQueue(messages) {
    const results = [];
    for(var i=0,l=messages.length; i<l; ++i) {
        results.push(await handleMessage(messages[i]));
    }
    return results;
}

async function handleMessage(message) {
        let activity = await activityController.getActivity(message.activityId);
        let url = await SMSController.getUrl(message.Token);
        let smsSendResult = await SMSController.sendSMS(messageString, activity.mobileNo);
        // rest of the code
};

This also allows you to provide the next message with any previous results:, just change await handleMessage(messages[i]) to await handleMessage(messages[i], results) and then:

async function handleMessage(message, prevResults) {
        // rest of the code
};
Peasecod answered 9/2, 2018 at 11:47 Comment(2)
Thanks. I don't think this is quite doing what I need it to. I should have said, I'm using async.queue (caolan.github.io/async/docs.html) so that I can use a configurable concurrency and be notified when the queue is empty using queue.drain() Also the messages come from elsewhere (a database) and it's each one of those that needs to be put into the queue, rather thean the results of the handling.Deary
I updated the answer with async.series solution. I'm not sure what you mean by messages coming from database.Peasecod
D
1

I found the asyncify function in the async module which allows me to do this:

var queue = async.queue(async.asyncify(async (message, done) => {
    let url = await SMSController.getUrl(message.token);
    // etc...
}
Deary answered 9/2, 2018 at 13:55 Comment(0)
S
0

You just need to use the async keyword in the callback function:

var queue = async.queue(async (message, done) => {
    let url = await SMSController.getUrl(message.token);
    // etc...
})
Salamanca answered 19/7, 2023 at 20:41 Comment(0)
S
0

so recently I fall in this problem too, where I have a queue and I have a await process to inside the queue...

Solution ;

var queue = async.queue( (message, done) => {
    return new Promise(async(resolve, reject) => {
      const response = await function_name(); //await the function
      done(null, response); //call the callback function..
      //or
      resolve(done(null, response));
    });
})
Soneson answered 3/10, 2023 at 12:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.