Transducers shouldn't really be a concern for a batching function – as a taker on the in
channel, it will see values transformed by any transducers on that channel, and any takers listening on out
will in turn see values transformed by that channel's transducer.
As for an implementation, the function below will take batches of max-count
items from in
, or however many arrive by max-time
since the last batch was output, and output them to out
, closing when the input channel closes, subject to the input channel's transducer (if any, and any takers listening on out
will also have that channel's transducer applied as noted above):
(defn batch [in out max-time max-count]
(let [lim-1 (dec max-count)]
(async/go-loop [buf [] t (async/timeout max-time)]
(let [[v p] (async/alts! [in t])]
(cond
(= p t)
(do
(async/>! out buf)
(recur [] (async/timeout max-time)))
(nil? v)
(if (seq buf)
(async/>! out buf))
(== (count buf) lim-1)
(do
(async/>! out (conj buf v))
(recur [] (async/timeout max-time)))
:else
(recur (conj buf v) t))))))
out
as a publisher). – Libra