Why does `core.async/pipeline` return a channel?
I just noticed that the pipeline family of functions returns a channel which seemingly operates completely independently of the purpose of the pipeline and its related channels.

Notice that in the following example you can >! to and <! from pipes independently of a> and b>, and the two appear unrelated.

As far as I understand, a pipeline should simply return nil while setting up the side-effecting transduction from a> to b>.

So, what am I missing, and why does pipeline return a channel?

(def a> (chan))
(def b> (chan))
(def pipes (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(go (>! pipes "hello world"))
(go (println "Pipes: " (<! pipes)))
(go (>! a> "apples are gooood"))
(go (println "B: " (<! b>)))
Megara answered 7/11, 2016 at 20:16 Comment(0)

You get back a channel which is closed when there are no more elements to copy, that is, after a> is closed and all elements from it have been upper-cased and placed onto b>. You can <! from the resulting channel to find out when the pipelining operation is done, if you care, or you can just throw away the channel. You probably shouldn't write to it.

This is a common pattern for a lot of async operations, and indeed often happens implicitly: every go block returns a channel which has the block's return value written to it when the block completes, and many async operations use a go block as their return value, so you automatically get this "job done" channel as a result.
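As a minimal sketch of that "job done" channel (the var names `result-ch`, `result`, and `closed-result` are made up for illustration, and `<!!` is used only because it is convenient at the REPL):

```clojure
(require '[clojure.core.async :refer [go <!!]])

;; A go block returns a channel right away; the body's value is put on it
;; once the body finishes.
(def result-ch (go (+ 1 2)))
(def result (<!! result-ch))          ; blocking take, fine outside go blocks
(println "go block returned:" result)

;; nil cannot be put on a channel, so a go block whose body returns nil
;; just closes its channel, and a take from it yields nil.
(def closed-result (<!! (go nil)))
(println "nil-returning go block:" closed-result)
```

In both cases the take only succeeds once the block has finished, which is what makes the returned channel usable as a completion signal.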

Hifi answered 7/11, 2016 at 22:1 Comment(1)
That made sense and gave me enough to run an experiment, which I posted as an answer alongside yours. Please let me know if my explanation is off! - Megara

To expound on @amalloy's answer: in the following example, the channels a> and b> (returned by the two go blocks) each have true put onto them once their >! completes. Since chan> is unbuffered, those puts can't complete until another process takes from chan>, i.e. the println calls at the end.

If chan> were buffered, the puts in a> and b> would succeed immediately, so the "from a>" and "from b>" lines would print immediately.

(def chan> (chan)) ; unbuffered, as described above; use (chan 4) to see the buffered behavior
(def a> (go (>! chan> "Apple")))
(go (println "from a>: " (<! a>)))
(def b> (go (>! chan> "Ball")))
(go (println "from b>: " (<! b>)))

(go (println "from chan>: " (<! chan>)))
;; => from chan>: Apple
;; => from a>: true
(go (println "from chan>: " (<! chan>)))
;; => from chan>: Ball
;; => from b>: true
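The difference between the two cases can also be checked without relying on print order. This is only a sketch: the names `unbuffered`, `buffered`, `put-a`, and the other vars are invented for illustration, and the 100 ms timeout is an arbitrary choice.

```clojure
(require '[clojure.core.async :refer [chan go >! <!! alts!! timeout]])

;; Unbuffered: the put parks until someone takes, so the go block's
;; channel has nothing on it yet and the timeout wins the alts!!.
(def unbuffered (chan))
(def put-a (go (>! unbuffered "Apple")))
(def waited (alts!! [put-a (timeout 100)]))      ; [value port]
(def still-parked? (not= put-a (second waited)))

;; Taking from the channel lets the put complete, and only then does
;; the go block deliver true on its own channel.
(def taken (<!! unbuffered))
(def put-a-result (<!! put-a))

;; Buffered: the put succeeds with no taker, so the go block finishes at once.
(def buffered (chan 4))
(def put-b-result (<!! (go (>! buffered "Ball"))))
```

Here `still-parked?` should be true, while `put-a-result` and `put-b-result` are both true once the respective puts have gone through.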

This is the same idea behind pipelines.


;; Pipeline-specific

(def a> (chan))
(def b> (chan))
(def p> (pipeline 4 b> (map clojure.string/upper-case) a>))

;; this won't print until `a>` is closed and everything taken from it has been processed
(go (println "pipeline is done: " (<! p>)))

;; execute the following 2 lines ad lib
(go (>! a> "hi there"))
(go (println "from b>: " (<! b>)))

(comment
  (close! a>) ; triggers the "pipeline is done"
  (close! b>)) ; doesn't trigger it, but `b>` now only returns nil when taking
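For a version that runs top to bottom without manual REPL steps, here is a hedged sketch. The names `in>`, `out>`, `done>`, `done-value`, and `results` are hypothetical, and the buffer size of 3 is chosen only so the pipeline can finish before anything reads from `out>`.

```clojure
(require '[clojure.core.async :as async :refer [chan pipeline >!! <!! close!]]
         '[clojure.string :as str])

(def in>  (chan 3))
(def out> (chan 3))                    ; room for every result, so no consumer is needed yet
(def done> (pipeline 4 out> (map str/upper-case) in>))

;; Feed the input, then close it to signal that nothing more is coming.
(doseq [s ["hi" "there" "world"]]
  (>!! in> s))
(close! in>)

;; Blocks until the pipeline has copied everything; the channel is then
;; closed, so the take yields nil.
(def done-value (<!! done>))

;; By default pipeline closes the output channel when the input closes,
;; so async/into terminates and returns what was buffered, in input order.
(def results (<!! (async/into [] out>)))
(println "pipeline finished, results:" results)
```

If `out>` should stay open for other producers, `pipeline` also accepts a `close?` argument after the input channel; passing false leaves `out>` open, in which case the `async/into` call above would not return until something else closes it.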
Megara answered 7/11, 2016 at 22:37 Comment(0)
