Clojure core.async, CPU hangs after timeout. Any way to properly kill the macro thread produced by a (go...) block?

Based on the core.async walkthrough example, I created the similar code below to handle some CPU-intensive jobs using multiple channels with a timeout of 10 seconds. However, after the main thread returns, CPU usage remains around 700% (8-CPU machine). I have to manually run nrepl-close in Emacs to shut down the Java process.

Is there any proper way to kill the macro thread produced by a (go...) block? I tried calling close! on each chan, but it doesn't work. I want to make sure the Java process's CPU usage goes back to 0 after the main thread returns.

(require '[clojure.core.async :refer [chan go >! alts!! timeout close!]])

;; Btw, list-contains? is used to judge whether an element is in a list.
;; It has to be defined before the code that calls it.
;; https://mcmap.net/q/127993/-test-whether-a-list-contains-a-specific-value-in-clojure
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))

;; Placeholder for the real CPU-intensive job (body elided)
(defn RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB []
  (do ... (str ...)))

(let [n 1000
      cs (repeatedly n chan)]
  ;; one go block per channel; each puts its job's result on its channel
  (doseq [c cs]
    (go
      (>! c (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB))))

  (dotimes [i n]
    (let [[result source] (alts!! (conj cs (timeout 10000)))] ;; wait up to 10 seconds for each job
      (if (list-contains? cs source) ;; did the value come from one of the job channels?
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT"))))

  (doseq [c cs]
    (close! c)) ;; not useful for "killing" the macro thread

  (prn "JOBS ARE DONE"))
Umberto answered 4/9, 2013 at 11:3 Comment(5)
Since go blocks run in a fixed thread pool, don't use them for CPU-intensive tasks. However, you could check whether the channel is closed before calling the CPU-intensive function (or even while computing). It's not that different from checking Thread/isInterrupted in regular thread-based Clojure/Java. – Dulla
You can terminate your go block in various ways. Either use an atom, like (while @running ...), or have the block take from another channel on each iteration, which you keep feeding with data for as long as you want the go block to run. – Hardly
By the way, I'd use a set for the channels. It will let you get rid of the list-contains? function and lets you simply write (if (cs source) ...). – Hardly
@cgrand, so far I've found it works fine for CPU-intensive tasks; maybe you mean it's not suitable for blocking I/O tasks? There are articles saying blocking I/O in go blocks should be avoided: martintrojer.github.io/clojure/2013/07/07/… – Umberto
@KevinZhu by "intensive" I mean "really long". If you launch more go blocks than the thread pool has threads, each trying to compute digits of pi (a silly example), then all subsequent go blocks will have to wait. It's the same problem as with blocking I/O, only rarer. – Dulla
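Dulla's first suggestion, checking whether the result channel has been closed before (and while) doing the expensive work, might look roughly like the sketch below. It runs the computation with `async/thread` rather than `go`, in line with the advice to keep long computations off the go pool. `sum-squares-cancellable` and `start-job` are hypothetical names, and `clojure.core.async.impl.protocols/closed?` is an internal core.async protocol function rather than public API, so it may change between versions.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.protocols :as impl])

;; Hypothetical CPU-bound job: sums i*i for i below n, but checks on every
;; iteration whether the consumer has closed `out` and gives up if so.
;; impl/closed? is internal to core.async, not part of its public API.
(defn sum-squares-cancellable [out n]
  (loop [i 0 acc 0]
    (cond
      (impl/closed? out) nil              ; consumer closed the channel: stop
      (= i n) acc                         ; finished normally
      :else (recur (inc i) (+ acc (* i i))))))

;; Run the job on a dedicated thread (not the go pool) and return a channel
;; that receives the result. Closing that channel cancels the job.
(defn start-job [n]
  (let [out (async/chan 1)]
    (async/thread
      (when-some [r (sum-squares-cancellable out n)]
        (async/>!! out r)))
    out))
```

Closing the channel returned by `start-job` (for example after `alts!!` reports a timeout) makes the loop return `nil` at its next check, so the thread exits instead of spinning.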
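Hardly's two ways of terminating a go block can be sketched as follows, assuming a simple counter stands in for the real work. `start-worker-with-flag` and `start-worker-with-control` are hypothetical names; both return the go block's own channel, which closes once the loop ends.

```clojure
(require '[clojure.core.async :as async])

;; Approach 1: loop while a shared atom is true; reset! it to false to stop.
;; The short timeout parks the go block between iterations so it does not
;; keep a pool thread busy.
(defn start-worker-with-flag [running counter]
  (async/go
    (while @running
      (swap! counter inc)                 ; stand-in for one unit of work
      (async/<! (async/timeout 1)))))

;; Approach 2: take a token from `control` before every iteration.
;; Closing `control` makes <! return nil, which ends the loop.
(defn start-worker-with-control [control counter]
  (async/go-loop []
    (when (async/<! control)
      (swap! counter inc)
      (recur))))
```

Either way the go block has to cooperate: the flag or the control channel is only consulted between units of work.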
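A sketch of the set-based variant Hardly suggests, with a hypothetical `collect-results` helper. It hands `alts!!` a vector (the set's channels plus a fresh timeout) and keeps the set only for the membership test, where the set itself acts as the lookup function.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: start n jobs, then collect n outcomes, waiting at most
;; timeout-ms for each one. job-fn must return a non-nil value, because
;; channels cannot carry nil.
(defn collect-results [n job-fn timeout-ms]
  (let [cs (set (repeatedly n async/chan))]
    (doseq [c cs]
      (async/go (async/>! c (job-fn))))
    (vec
      (for [_ (range n)]
        (let [[result source] (async/alts!! (conj (vec cs) (async/timeout timeout-ms)))]
          (if (cs source)                 ; a set is a function of its elements
            [:ok result]
            [:timeout]))))))
```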

In the REPL there seems to be no clean way yet.

I first tried a very dirty approach, the deprecated method Thread.stop:

(doseq [i @threadpool]
  (.stop i)) ;; Thread.stop is deprecated; on JDK 20+ it throws UnsupportedOperationException

It seemed to work, as CPU usage dropped once the main thread returned to the REPL, but if I ran the program again in the REPL, it would just hang at the go block part!!

Then I googled around and found this blog post, which says:

One final thing to note: we don't explicitly do any work to shutdown the go routines. Go routines will automatically stop operation when the main function exits. Thus, go routines are like daemon threads in the JVM (well, except for the "thread" part ...)

So I tried again by building my project into an uberjar and running it from a command console, and it turned out that CPU usage dropped immediately when the blinking cursor returned to the console!
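One way to see why the uberjar behaves differently: the threads that run go blocks (and `async/thread` bodies) are daemon threads, so they do not keep the JVM alive once `-main` returns, whereas in a REPL the JVM keeps running and any go block still computing keeps using CPU. The sketch below, with hypothetical function names, simply asks each kind of thread whether it is a daemon.

```clojure
(require '[clojure.core.async :as async])

;; Run a tiny body on core.async's go dispatch pool and report whether the
;; executing thread is a daemon thread.
(defn go-thread-daemon? []
  (async/<!! (async/go (.isDaemon (Thread/currentThread)))))

;; The same check for the threads used by async/thread.
(defn thread-macro-daemon? []
  (async/<!! (async/thread (.isDaemon (Thread/currentThread)))))
```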

Umberto answered 10/9, 2013 at 2:1 Comment(0)

Based on an answer to another related question, "How to control number of threads in (go...)", I've found a better way to properly kill all the threads started by (go...) blocks:

First, alter the executor var and supply a custom thread pool:

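(require 'clojure.core.async.impl.dispatch
         '[clojure.core.async.impl.concurrent :as conc]
         '[clojure.core.async.impl.exec.threadpool :as tp])

;; def, not defonce, so that the executor can be re-defined
;; The number of threads is fixed at 4; `true` makes them daemon threads
(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   4
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

;; Note: these are core.async internals and may differ between versions
(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))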

Then call the .shutdownNow and .awaitTermination methods at the end, after the (go...) blocks have run:

(.shutdownNow my-executor)
(while (not (.awaitTermination my-executor 10 java.util.concurrent.TimeUnit/SECONDS))
  (prn "...waiting 10 secs for executor pool to finish"))
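A likely reason this approach feels unreliable: `shutdownNow` does not kill threads, it only interrupts them, so a CPU-bound task stops only if it checks its interrupt status. The sketch below uses a plain `java.util.concurrent` fixed pool (not core.async's executor) and a hypothetical busy loop that polls the interrupt flag; with that check in place, `awaitTermination` returns true. A loop without the check would keep spinning after `shutdownNow`.

```clojure
(import '(java.util.concurrent Executors TimeUnit))

;; Hypothetical CPU-bound task that polls the interrupt flag on every
;; iteration; shutdownNow sets that flag on each pool thread.
(defn busy-until-interrupted []
  (loop [n 0]
    (if (.isInterrupted (Thread/currentThread))
      n
      (recur (inc n)))))

;; Start two busy tasks, let them spin briefly, then shut the pool down.
;; Returns the result of awaitTermination: true if all threads finished.
(defn run-and-shutdown []
  (let [pool (Executors/newFixedThreadPool 2)]
    (dotimes [_ 2]
      (.execute pool ^Runnable busy-until-interrupted))
    (Thread/sleep 100)
    (.shutdownNow pool)
    (.awaitTermination pool 5 TimeUnit/SECONDS)))
```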

[UPDATE] The executor-shutdown method above does not seem clean enough. The final solution for my case is to send into the go block a function that controls its own timeout, using the thunk-timeout function. Credit goes to this post. Example below:

(defn toSendToGo [args timeoutUnits]
  (let [result (atom nil)
        timeout? (atom false)]
    (try
      (thunk-timeout
        (fn [] (reset! result (myFunction args)))
        timeoutUnits)
      (catch java.util.concurrent.TimeoutException e
        (prn "!Time out after " timeoutUnits " seconds!!")
        (reset! timeout? true)))

    (if @timeout? (do sth)) ;; placeholder: handle the timeout case here
    @result))


(let [c (chan)]
  (go (>! c (toSendToGo args timeoutUnits))))
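If `thunk-timeout` is not at hand, a similar self-timing-out wrapper can be sketched with `future`, `deref` with a timeout, and `future-cancel` from clojure.core. This is a swapped-in technique, not the `thunk-timeout` used above, and `run-with-timeout` and `count-up-cancellable` are hypothetical. `future-cancel` interrupts the worker thread, so the job only actually stops if it checks `Thread/interrupted`.

```clojure
;; Hypothetical wrapper: run f on another thread and wait at most timeout-ms.
;; On timeout, cancel the future (which interrupts its thread) and return
;; :timeout; otherwise return f's result.
(defn run-with-timeout [f timeout-ms]
  (let [fut (future (f))
        result (deref fut timeout-ms ::pending)]
    (if (= result ::pending)
      (do (future-cancel fut)
          :timeout)
      result)))

;; Hypothetical CPU-bound job that stops as soon as its thread is interrupted.
(defn count-up-cancellable [limit]
  (loop [i 0]
    (cond
      (Thread/interrupted) :interrupted
      (= i limit) i
      :else (recur (inc i)))))
```

The result of `run-with-timeout` could then be put on a channel from a go block, in the same way as `toSendToGo`.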
Umberto answered 25/9, 2013 at 4:42 Comment(0)
(shutdown-agents)

Implementation-specific (JVM): both agents and channels use a global thread pool, and the termination function for agents iterates over and closes all open threads in the VM. Empty the channels first: this action is immediate and irreversible (especially if you are in a REPL).
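The irreversibility is easy to observe for Clojure's own executors: `future` runs on an executor that `shutdown-agents` shuts down, so afterwards new futures are rejected. A minimal sketch with a hypothetical function name; it is best not run in a REPL session you want to keep using.

```clojure
;; Shut down the agent executors, then try to start a future.
;; Returns :rejected if the executor refuses the new task.
(defn future-after-shutdown []
  (shutdown-agents)
  (try
    @(future :ran)
    (catch java.util.concurrent.RejectedExecutionException _
      :rejected)))
```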

Kenyatta answered 7/4, 2016 at 19:0 Comment(0)
