Kotlin coroutine future await with timeout (no can

2019-08-08 16:01发布

Given we have a CompletableFuture f, in kotlin suspendable scope we can call f.await() and we will suspend until its done.

I'm having trouble implementing a similar function with signature f.await(t) which must suspend for maximum t milliseconds or return sooner if future did complete within that duration (whichever happens first).

Here is what i tried.

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

I also need a similar function for a job. But maybe solution for this will also help me with that...

Output for this is

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

2条回答
一纸荒年 Trace。
2楼-- · 2019-08-08 16:38

Here is what i came up with, i believe this is not a good solution since, i'm most likely creating a lot of garbage for rather primitive task.


suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()

   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }

   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}


fun main() = runBlocking {
   val future = CompletableFuture<String>()

   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }

   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)

      if (isDone) {
         println("future is done")
         break
      } else {

         println("future not done")
      }
   }

   Unit
}

this will give output of

waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done

which is what we wanted...

查看更多
三岁会撩人
3楼-- · 2019-08-08 16:39

I've written some test code:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}

suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }

    return result
}

@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()

    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }

    return completableFuture
}

After we run this code we will get an output:

timeout exception
result=null
after sleep

We see that our extension function await returns null because we set timeout to 2000 milliseconds but CompletableFuture completes after 3000 milliseconds. In this case CompletableFuture is cancelled(its isCancelled property returns true), but the Thread which we ran in calculateAsync function continues executing (we see it in the logs after sleep).

If we set timeout duration to 4000 milliseconds future.await(4000) in the main function, we will see next output:

after sleep
result=Completed

Now we have some result because CompletableFuture is executed faster than 4000 millisecond.

查看更多
登录 后发表回答