Waiting for n channels with core.async
Asked Answered
R

2

9

In the same way alt! waits for one of n channels to get a value, I'm looking for the idiomatic way to wait for all n channels to get a value.

I need this because I "spawn" n go blocks to work on async tasks, and I want to know when they are all done. I'm sure there is a very beautiful way to achieve this.

Rexrexana answered 6/8, 2015 at 14:45 Comment(0)
T
5

You can say (mapv #(async/<!! %) channels).

If you wanted to handle individual values as they arrive, and then do something special after the final channel produces a value, you can use exploit the fact that alts! / alts!! take a vector of channels, and they are functions, not macros, so you can easily pass in dynamically constructed vectors.

So, you can use alts!! to wait on your initial collection of n channels, then use it again on the remaining channels etc.

(def c1 (async/chan))
(def c2 (async/chan))

(def out
  (async/thread
    (loop [cs [c1 c2] vs []]
      (let [[v p] (async/alts!! cs)
            cs (filterv #(not= p %) cs)
            vs (conj vs v)]
        (if (seq cs)
          (recur cs vs)
          vs)))))

(async/>!! c1 :foo)
(async/>!! c2 :bar)

(async/<!! out)
;= [:foo :bar]

If instead you wanted to take all values from all the input channels and then do something else when they all close, you'd want to use async/merge:

clojure.core.async/merge
([chs] [chs buf-or-n])
Takes a collection of source channels and returns a channel which contains all values taken from them. The returned channel will be unbuffered by default, or a buf-or-n can be supplied. The channel will close after all the source channels have closed.

Trilobite answered 6/8, 2015 at 15:9 Comment(2)
Thanks! I think merge is what I need.Rexrexana
Great! NB. I've just added in the simplest solution for waiting for a single value from each channel (as opposed to all values before all channels close), which is (mapv #(async/<!! %) channels) – somehow it went MIA in the initial version.Amalia
M
14

Use the core.async map function:

(<!! (a/map vector [ch1 ch2 ch3]))
;; [val-from-ch-1 val-from-ch2 val-from-ch3]
Maurita answered 6/8, 2015 at 15:17 Comment(2)
If wondering, vector is for returning a vector of the first value taken from every channel. I guess seq would do as well, right?Happening
The lambda to a/map is invoked with as many args as channels, so seq wouldn't work. Maybe you are looking for a/merge?Maurita
T
5

You can say (mapv #(async/<!! %) channels).

If you wanted to handle individual values as they arrive, and then do something special after the final channel produces a value, you can use exploit the fact that alts! / alts!! take a vector of channels, and they are functions, not macros, so you can easily pass in dynamically constructed vectors.

So, you can use alts!! to wait on your initial collection of n channels, then use it again on the remaining channels etc.

(def c1 (async/chan))
(def c2 (async/chan))

(def out
  (async/thread
    (loop [cs [c1 c2] vs []]
      (let [[v p] (async/alts!! cs)
            cs (filterv #(not= p %) cs)
            vs (conj vs v)]
        (if (seq cs)
          (recur cs vs)
          vs)))))

(async/>!! c1 :foo)
(async/>!! c2 :bar)

(async/<!! out)
;= [:foo :bar]

If instead you wanted to take all values from all the input channels and then do something else when they all close, you'd want to use async/merge:

clojure.core.async/merge
([chs] [chs buf-or-n])
Takes a collection of source channels and returns a channel which contains all values taken from them. The returned channel will be unbuffered by default, or a buf-or-n can be supplied. The channel will close after all the source channels have closed.

Trilobite answered 6/8, 2015 at 15:9 Comment(2)
Thanks! I think merge is what I need.Rexrexana
Great! NB. I've just added in the simplest solution for waiting for a single value from each channel (as opposed to all values before all channels close), which is (mapv #(async/<!! %) channels) – somehow it went MIA in the initial version.Amalia

© 2022 - 2024 — McMap. All rights reserved.