How to create a channel from another with transducers?

I want to create a clojure.core.async channel from another one, so that the new channel only passes on specific messages. For this I found a function called filter<.

=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

But the function and its friends are marked as deprecated:

Deprecated - this function will be removed. Use transducer instead

There are some ways to use channels with transducers, such as chan with its xform parameter. But how can I build a new channel from an existing one using transducers?

Executor answered 8/7, 2015 at 7:29 Comment(0)

I did some research on this, found a couple of interesting articles (first and second), and then got something working using pipeline:

(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))

(pipeline 4 c2 (filter even?) c1)

(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2
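
To see the whole flow end to end, here is a minimal sketch that feeds several values through the same kind of pipeline and collects everything that comes out. The names in, out, collected and result are only illustrative, and the parallelism of 4 is an arbitrary choice. It relies on pipeline closing the output channel once the input channel is closed.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! >!! close! pipeline]])

(def in  (chan))
(def out (chan))

;; Apply the transducer to everything flowing from `in` to `out`.
(pipeline 4 out (filter even?) in)

;; Start consuming before producing, so the blocking puts below cannot stall.
;; async/into returns a channel that yields one vector once `out` is closed.
(def collected (async/into [] out))

(doseq [n (range 10)]
  (>!! in n))
(close! in) ; pipeline closes `out` after `in` has been drained

(def result (<!! collected))
;; result => [0 2 4 6 8]
```

pipeline preserves the order of the inputs, so the result is ordered even though the work may be spread over several threads.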

The second article I linked makes this a bit cleaner with some helper functions around the pipeline function:

(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))

(defn parallelism []
  (+ (ncpus) 1))

(defn add-transducer
  [in xf]
  (let [out (chan 16)] ; same as (chan (buffer 16)), without needing to refer buffer
    (pipeline (parallelism) out xf in)
    out))

Then you can simply tie channels together with:

(def c1 (chan))
(def c2 (add-transducer c1 (filter even?)))
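
Because the helper takes any transducer, composed transducers work as well. The following is a rough sketch, with the helper repeated in compact form so the snippet runs on its own; the fixed parallelism of 4 and the names source, scaled and scaled-values are assumptions made for the illustration.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! >!! close! pipeline]])

;; Compact copy of the helper; parallelism is hard-coded for this sketch.
(defn add-transducer
  [in xf]
  (let [out (chan 16)]
    (pipeline 4 out xf in)
    out))

(def source (chan))

;; Transducers composed with comp run left to right:
;; first keep the even numbers, then multiply them by ten.
(def scaled
  (add-transducer source (comp (filter even?) (map #(* 10 %)))))

(doseq [n [1 2 3 4]]
  (>!! source n))
(close! source)

(def scaled-values (<!! (async/into [] scaled)))
;; scaled-values => [20 40]
```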

To complete the answer: as you found yourself, you can use pipe in a similar fashion:

(defn pipe-trans
  [ci xf]
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))

(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))
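
A short usage sketch for the pipe variant, with the helper repeated so the snippet is self-contained. The names numbers, evens, collected and evens-seen are made up for the example.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! >!! close! pipe]])

(defn pipe-trans
  [ci xf]
  ;; A channel with a transducer must have a buffer, hence the 1.
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))

(def numbers (chan))
(def evens (pipe-trans numbers (filter even?)))

;; Start collecting first, so the blocking puts below always find a taker.
(def collected (async/into [] evens))

(doseq [n (range 6)]
  (>!! numbers n))
(close! numbers) ; pipe closes `evens` once `numbers` is closed

(def evens-seen (<!! collected))
;; evens-seen => [0 2 4]
```

Unlike pipeline, this applies the transducer on the output channel itself, without any parallelism, which is usually enough for cheap transformations such as filter.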
Restless answered 8/7, 2015 at 8:58 Comment(4)
Inspired by you I created a factory method using pipe: (defn from-chan [ci xf] (let [co (chan 1 xf)] (pipe ci co) co)) - Executor
Nice, glad I could help. - Restless
I've added a version of your factory method to try and complete the answer in case others search for it. - Restless
Noticed this: dev.clojure.org/jira/browse/ASYNC-153. It seems pipe-trans could be simplified to (defn pipe-trans [ci xf] (pipe ci (chan 1 xf))), but that is undocumented. - Consueloconsuetude
