Strange behavior of Clojure ref

I have 100 workers (agents) that share one ref containing a collection of tasks. While the collection has tasks, each worker gets one task from it (in a dosync block), prints it, and sometimes puts it back into the collection (in a dosync block):

(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))

(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)

When I run this code, the last value printed by the agents is (over several tries) 435445, 4556294, 1322061, or 3950017, but never 9999999 as I expect. Every time, the collection really is empty at the end. What am I doing wrong?

Edit:

I rewrote worker-loop to be as simple as possible:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)

But the problem is still there. This code behaves as expected when I create one and only one worker.

Tref answered 20/8, 2016 at 13:55 Comment(2)
Is println thread safe? - Japanese
@ShannonSeverance No. It needs to be used e.g. like (locking :out (println "...")) to give legible output. - Where

The problem here has nothing to do with agents and barely anything to do with laziness. Here's a somewhat reduced version of the original code that still exhibits the problem:

(defn f [init]
  (let [state (ref init)
        task (fn []
               (loop [last-n nil]
                 (if-let [n (dosync
                              (let [n (first @state)]
                                (alter state rest)
                                n))]
                   (recur n)
                   (locking :out
                     (println "Last seen:" last-n)))))
        workers (->> (range 0 5)
                     (mapv (fn [_] (Thread. task))))]
    (doseq [w workers] (.start w))
    (doseq [w workers] (.join w))))

(defn r []
  (f (range 1 100000)))

(defn i [] (f (->> (iterate inc 1)
                   (take 100000))))

(defn t []
  (f (->> (range 1 100000)
          (take Integer/MAX_VALUE))))

Running this code shows that both i and t, both lazy, reliably work, whereas r reliably doesn't. The problem is in fact a concurrency bug in the class returned by the range call. Indeed, that bug is documented in this Clojure ticket and is fixed as of Clojure version 1.9.0-alpha11.

A quick summary of the bug in case the ticket is not accessible for some reason: in the internals of the rest call on the result of range, there was a small opportunity for a race condition: the "flag" that says "the next value has already been computed" was set before the actual value itself, which meant that a second thread could see that flag as true even though the "next value" is still nil. The call to alter would then fix that nil value on the ref. It's been fixed by swapping the two assignment lines.

In cases where the result of range was either forcibly realized in a single thread or wrapped in another lazy seq, that bug would not appear.
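As a minimal sketch of that workaround, assuming a Clojure version that may still contain the bug: realizing the range into a vector on the calling thread before putting it in the ref means the worker threads only ever call rest on an already-realized sequence. The names drain!, n-threads and realized are hypothetical, introduced only for illustration.

```clojure
(defn drain!
  "Starts n-threads threads that take items off the seq held in the ref
  `state` until it is empty. Returns the total number of items taken."
  [state n-threads]
  (let [taken   (atom 0)
        task    (fn []
                  (loop []
                    ;; Same take-one transaction as in the reduced example.
                    (when (dosync
                            (let [n (first @state)]
                              (alter state rest)
                              n))
                      ;; Count outside the transaction so retries are not counted.
                      (swap! taken inc)
                      (recur))))
        threads (mapv (fn [_] (Thread. ^Runnable task)) (range n-threads))]
    (doseq [t threads] (.start t))
    (doseq [t threads] (.join t))
    @taken))

;; Realize the range eagerly, on one thread, before sharing it.
(def realized (ref (vec (range 1 100000))))

(println "Items taken:" (drain! realized 5))
```

If every item is handed out exactly once, the count equals the number of elements in the range (99999 here), regardless of how the threads interleave.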

Gratulant answered 28/8, 2017 at 12:32 Comment(0)

I asked this question on the Clojure Google Group and it helped me to find the answer.

The problem is that I used a lazy sequence within the STM transaction.

When I replaced this code:

(def tasks (ref (range 1 10000000)))

by this:

(def tasks (ref (into [] (range 1 10000000))))

it worked as expected!

In my production code where the problem occurred, I used the Korma framework that also returns a lazy collection of tuples, as in my example.

Conclusion: Avoid the use of lazy data structures within the STM transaction.
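One way to follow that advice, sketched here as an assumption and not the code used in production, is to keep an eager clojure.lang.PersistentQueue in the ref. peek and pop never trigger lazy realization, and conj appends at the back, so a returned task goes to the end of the queue. get-task and put-task mirror the names in the question; make-task-queue is a hypothetical helper.

```clojure
(defn make-task-queue
  "Returns a ref holding an eager FIFO queue with the items of coll.
  coll is fully realized here, on the calling thread."
  [coll]
  (ref (into clojure.lang.PersistentQueue/EMPTY coll)))

(defn get-task
  "Atomically removes and returns the task at the front, or nil when empty."
  [tasks]
  (dosync
    (let [task (peek @tasks)]
      ;; pop on an empty PersistentQueue returns the empty queue.
      (alter tasks pop)
      task)))

(defn put-task
  "Atomically appends task to the back of the queue."
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(def tasks (make-task-queue (range 1 6)))

(println (get-task tasks))  ; takes 1 from the front
(put-task tasks 1)          ; 1 goes to the back of the queue
(println (seq @tasks))
```

Unlike conj on a seq, which puts a returned task back at the front, the queue processes returned tasks after everything already waiting.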

Tref answered 25/8, 2016 at 13:52 Comment(4)
As you are using only one ref for your state: have you tried using an atom instead of a ref? That seems to give the same result, and a large (ca. 10x) reduction in runtime. - Where
Yes, I thought about it, but I don't know how to write the coordinated atomic function "get-task" in this case. What function should I pass to the swap! function? - Tref
Either (defn get-task [tasks] (let [my-tasks @tasks] (if (compare-and-set! tasks my-tasks (rest my-tasks)) (first my-tasks) (recur tasks)))), or start tasks with a dummy 0 and do (defn get-task [tasks] (first (swap! tasks rest))) - Where
Yes, both of your variants work consistently and much faster! Thank you. However, the problem with lazy sequences still remains. Be careful. - Tref
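The compare-and-set! variant from the comment above, laid out as a function, next to an alternative that assumes Clojure 1.9 or later, where swap-vals! returns both the old and the new value. The names get-task-cas and get-task-swap-vals are hypothetical. The range is realized into a vector before it goes into the atom, which sidesteps the lazy-range problem discussed in this thread.

```clojure
(defn get-task-cas
  "Takes the first task using an explicit compare-and-set! retry loop."
  [tasks]
  (let [my-tasks @tasks]
    (if (compare-and-set! tasks my-tasks (rest my-tasks))
      (first my-tasks)
      ;; Another thread changed the atom in between: try again.
      (recur tasks))))

(defn get-task-swap-vals
  "Takes the first task using swap-vals! (assumes Clojure 1.9+).
  The old value tells us which task this call removed."
  [tasks]
  (let [[old-tasks _] (swap-vals! tasks rest)]
    (first old-tasks)))

(def tasks (atom (vec (range 1 4))))

(println (get-task-cas tasks))        ; 1
(println (get-task-swap-vals tasks))  ; 2
(println @tasks)                      ; (3)
```

Both return nil once the collection is empty, so a worker loop can use when-let on the result in the same way as with the ref-based get-task.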

When the last number in the range is reached, there are still older numbers being held by the workers. Some of these will be returned to the queue, to be processed again.

In order to better see what is happening, you can change worker-loop to print the last task handled by each worker:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop [last-task nil]
    (if (have-tasks? tasks)
      (let [task (get-task tasks)]
        ;; (when (< (rand) 0.1)
        ;;   (put-task tasks task)
        (recur task))
      (when last-task
        (println "Last task:" last-task))))
  state)

This also shows the race condition in the code: near the end of processing, a task seen by have-tasks? has often been taken by another worker by the time get-task is called.

The race condition can be solved by removing have-tasks? and instead using the return value of nil from get-task as a signal that no more tasks are available (at the moment).

Updated:

As observed, this race condition does not explain the problem.

Nor is the problem solved by removing a possible race condition in get-task like this:

(defn get-task [tasks]
  (dosync
   (first (alter tasks rest))))

However, changing get-task to use an explicit lock seems to solve the problem:

(defn get-task [tasks]
  (locking :lock
    (dosync
      (let [task (first @tasks)]
        (alter tasks rest)
        task))))

Where answered 20/8, 2016 at 22:0 Comment(2)
I do not think this is the reason. I can comment out the (when (< (rand)... expression and not return any tasks to the queue, and it still only processes a portion. Besides, it's only returning 10% of all tasks on average, and the last task numbers printed before stopping are sometimes not even halfway through the queue, so the theory does not really make sense. I looked at this today and am hopeful I can find an answer, or that someone can. It's a very good question. - Antlion
Yes, you are right about the race condition in my code, thanks. I rewrote my code to be as simple as possible, but the problem is still there. - Tref
