Clojure core.async, CPU hangs after timeout. Anywa

Posted 2019-04-25 11:30

Based on the core.async walkthrough example, I wrote the similar code below to handle some CPU-intensive jobs using multiple channels, with a timeout of 10 seconds. However, after the main thread returns, CPU usage remains around 700% (on an 8-CPU machine). I have to manually run nrepl-close in Emacs to shut down the Java process.

Is there a proper way to kill the threads produced by the (go ...) macro? I tried close! on each chan, but it doesn't work. I want the Java process's CPU usage to drop back to 0 after the main thread returns.

(defn RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB [] (do...   (str ...)))


(let [n 1000
      cs (repeatedly n chan)]
  ;; start one go block per job; each puts its result on its own channel
  (doseq [c cs]
    (go
      (>! c (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB))))

  (dotimes [i n]
    ;; wait up to 10 seconds for each job
    (let [[result source] (alts!! (conj cs (timeout 10000)))]
      (if (list-contains? cs source) ; the returned chan belongs to cs
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT"))))

  (doseq [c cs]
    (close! c)) ; does not "kill" the go-block threads

  (prn "JOBS ARE DONE"))

;; By the way, list-contains? checks whether an element is in a list
;;http://stackoverflow.com/questions/3249334/test-whether-a-list-contains-a-specific-value-in-clojure
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))
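
The same membership test can be written more compactly with clojure.core/some. This is only a sketch: in-coll? is a hypothetical name, and it uses an explicit equality predicate rather than a set so that nil and false elements are still found.

```clojure
(defn in-coll?
  "Hypothetical replacement for list-contains?: returns true when
  value is equal to some element of coll, false otherwise."
  [coll value]
  (boolean (some #(= value %) coll)))
```

It takes the same arguments in the same order, so (in-coll? cs source) could stand in for (list-contains? cs source) in the code above.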

3 answers
【Aperson】
#2 · 2019-04-25 11:56

In the REPL there seems to be no clean way yet.

I first tried a very dirty approach, using the deprecated method Thread.stop:

(doseq [i @threadpool]
  (.stop i))

It seemed to work, as CPU usage dropped once the main thread returned to the REPL, but when I ran the program again in the REPL, it just hung at the go block!

Then I googled around and found this blog, which says:

One final thing to note: we don't explicitly do any work to shutdown the go routines. Go routines will automatically stop operation when the main function exits. Thus, go routines are like daemon threads in the JVM (well, except for the "thread" part ...)

So I tried again by building my project into an uberjar and running it from a command console, and CPU usage dropped immediately once the blinking cursor returned to the console!
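
The daemon-thread claim in the quote can be checked from inside a go block. The following is a minimal sketch, assuming core.async is on the classpath; go-thread-info is a hypothetical helper name, and the reported thread name may differ between core.async versions.

```clojure
(require '[clojure.core.async :refer [go <!!]])

(defn go-thread-info
  "Runs a trivial go block and reports which thread executed it."
  []
  (<!! (go
         (let [t (Thread/currentThread)]
           {:name    (.getName t)
            :daemon? (.isDaemon t)}))))
```

If :daemon? comes back true, the dispatch threads cannot keep the JVM alive on their own. That would be consistent with the uberjar exiting cleanly, while a REPL session keeps the JVM (and any still-running jobs) alive.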

对你真心纯属浪费
#3 · 2019-04-25 12:07
(shutdown-agents)

Implementation-specific, JVM: both agents and channels use a global thread pool, and the termination function for agents iterates and closes all open threads in the VM. Empty the channels first: this action is immediate and non-reversible (especially if you are in a REPL).

Fickle 薄情
#4 · 2019-04-25 12:11

Based on an answer to another related question, How to control number of threads in (go...), I found a better way to properly kill all the threads started by (go ...) blocks:

First, alter the executor var and supply a custom thread pool:

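;; conc and tp are assumed to alias these core.async implementation namespaces
(require '[clojure.core.async.impl.concurrent :as conc]
         '[clojure.core.async.impl.exec.threadpool :as tp])

;; def, not defonce, so that the executor can be re-defined
;; The number of threads is fixed at 4
(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   4
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))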

Then call the .shutdownNow and .awaitTermination methods at the end of the (go ...) block:

(.shutdownNow my-executor)
(while (not (.awaitTermination my-executor 10 java.util.concurrent.TimeUnit/SECONDS))
  (prn "...waiting 10 secs for executor pool to finish"))
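
One caveat worth sketching: .shutdownNow only interrupts the worker threads, so a CPU-bound job ends early only if it checks the interrupt flag itself. The example below uses a plain java.util.concurrent pool without core.async. busy-job and stop-pool-demo are hypothetical names, not part of the code above.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

(defn busy-job
  "Hypothetical CPU-bound loop. It polls the interrupt flag on every
  iteration so that an interrupt can end it."
  []
  (loop [n 0]
    (if (.isInterrupted (Thread/currentThread))
      n
      (recur (unchecked-inc n)))))

(defn stop-pool-demo
  "Submits busy-job to a small pool, interrupts it with .shutdownNow,
  and returns true if the pool terminated within 5 seconds."
  []
  (let [^ExecutorService pool (Executors/newFixedThreadPool 2)]
    (.submit pool ^Callable busy-job)
    (Thread/sleep 100)
    (.shutdownNow pool)
    (.awaitTermination pool 5 TimeUnit/SECONDS)))
```

A job that never looks at the interrupt flag (and never blocks on an interruptible call) would keep running after .shutdownNow, and the .awaitTermination loop above would keep printing its waiting message.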

[UPDATE] The executor-shutdown method above does not seem clean enough. The final solution for my case is to send a function that controls its own timeout into the go block, using the thunk-timeout function. Credit goes to this post. Example below:

(defn toSendToGo [args timeoutUnits]
  (let [result (atom nil)
        timeout? (atom false)]
    (try
      (thunk-timeout
       (fn [] (reset! result (myFunction args)))
       timeoutUnits)
      (catch java.util.concurrent.TimeoutException e
        (prn "!Time out after " timeoutUnits " seconds!!")
        (reset! timeout? true)))

    (if @timeout? (do sth))
    @result))


(let [c (chan)]
  (go (>! c (toSendToGo args timeoutUnits))))
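
thunk-timeout itself is not defined in this thread. As a rough illustration of the same idea, here is a sketch of a similar helper built only on clojure.core's future, the timeout arity of deref, and future-cancel. run-with-timeout is a hypothetical name and is not the thunk-timeout implementation.

```clojure
(defn run-with-timeout
  "Hypothetical helper: runs thunk on a future and returns its value,
  or timeout-val if no value arrives within timeout-ms milliseconds.
  On timeout the future is cancelled, which interrupts its thread."
  [thunk timeout-ms timeout-val]
  (let [f (future (thunk))
        v (deref f timeout-ms ::timeout)]
    (if (= v ::timeout)
      (do (future-cancel f)
          timeout-val)
      v)))
```

For example, (run-with-timeout #(+ 1 2) 1000 :timed-out) returns 3, while a thunk that sleeps for several seconds with a 50 ms limit yields :timed-out. As with .shutdownNow, cancellation is only an interrupt, so a CPU-bound thunk still has to check the interrupt flag to actually stop.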