I have 100 workers (agents) that share one ref containing a collection of tasks. While this collection has tasks, each worker takes one task from it (in a dosync block), prints it, and sometimes puts it back into the collection (in a dosync block):
(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))
(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)
When I run this code, the last value printed by the agents is (over several tries) 435445, 4556294, 1322061, or 3950017, but never 9999999, which is what I expect.
Yet every time, the collection really is empty at the end.
What am I doing wrong?
Edit:
I rewrote worker-loop to be as simple as possible:
(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)
But the problem is still there. The code behaves as expected when I create one and only one worker.
Is println thread safe? – Japanese
Use (locking :out (println "...")) to give legible output. – Where
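The locking suggestion from the comments could be sketched roughly as below. This is only a sketch: `print-lock` and `safe-println` are hypothetical names that do not appear in the question, and a dedicated lock object is assumed here in place of the keyword `:out`. It only keeps lines from different threads from interleaving; it says nothing about the order in which workers reach the print call.

```clojure
;; Hypothetical lock object (an assumption; the comment locks on :out instead).
(def print-lock (Object.))

(defn safe-println
  "Hypothetical helper: like println, but only one thread prints at a time."
  [& args]
  (locking print-lock
    (apply println args)))

;; Inside worker-loop, (println "Task: " task) would then become:
;; (safe-println "Task: " task)
```

Because the lock is held only around the print call, the dosync blocks in get-task and put-task are unaffected.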