由于我们有一个CompletableFuture f
,在科特林悬浮范围内,我们可以调用f.await()
我们将暂停,直到其完成。
我在实施与签名类似功能的麻烦f.await(t)
必须暂停最大t
毫秒或更早返回如果将来这样做时间内完成(无论哪个首先发生)。
下面是我试过了。
/**
* Suspend current method until future is done or specified duration expires,
* whichever happens first without cancelling the future.
* Returns true if its done, false otherwise.
*/
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
val future = this
try {
withTimeout(duration) {
withContext(NonCancellable) { // this does not help either
future.await() // i do not expect the future itself to be cancelled
}
}
} catch (t: TimeoutCancellationException) {
// we expected this
} catch (e: Throwable) {
e.printStackTrace()
}
return future.isDone
}
fun main(args: Array<String>) = runBlocking<Unit> {
val future = GlobalScope.future {
try {
repeat(5) {
println("computing")
delay(500)
}
println("complete")
} finally {
withContext(NonCancellable) {
println("cancelling")
delay(500)
println("cancelled")
}
}
}
for (i in 0..10) {
if (future.await(2000)) {
println("checking : done")
} else {
println("checking : not done")
}
}
}
我也需要一份工作类似的功能。 但也许这个解决方案也将帮助我与...
该输出是
computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
我已经写了一些测试代码:
fun main(args: Array<String>) = runBlocking {
val future = calculateAsync()
val result = future.await(2000)
println("result=$result")
}
suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
val future = this
var result: T? = null
try {
withTimeout(duration) {
result = future.await()
}
} catch (t: TimeoutCancellationException) {
println("timeout exception")
} catch (e: Throwable) {
e.printStackTrace()
}
return result
}
@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
val completableFuture = CompletableFuture<String>()
Executors.newCachedThreadPool().submit {
Thread.sleep(3000)
println("after sleep")
completableFuture.complete("Completed")
}
return completableFuture
}
当我们运行这段代码,我们将得到一个输出:
timeout exception
result=null
after sleep
我们看到,我们的扩展功能await
返回null
,因为我们设置超时时间为2000毫秒,但CompletableFuture
后3000毫秒完成。 在这种情况下CompletableFuture
被取消(其isCancelled
属性返回true
),但是这是我们在跑线程calculateAsync
功能继续执行(我们看到它在日志中after sleep
)。
如果我们设置超时时间4000毫秒future.await(4000)
的main
功能,我们将看下输出:
after sleep
result=Completed
现在我们有一些结果,因为CompletableFuture
执行4000毫秒快。
以下是我想出了,我相信这不是因为我最有可能创造相当原始的任务很多垃圾的好办法。
suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
val timeout = CompletableFuture<Unit>()
GlobalScope.launch {
delay(duration)
timeout.complete(Unit)
}
val anyOfTwo = CompletableFuture.anyOf(this, timeout)
anyOfTwo.await()
return this.isDone
}
fun main() = runBlocking {
val future = CompletableFuture<String>()
GlobalScope.launch {
delay(2000)
println("setting the result (future now ${future.isDone})")
future.complete("something")
}
while (future.isNotDone()) {
println("waiting for the future to complete for the next 500ms")
val isDone = future.await(500)
if (isDone) {
println("future is done")
break
} else {
println("future not done")
}
}
Unit
}
这会给输出
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done
这就是我们想要的...