How to implement the equivalent of Promise.all for my Task implementation?
Asked Answered
I

3

6

Here is my Task implementation (i.e. a sort of Promise but complying with the monad laws and cancelable). It works rock solid:

const Task = k =>
  ({runTask: (res, rej) => k(res, rej)});

const tAp = tf => tk =>
  Task((res, rej) => tf.runTask(f => tk.runTask(x => res(f(x)), rej), rej));

const tOf = x => Task((res, rej) => res(x));

const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

const tChain = fm => mx =>
  Task((res, rej) => mx.runTask(x => fm(x).runTask(res, rej), rej));

const log = x => console.log(x);
const elog = e => console.error(e);

const fetchName = (id, cb) => {
  const r = setTimeout(id_ => {
    const m = new Map([[1, "Beau"], [2, "Dev"], [3, "Liz"]]);

    if (m.has(id_))
      return cb(null, m.get(id_));

    else
      return cb("unknown id", null);
  }, 0, id);

  return () => clearTimeout(r);
};

const fetchNameAsync = id =>
  Task((res, rej) =>
    fetchName(id, (err, data) =>
      err === null
        ? res(data)
        : rej(err)));

const a = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(3));

const b = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(5));

a.runTask(log, elog); // 7
b.runTask(log, elog); // Error: "unknown id"

However, I have no idea how to implement awaitAll, which should have the following traits:

  • it either resolves with an array of results of the individual Tasks
  • or it rejects immediately with the first error and cancels all other Tasks
  • it executes Tasks in "parallel"

const awaitAll = ms =>
  Task((res, rej) => ms.map(mx => mx.runTask(...?)));

Any hint is appreciated!

Impracticable answered 16/3, 2019 at 17:5 Comment(14)
"it executes Tasks in "parallel"" That isn't what Promise.all does. Promise.all doesn't execute anything. By the time you're using Promise.all, the operations in question have already been started. All Promise.all does is wait for them to complete, gathering up the results.Richelle
@T.J.Crowder A task is a runnable thingZymogenic
@T.J.Crowder Sorry, this behavior only applies to monadic Tasks, which in my case is a continuation monad augmented with an Either type. My statement was imprudent in connection with Promises.Impracticable
@bob - I think you'll struggle with awaitAll, at least with the current implementation. a Task as far as I can tell above isn't necessarily asynchronous. Some are, but not all. So if awaitAll's job is to start the tasks and run them in parallel, it has the problem that some tasks are synchronous. The only way I see out of that is to require that all tasks be asynchronous, in which case the implementation is fairly trivial: Loop through the tasks starting them and hooking their completion, then fail if any of them fails, or filling in the task's slot in the result array if it works.Richelle
@T.J.Crowder Tasks are functions that take a (binary) continuation and the monad describes how to compose such functions. So the Task type deals with functions that take continuations (callbacks) and I guess it doesn't really matter if such functions are asynchronous or not. Task just abstracts from the callback pattern.Impracticable
It does matter whether they're synchronous or not though. If they're synchronous they can potentially throw from the same stack that awaitAll is on, in which case you might have to implement redundant error handling.Mescal
And if they're synchronous, map won't complete until the task has finished, holding up other tasks. (Mind you: Unless they're going to be offloaded to a worker thread, they're going to hold up other tasks at some point.)Richelle
@PatrickRoberts If you mix pure and impure computations, the behavior is undefined.Impracticable
@T.J.Crowder That's right. So you should only create asynchronous Tasks, if you need non-blocking behavior.Impracticable
@bob - Okay, cool. If the expectation is there. :-)Richelle
Is cancellation really necessary? You have not implemented it properly for tAp and tChain, so why do it for awaitAll?Zymogenic
@Bergi: No, it is not necessary. I explored the issue a bit and think that what I am looking for is the trait of applicatives to be able to run effects in parallel. Then awaitAll could be just a rather trivial applicative computation. However, tAp doesn't run its effects in parallel. Maybe the reason is that it takes its applicative arguments sequentially. But that is what applicative functors do, right?Impracticable
@bob Yes, you can implement Applicative in that way (and afaik, that's what most algebraic promise implementations do), it's easy to run them in parallel not sequentially. It fits the type just fine. However this would mean that your Monad and Applicative instances don't agree which is potentially confusing.Zymogenic
@Zymogenic Thank you, that's enlightening and actually the answer to my not-yet-asked follow-up question :DImpracticable
G
2

Here's another way that takes inspiration from the other answers here as well as the linked folktale/task. Instead of implementing a complicated tAll which takes care of iterating a list of tasks and combining tasks, we'll separate the concerns into individual functions.

Here's a simplified tAnd -

const tAnd = (t1, t2) =>
{ const acc = []

  const guard = (res, i) => x =>
    ( acc[i] = x
    , acc[0] !== undefined && acc[1] !== undefined
        ? res (acc)
        : void 0
    )

  return Task
    ( (res, rej) =>
        ( t1 .runTask (guard (res, 0), rej) // rej could be called twice!
        , t2 .runTask (guard (res, 1), rej) // we'll fix this below
        )
    )
}

It works like this -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]

Now tAll is a breeze to implement -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tAnd (t, tAll (...ts))

Wups, don't forget to flatten along the way -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll(...ts))
        )

It works like this -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll properly handles errors as well -

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// test passed

Getting tAnd right is surprisingly difficult, even though we've limited the scope of our program, when compared to our original tAll. The combined task should only resolve once, or reject once - not both. This means double resolve/reject should also be avoided. Enforcing these constraints requires a bit more code -

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

Expand the snippet below to verify the result in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

Serial processing

The trickiest bit is in the parallel processing requirement. If the requirements asked for a serial behavior, the implementation is dramatically easier -

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

Implementation for tAll stays the same, of course. Note the difference in delays now as the tasks are now run sequentially -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

And many tasks with tAll -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tOr and tRace

For sake of completeness, here's tOr. Note tOr here is equivalent to folktale's Task.concat -

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

Which resolves or rejects the first-to-complete of two tasks -

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

And tRace -

const tRace = (t = tOf (undefined), ...ts) =>
  ts .reduce (tOr, t)

Which resolves or rejects the first-to-complete of many tasks -

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// 'f'

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (a, b) => k (a, b) })

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

const tRace = (t = tOf (undefined), ...ts) =>
  ts. reduce (tOr, t)

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// note `f` appears in the output first because this tRace demo finishes before the tOr demo above
// 'f'

tRace
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tAp

In the comments, we're talking about applicative, tAp. I think tAll makes the implementation rather easy -

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

tAp accepts a task-wrapped function and any number of task-wrapped values, and returns a new task -

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15

Unless the tasks have a side effect, I cannot see a reason why a "parallel" implementation of tAp breaks the applicative laws.

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15
Glace answered 16/3, 2019 at 21:9 Comment(19)
Let us continue this discussion in chat.Glace
Oh. If one of the tasks rejects, tAll does not resolve, which makes my testcase irrelevant. With two rejecting tasks however the tAll task will reject twiceMacymad
I think tConcat should be renamed to tAnd, because it isn't the binary operation of a monoid. I also wonder if we can generalize tConcat by expressing it with tAp. I've once read that applicative computations run in parallel. I've never understand what that statement means - maybe this is the right usecase.Impracticable
When we have tAnd there is probably also a tOr, which forms a monoid by waiting for the first async computation to finish and discarding the other. tEmpty would thus be the never ending Task, that is to say the Task that never calls it continuation. So Promise.race is actually quite useful as soon as you add empty = () => new Promise((res, rej) => undefined) to the Promise prototype.Impracticable
As usual, a great answer, btw.Impracticable
"we decide what concat means for our module" - it's worth mentioning though that the expected type is concat :: Task<A> -> Task<A> -> Task<A>, which your implementation doesn't follow. You should use a different name for this helper function.Zymogenic
@bob and Bergi thanks for catching these fine details. tAnd is a much more suitable name. I've updated the answer.Glace
I also made some additional updates to enforce only single resolve or reject. @Jonas, this update includes the new idea I had for guard which generates a pair of functions.Glace
@bob and a final update to include tOr and tRace. It's been a fun afternoon :DGlace
Clearly, this answer deserves more upvotes. You might also be interest in the (<*>) = ap monad law. We can break it by letting tAp run the effects in parallel. But the different applicative/monadic behavior might confuse people.Impracticable
Maybe I misunderstand ap in the context of Task. If the output of deterministic, why does it matter if the tasks are processed serially or in parallel? Ie, why does task processing have to be serial in order to uphold the law?Glace
@bob I added tAp, implemented using tAllGlace
@user633183 I am not an expert on this topic either. I guess it is more of a guidline rather than a law and there are no mathematical reasons for it.Impracticable
@user633183 Maybe I misunderstand ap in the context of Task. - If you're still interested in this detail, here is an explanationImpracticable
I've used this implementation in practice for a while now and it works very predictable and smooth. However, I moved the parallel applicative in a separate type Parallel, which also implements the monoid typeclass (append=tRace and the never settling Parallel as empty element). It is remarkable how little code is needed to handle asynchronous stuff monadically.Impracticable
Another nitpick that might be helpful for other users: Since the given Task implementation doesn't implement monoid, you cannot pass an empty array to tRace, i.e. tRace is only a partial function.Impracticable
@bob I appreciate you sharing your continued experience. I will update the answer when I’m back at my computer. Maybe default t = tAlways([]) then ts.reduce(tAnd, t)?Glace
I figured tEmpty = Task((res, rej) => null);Impracticable
@bob, wups I was thinking about tAll which is already total. For tRace, I think tOf(null) or tOf(undefined) is fine. I updated the answer with a fixed tRace.Glace
M
2

Another solution that uses recursion with a 2 Task base case, which then allows to just manage the state in two variables:

  const tAll = ([first, second, ...rest]) =>
   !second
     ? first
     : rest.length 
        ? tMap(
            results => results.flat()
          )(tAll([ tAll([first, second]), tAll(rest) ]))
        : Task((res, rej, a, b, done) => (
            first.runTask(
               value => !done && b ? (res([value, b.value]), done = true) : (a = { value }),
               err => !done && (rej(err), done = true)
            ),
            second.runTask(
               value => !done && a ? (res([a.value, value]), done = true) : (b = { value }),
              err => !done && (rej(err), done = true)
            ) 
         ));
Macymad answered 16/3, 2019 at 19:16 Comment(9)
Intriguing! However, this produces 'a' instead of the expected [ 'a', 'b', 'c' ] when the same input from my answer is used. I'll wait for a fix before attempting to understand the sorcery you've conjured here. Maybe include functioning snippet?Glace
@user633183 yup, was still a work in progress, here it is (stolen your testcase :))Macymad
@JonasWilms I actually used tAll as a name myself but changed it for this question, funny!Impracticable
@bob And I just took it over from user633183 (naomik was way easier to write) ;)Macymad
@JonasWilms, I'm always delighted to collaborate with you. This is a creative and insightful procedure but somewhat complex. I'm going to play with this idea to see if it can be further simplified.Glace
@user633183 (naomik) me too :)Macymad
@JonasWilms This seems similar to folktale's Task.Impracticable
@bob oh right, it is ... (although folktale's is not really functional itself though)Macymad
@JonasWilms I untangled the wires a bit more in my second answer. This has been a fun program to work on ^^Glace
G
2

Here's another way that takes inspiration from the other answers here as well as the linked folktale/task. Instead of implementing a complicated tAll which takes care of iterating a list of tasks and combining tasks, we'll separate the concerns into individual functions.

Here's a simplified tAnd -

const tAnd = (t1, t2) =>
{ const acc = []

  const guard = (res, i) => x =>
    ( acc[i] = x
    , acc[0] !== undefined && acc[1] !== undefined
        ? res (acc)
        : void 0
    )

  return Task
    ( (res, rej) =>
        ( t1 .runTask (guard (res, 0), rej) // rej could be called twice!
        , t2 .runTask (guard (res, 1), rej) // we'll fix this below
        )
    )
}

It works like this -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]

Now tAll is a breeze to implement -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tAnd (t, tAll (...ts))

Wups, don't forget to flatten along the way -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll(...ts))
        )

It works like this -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll properly handles errors as well -

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// test passed

Getting tAnd right is surprisingly difficult, even though we've limited the scope of our program, when compared to our original tAll. The combined task should only resolve once, or reject once - not both. This means double resolve/reject should also be avoided. Enforcing these constraints requires a bit more code -

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

Expand the snippet below to verify the result in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

Serial processing

The trickiest bit is in the parallel processing requirement. If the requirements asked for a serial behavior, the implementation is dramatically easier -

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

Implementation for tAll stays the same, of course. Note the difference in delays now as the tasks are now run sequentially -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

And many tasks with tAll -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tOr and tRace

For sake of completeness, here's tOr. Note tOr here is equivalent to folktale's Task.concat -

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

Which resolves or rejects the first-to-complete of two tasks -

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

And tRace -

const tRace = (t = tOf (undefined), ...ts) =>
  ts .reduce (tOr, t)

Which resolves or rejects the first-to-complete of many tasks -

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// 'f'

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (a, b) => k (a, b) })

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

const tRace = (t = tOf (undefined), ...ts) =>
  ts. reduce (tOr, t)

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// note `f` appears in the output first because this tRace demo finishes before the tOr demo above
// 'f'

tRace
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tAp

In the comments, we're talking about applicative, tAp. I think tAll makes the implementation rather easy -

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

tAp accepts a task-wrapped function and any number of task-wrapped values, and returns a new task -

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15

Unless the tasks have a side effect, I cannot see a reason why a "parallel" implementation of tAp breaks the applicative laws.

Expand the snippet below to verify the results in your own browser -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15
Glace answered 16/3, 2019 at 21:9 Comment(19)
Let us continue this discussion in chat.Glace
Oh. If one of the tasks rejects, tAll does not resolve, which makes my testcase irrelevant. With two rejecting tasks however the tAll task will reject twiceMacymad
I think tConcat should be renamed to tAnd, because it isn't the binary operation of a monoid. I also wonder if we can generalize tConcat by expressing it with tAp. I've once read that applicative computations run in parallel. I've never understand what that statement means - maybe this is the right usecase.Impracticable
When we have tAnd there is probably also a tOr, which forms a monoid by waiting for the first async computation to finish and discarding the other. tEmpty would thus be the never ending Task, that is to say the Task that never calls it continuation. So Promise.race is actually quite useful as soon as you add empty = () => new Promise((res, rej) => undefined) to the Promise prototype.Impracticable
As usual, a great answer, btw.Impracticable
"we decide what concat means for our module" - it's worth mentioning though that the expected type is concat :: Task<A> -> Task<A> -> Task<A>, which your implementation doesn't follow. You should use a different name for this helper function.Zymogenic
@bob and Bergi thanks for catching these fine details. tAnd is a much more suitable name. I've updated the answer.Glace
I also made some additional updates to enforce only single resolve or reject. @Jonas, this update includes the new idea I had for guard which generates a pair of functions.Glace
@bob and a final update to include tOr and tRace. It's been a fun afternoon :DGlace
Clearly, this answer deserves more upvotes. You might also be interest in the (<*>) = ap monad law. We can break it by letting tAp run the effects in parallel. But the different applicative/monadic behavior might confuse people.Impracticable
Maybe I misunderstand ap in the context of Task. If the output of deterministic, why does it matter if the tasks are processed serially or in parallel? Ie, why does task processing have to be serial in order to uphold the law?Glace
@bob I added tAp, implemented using tAllGlace
@user633183 I am not an expert on this topic either. I guess it is more of a guidline rather than a law and there are no mathematical reasons for it.Impracticable
@user633183 Maybe I misunderstand ap in the context of Task. - If you're still interested in this detail, here is an explanationImpracticable
I've used this implementation in practice for a while now and it works very predictable and smooth. However, I moved the parallel applicative in a separate type Parallel, which also implements the monoid typeclass (append=tRace and the never settling Parallel as empty element). It is remarkable how little code is needed to handle asynchronous stuff monadically.Impracticable
Another nitpick that might be helpful for other users: Since the given Task implementation doesn't implement monoid, you cannot pass an empty array to tRace, i.e. tRace is only a partial function.Impracticable
@bob I appreciate you sharing your continued experience. I will update the answer when I’m back at my computer. Maybe default t = tAlways([]) then ts.reduce(tAnd, t)?Glace
I figured tEmpty = Task((res, rej) => null);Impracticable
@bob, wups I was thinking about tAll which is already total. For tRace, I think tOf(null) or tOf(undefined) is fine. I updated the answer with a fixed tRace.Glace
G
1

Here's one possible way to do it using a counter and a loop wrapped inside another Task. A counter is used because the tasks could complete in any order and it's otherwise difficult to know when the outer Task can finally resolve -

const assign = (o = {}, [ k, v ]) =>
  Object .assign (o, { [k]: v })

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) =>
  { for (const [ i, t ] of ts .entries ())
      t .runTask
        ( x =>
            ++resolved === ts.length
              ? res (assign (acc, [ i, x ]))
              : assign (acc, [ i, x ])
        , rej
        )
  }
  return Task (run)
}

We write a simple delay function to test it -

const delay = (ms, x) =>
  Task ((res, _) => setTimeout (res, ms, x))

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , delay (100, 'c')
  ]

tAll (tasks) .runTask (console.log, console.error)
// ~300 ms later
// => [ 'a', 'b', 'c' ]

In the event any task fails, the outer task is rejected -

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , Task ((_, rej) => rej (Error('bad')))
  ]

tAll (tasks) .runTask (console.log, console.error)
// => Error: bad

Expand the snippet below to verify the results in your own browser -

const assign = (o = {}, [ k, v ]) =>
  Object .assign (o, { [k]: v })

const Task = k =>
  ({runTask: (res, rej) => k(res, rej)});

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) =>
  { for (const [ i, t ] of ts .entries ())
      t .runTask
        ( x =>
            ++resolved === ts.length
              ? res (assign (acc, [ i, x ]))
              : assign (acc, [ i, x ])
        , rej
        )
  }
  return Task (run)
}

const delay = (ms, x) =>
  Task ((res, _) => setTimeout (res, ms, x))

const tasks =
  [ delay (200, 'a')
  , delay (300, 'b')
  , delay (100, 'c')
  ]

tAll (tasks) .runTask (console.log, console.error)
// ~300 ms later
// => [ 'a', 'b', 'c' ]

Here's an alternative implementation of tAll which trades for for forEach and removes one more imperative-style block, { ... } -

const tAll = (ts = []) =>
{ let resolved = 0
  const acc = []
  const run = (res, rej) => (t, i) =>
    t .runTask
      ( x =>
          ++resolved === ts.length
            ? res (assign (acc, [ i, x ]))
            : assign (acc, [ i, x ])
      , rej
      )
  return Task ((res, rej) => ts .forEach (run (res, rej)))
}
Glace answered 16/3, 2019 at 18:1 Comment(3)
@bob yvw. It's a tricky problem and unfortunately I can't see a solution that doesn't involve some imperative style. I added a variation at the end of the answer that you might like more. If you come up with some else, please share ^_^Glace
"some imperative style" ... well the variables could be turned into parameters, resolved can be omitted, ...Macymad
This won't work like Promise.all for an empty array though, my answer doesn't eitherMacymad

© 2022 - 2024 — McMap. All rights reserved.