How do clojure core.async channels get cleaned up?

2020-06-16 04:00发布

问题:

I'm looking at Clojure core.async for the first time, and was going through this excellent presentation by Rich Hickey: http://www.infoq.com/presentations/clojure-core-async

I had a question about the example he shows at the end of his presentation:

According to Rich, this example basically tries to get a web, video, and image result for a specific query. It tries two different sources in parallel for each of those results, and just pulls out the fastest result for each. And the entire operation can take no more than 80ms, so if we can't get e.g. an image result in 80ms, we'll just give up. The 'fastest' function creates and returns a new channel, and starts two go processes racing to retrieve a result and put it on the channel. Then we just take the first result off of the 'fastest' channel and slap it onto the c channel.

My question: what happens to these three temporary, unnamed 'fastest' channels after we take their first result? Presumably there is still a go process which is parked trying to put the second result onto the channel, but no one is listening so it never actually completes. And since the channel is never bound to anything, it doesn't seem like we have any way of doing anything with it ever again. Will the go process & channel "realize" that no one cares about their results any more and clean themselves up? Or did we essentially just "leak" three channels / go processes in this code?

回答1:

There is no leak.

Parked gos are attached to channels on which they attempted to perform an operation and have no independent existence beyond that. If other code loses interest in the channels a certain go is parked on (NB. a go can simultaneously become a putter/taker on many channels if it parks on alt! / alts!), then eventually it'll be GC'd along with those channels.

The only caveat is that in order to be GC'd, gos actually have to park first. So any go that keeps doing stuff in a loop without ever parking (<! / >! / alt! / alts!) will in fact live forever. It's hard to write this sort of code by accident, though.



回答2:

Caveats and exceptions aside, you can test garbage collection on the JVM at the REPL.

eg:

(require '[clojure.core.async :as async])
=> nil

(def c (async/chan))
=> #'user/c
(def d (async/go-loop [] 
         (when-let [v (async/<! c)] 
           (println v) 
           (recur))))
=> #'user/d

(async/>!! c :hi)
=> true
:hi        ; core.async go block is working

(import java.lang.ref.WeakReference)
=> java.lang.ref.WeakReference    ; hold a reference without preventing garbage collection
(def e (WeakReference. c))
=> #'user/e
(def f (WeakReference. d))
=> #'user/f

(.get e)
=> #object[...]
(.get f)
=> #object[...]

(def c nil)
=> #'user/c
(def d nil)
=> #'user/d
(println "We need to clear *1, *2 and *3 in the REPL.")
We need to clear *1, *2 and *3 in the REPL.
=> nil
(println *1 *2 *3)
nil #'user/d #'user/c
=> nil
(System/gc)
=> nil
(.get e)
=> nil
(.get f)
=> nil

What just happened? I setup a go block and checked it was working. Then used a WeakReference to observe the communication channel (c) and the go block return channel (d). Then I removed all references to c and d (including *1, *2 and *3 created by my REPL), requested garbage collection, (and got lucky, the System.gc Javadoc does not make strong guarantees) and then observed that my weak references had been cleared.

In this case at least, once references to the channels involved had been removed, the channels were garbage collected (regardless of my failure to close them!)



回答3:

Assumedly a channel produced by fastest only returns the result of the fastest query method and then closes.

If a second result was produced, your assumption could hold that the fastest processeses are leaked. Their results are never consumed. If they relied on all their results to be consumed to terminate, they wouldn't terminate.

Notice that this could also happen if the channel t is selected in the alt! clause.

The usualy way to fix this would be to close the channel c in the last go block with close!. Puts made to a closed channel will then be dropped then and the producers can terminate.

The problem could also be solved in the implementation of fastest. The process created in fastest could itself make the put via alts! and timeout and terminate if the produced values are not consumed within a certain amount of time.

I guess Rich did not address the problem in the slide in favor of a less lengthy example.