I wrote some core.async code in Clojure, and when I ran it, it consumed all available memory and failed with an error. It appears that using mapcat in a core.async pipeline breaks back pressure. (Which is unfortunate for reasons beyond the scope of this question.)
Here is some code that demonstrates the problem by counting the :x values going into and out of a mapcat transducer:
(ns mapcat.core
  (:require [clojure.core.async :as async]))

(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
The producer races far ahead of the consumer.
It appears that I'm not the first person to discover this. But the explanation given here doesn't quite seem to cover it. (Although it does provide an adequate workaround.) Conceptually, I would expect the producer to be ahead, but only by the length of the few messages that might be buffered in the channels.
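To make that expectation concrete, here is a minimal sketch on a single channel, outside any pipe. It uses the non-blocking `offer!` and `poll!` from core.async; the names `plain`, `expanding`, `plain-results`, `first-offer`, and `drained` are made up for illustration. It only shows how many items one accepted put can leave in a size-1 buffer. It does not by itself account for the 7000 missing items.

```clojure
(require '[clojure.core.async :as async])

;; Plain channel with a fixed buffer of 1: the first offer is accepted,
;; the second is refused because the buffer is already full.
(def plain (async/chan 1))
(def plain-results [(async/offer! plain :x) (async/offer! plain :x)])

;; Same buffer size, but with the mapcat transducer used in the code above.
;; A single accepted put is expanded into several buffered items.
(def expanding (async/chan 1 (mapcat seq)))
(def first-offer (async/offer! expanding (repeat 3 :x)))

;; poll! never blocks; drain whatever the channel currently holds.
(def drained
  (vec (take-while some? (repeatedly #(async/poll! expanding)))))

(println "plain:" plain-results)
(println "expanding:" first-offer drained)
```

So even with a nominal buffer size of 1, the transform channel can hold at least one whole expanded message, which is the "few messages" of slack I would expect, not thousands of items.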
My question is, where are all the other messages? By the fourth line of output, 7000 :x values are unaccounted for.
takers referenced here. Not so sure about it so let's wait for a more confident answer. – Want