How to know with GPars that all threads have finished

Posted 2019-08-11 08:30

Question:

In the case of a thread throwing an exception, how can I wait until all threads that did not throw an exception have finished (so the user doesn't launch again until everything has stopped)?

I use GPars in several different ways, so I need a strategy for each (parallel collections, async closures, and fork/join). The exceptions are not getting buried; they are nicely handled via promises, getChildrenResults, etc., so that's not an issue (thanks to Vaclav Pech's answers). I just need to make sure that the main thread waits until anything that was still running either completes or is otherwise stopped.

For instance, when using parallel collections, some threads continue to run after the exception, while others never launch. So it's not easy to tell how many are still out there to wait on, or how to get hold of them.

My guess is that there may be a way to work with the thread pool (GParsPool in this case). Any suggestions?

Thanks!

Answer 1:

I believe I have a solution for the problem. I implemented it in the application after thorough testing, and it works.

The withPool closure passes in the created pool (a jsr166y.ForkJoinPool) as the first argument. I can grab that and store it off in a variable (currentPool), to be used later by the main thread, like so:

    GParsPool.withPool { pool ->
        currentPool = pool

When an exception is thrown and propagates back up to the main thread for handling, I can make the main thread wait until everything is finished, something like this:

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

Checking isQuiescent() seems to be a safe way to make sure there's no more work to be done.
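
The polling loop above can be wrapped with a timeout so the main thread cannot wait forever. This is only a minimal sketch: it uses the JDK's java.util.concurrent.ForkJoinPool as a stand-in for the jsr166y pool that withPool passes in, so it runs without GPars, and the helper name waitUntilQuiescent and its parameters are made up for illustration.

```groovy
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of GPars): poll isQuiescent() until the pool
// is idle or the timeout expires. Returns true if the pool became idle in time.
boolean waitUntilQuiescent(ForkJoinPool pool, long timeoutMillis, long pollMillis = 50) {
    long deadline = System.currentTimeMillis() + timeoutMillis
    while (!pool.isQuiescent()) {
        if (System.currentTimeMillis() >= deadline) {
            return false
        }
        Thread.sleep(pollMillis)
    }
    return true
}

// Stand-in pool; in the real code this would be the pool captured from withPool.
def pool = new ForkJoinPool(4)
def finished = new AtomicInteger(0)

// Submit a few short tasks that simulate work still running after a failure.
8.times {
    pool.execute({
        Thread.sleep(100)
        finished.incrementAndGet()
    } as Runnable)
}

boolean idle = waitUntilQuiescent(pool, 5000)
println "pool idle: ${idle}, tasks finished: ${finished.get()}"
pool.shutdown()
```

The JDK pool also has awaitQuiescence(timeout, unit), which does the same job without a hand-written loop. I have not checked whether the jsr166y version bundled with GPars has it, so the polling form is the safer assumption.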

Note that during testing, I also found that an exception didn't terminate execution of the loop as I had originally thought. If I had a list of 500 and did an eachParallel, all 500 ran regardless of whether the first one had an error. So I had to terminate the loop by calling currentPool.shutdownNow() inside the parallel loop's exception handler. See also: GPars - proper way to terminate a parallel collection early

Here is a complete simplified representation of the actual solution:

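import groovyx.gpars.GParsPool
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

void example() {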
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    // Rethrow the stored exception itself, not the AtomicReference wrapper
                    throw realException.get()
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

Going back to my earlier example, if I threw an exception on the first iteration on a 4-core machine, there were about 5 tasks queued up. The shutdownNow() would cut things off after around 20 or so tasks had gotten through, so having the 'quit early' check near the top helped those 20 or so quit as soon as possible.

Just posting it here in case it helps somebody else, in return for all the help I've gotten here. Thanks!



Answer 2:

I believe you will need to catch the exception and then return something other than the expected result (such as a String or null if you are expecting a number, for instance), e.g.:

@Grab('org.codehaus.gpars:gpars:0.12')
import static groovyx.gpars.GParsPool.*

def results = withPool {
  [1,2,3].collectParallel {
    try {
      if( it % 2 == 0 ) {
        throw new RuntimeException( '2 fails' )
      }
      else {
        Thread.sleep( 2000 )
        it
      }
    }
    catch( e ) { e.class.name }
  }
}
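
Once every item returns either a value or a marker for its failure, the caller can separate the two after all items have been processed. Here is a rough sketch of that step. It returns the exception object itself rather than its class name, the closure name attempt is made up for illustration, and a plain sequential collect is used only so the snippet runs without GPars. The same closure could presumably be passed to collectParallel inside withPool.

```groovy
// Hypothetical closure: do the work for one item and turn a failure into a value
// instead of letting it escape and abort the whole collection.
def attempt = { int n ->
    try {
        if (n % 2 == 0) {
            throw new RuntimeException("${n} fails".toString())
        }
        n * 10
    } catch (Exception e) {
        e   // return the exception as this item's outcome
    }
}

// Sequential collect as a stand-in for collectParallel (assumption: same closure works there).
def outcomes = [1, 2, 3].collect(attempt)

// Separate failures from real results once everything has finished.
def failures = outcomes.findAll { it instanceof Throwable }
def successes = outcomes.findAll { !(it instanceof Throwable) }

println "successes: ${successes}"
failures.each { println "failed: ${it.message}" }
```

Because no exception leaves the closure, every item runs to completion before the results come back, so there is nothing left running to wait for.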


Tags: groovy gpars