GPars: report status on a large number of async functions

Posted 2019-07-21 10:58

Question:

I have a parser, and after gathering the data for a row, I want to fire an async function and let it process the row while the main thread continues on and gets the next row.

I've seen the post "How do I execute two tasks simultaneously and wait for the results in Groovy?", but I'm not sure it is the best solution for my situation.

What I want to do is, after all the rows are read, wait for all the async functions to finish before I go on. One concern with using a collection of Promises is that the list could be large (100,000+).

Also, I want to report status as we go. Finally, I'm not sure I want to wait with a timeout (as with get()), because the file could be huge. However, I do want to allow the user to kill the process for various reasons.

So what I've done for now is record the number of rows parsed (as they occur via rowsRead), then use a callback from the Promise to record another row being finished processing, like this:

def promise = processRow(row)
promise.whenBound {
    rowsProcessed.incrementAndGet()
}

Where rowsProcessed is an AtomicInteger.

Then in the code invoked at the end of the sheet, after all parsing is done and I'm waiting for the processing to finish, I'm doing this:

boolean test = true
while (test) {
    Thread.sleep(1000)  // No need to pound the CPU with this check
    println "read: ${sheet.rowsRead}, processed: ${sheet.rowsProcessed.get()}"
    if (sheet.rowsProcessed.get() == sheet.rowsRead) {
        test = false
    }
}

The nice thing is, I don't have an explosion of Promise objects here, just a simple count to check. But I'm not sure sleeping every so often is as efficient as calling get() on each Promise object.

So, my questions are:

  1. If I used the collection of Promises instead, would a get() react and return if the thread executing the while loop above was interrupted with Thread.interrupt()?
  2. Would using the collection of Promises and calling get() on each be more efficient than trying to sleep and check every so often?
  3. Is there another, better approach that I haven't considered?

Thanks!

Answer 1:

  1. A call to allPromises*.get() will throw an InterruptedException if the waiting (main) thread is interrupted.
  2. Yes. The promises have been created anyway, so grouping them in a list should not impose additional memory requirements, in my opinion.
  3. The suggested solutions with a CountDownLatch or a Phaser are IMO much more suitable than busy waiting.
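
The Phaser option in point 3 could look roughly like the sketch below. Unlike a CountDownLatch, a Phaser does not need the row count up front, since each row registers itself as it is dispatched. This is only an illustration under assumptions: a plain JDK thread pool stands in for the GPars tasks, and `pool`, `rows`, `rowsRead` and `rowsProcessed` are hypothetical names, not the asker's actual code. Note that a Phaser allows at most 65535 registered parties at once, so with 100,000+ rows the number of in-flight rows would have to be bounded (or the phasers tiered).

```groovy
import java.util.concurrent.Executors
import java.util.concurrent.Phaser
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins: a fixed pool replaces GPars tasks, `rows` replaces the parser
def pool = Executors.newFixedThreadPool(4)
def rows = (1..1000)
def rowsRead = 0
def rowsProcessed = new AtomicInteger()

// Start with one registered party: the main (parsing) thread
def phaser = new Phaser(1)

rows.each { row ->
    rowsRead++
    phaser.register()                        // one party per in-flight row
    pool.execute {
        try {
            rowsProcessed.incrementAndGet()  // placeholder for the real row processing
        } finally {
            phaser.arriveAndDeregister()     // this row is finished
        }
    }
}

// Parsing is done: the main party arrives, then waits for the phase to advance,
// which happens once every registered row has arrived
int phase = phaser.arrive()
while (true) {
    try {
        phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS)
        break
    } catch (TimeoutException ignored) {
        // Timed out after one second: report progress and keep waiting
        println "read: ${rowsRead}, processed: ${rowsProcessed.get()}"
    }
}
pool.shutdown()
println "done: read ${rowsRead}, processed ${rowsProcessed.get()}"
```

Because awaitAdvanceInterruptibly also throws InterruptedException, interrupting the waiting thread ends the wait early, which covers the "let the user kill the process" requirement.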


Answer 2:

An alternative to an AtomicInteger is to use a CountDownLatch. It avoids both the sleep and the large collection of Promise objects. You could use it like this:

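import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// The count is fixed at construction, so the number of rows must be known here
latch = new CountDownLatch(sheet.rowsRead)
...
def promise = processRow(row)
promise.whenBound {
    latch.countDown()   // one more row finished
}
...
// await() returns false on timeout, so progress is printed about once per second
while (!latch.await(1, TimeUnit.SECONDS)) {
    println "read: ${sheet.rowsRead}, processed: ${sheet.rowsRead - latch.count}"
}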
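
On the question of letting the user kill the process: the timed await() throws InterruptedException when the waiting thread is interrupted, so the loop above can be cancelled without waiting for the latch to reach zero. A minimal sketch, assuming a hypothetical latch that never counts down (standing in for unfinished row processing) and a hypothetical `cancelled` flag:

```groovy
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical latch that is never counted down, simulating rows still in progress
def latch = new CountDownLatch(1)
def cancelled = false

def waiter = Thread.start {
    try {
        while (!latch.await(1, TimeUnit.SECONDS)) {
            println "still waiting, remaining: ${latch.count}"
        }
    } catch (InterruptedException e) {
        cancelled = true    // the wait was interrupted; clean up here
    }
}

Thread.sleep(200)           // simulate the user deciding to cancel
waiter.interrupt()
waiter.join()
println "cancelled: ${cancelled}"
```

How the interrupt reaches the waiting thread (a shutdown hook, a UI button, and so on) is left open here, since the original post does not say.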


Tags: groovy gpars