Kotlin协程未来等待超时(无取消)

Kotlin协程未来等待超时(无取消),kotlin,kotlinx.coroutines,Kotlin,Kotlinx.coroutines,鉴于我们有一个CompletableFuture f,在kotlin suspendable作用域中,我们可以调用f.await(),我们将暂停,直到它完成 我在实现签名为f.await(t)的类似函数时遇到问题,该函数必须暂停最长t毫秒,如果future在该持续时间内完成(以先发生的为准),则必须提前返回 这是我试过的 /** * Suspend current method until future is done or specified duration expires, * whi

鉴于我们有一个
CompletableFuture f
,在kotlin suspendable作用域中,我们可以调用
f.await()
,我们将暂停,直到它完成

我在实现签名为f.await(t)的类似函数时遇到问题,该函数必须暂停最长
t
毫秒,如果future在该持续时间内完成(以先发生的为准),则必须提前返回

这是我试过的

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

我已经编写了一些测试代码:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}

suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }

    return result
}

@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()

    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }

    return completableFuture
}
我们看到扩展函数
await
返回
null
,因为我们将超时设置为2000毫秒,但
CompletableFuture
在3000毫秒后完成。在这种情况下,
CompletableFuture
被取消(其
isCancelled
属性返回
true
),但我们在
calculateAsync
函数中运行的线程继续执行(我们在睡眠后的日志
中看到它)

如果我们在
main
函数中将超时持续时间设置为4000毫秒
future.wait(4000)
,我们将看到下一个输出:

after sleep
result=Completed

现在我们有了一些结果,因为
CompletableFuture
的执行速度超过了4000毫秒。

这是我想到的,我认为这不是一个好的解决方案,因为我很可能会为相当原始的任务创建大量垃圾


suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()

   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }

   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}


fun main() = runBlocking {
   val future = CompletableFuture<String>()

   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }

   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)

      if (isDone) {
         println("future is done")
         break
      } else {

         println("future not done")
      }
   }

   Unit
}

这正是我们想要的…

它正在被取消,completable future与运行您提交的代码的线程没有任何关系,在第一次超时后,future对象为One。。。检查您自己,或者在completable future的cancel方法中设置调试断点。。。CF不会以任何方式中断或终止底层线程…或者您可以在第一次提前超时后简单地检查future.isDone()。。。这将是真的,我对我的答案做了一些修改。
CompletableFuture
中还有
isCancelled
属性。如果执行超过超时,它将返回
true
,否则返回
false
。Sergey我感谢您的努力,但这不是目标,我们希望未来不被触及,我们只是需要一种偶尔检查某件事是否完成的方法(无论是未来还是工作),是的,我们可以在循环中使用延迟,但在这种情况下,如果将来完成,我们仍然会失去延迟时间。。。请看我下面的解决方案,它的工作不涉及未来,完全符合我们的要求,但我不确定该解决方案在性能方面有多好。。。
after sleep
result=Completed

suspend fun <T> CompletableFuture<T>.await(duration: Millis): Boolean {
   val timeout = CompletableFuture<Unit>()

   GlobalScope.launch {
      delay(duration)
      timeout.complete(Unit)
   }

   val anyOfTwo = CompletableFuture.anyOf(this, timeout)
   anyOfTwo.await()
   return this.isDone
}


fun main() = runBlocking {
   val future = CompletableFuture<String>()

   GlobalScope.launch {
      delay(2000)
      println("setting the result (future now ${future.isDone})")
      future.complete("something")
   }

   while (future.isNotDone()) {
      println("waiting for the future to complete for the next 500ms")
      val isDone = future.await(500)

      if (isDone) {
         println("future is done")
         break
      } else {

         println("future not done")
      }
   }

   Unit
}
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
future not done
waiting for the future to complete for the next 500ms
setting the result (future now false)
future is done