在Scala中使用Future和Promise进行取消操作

11
这是我的上一个问题的后续。
假设我有一个任务,它执行一个可中断的阻塞调用。我想将其作为Future运行,并使用Promisefailure方法取消它。
我希望取消的操作如下:
  • 如果在任务完成之前取消任务,我希望任务立即完成,如果已经开始,则中断阻塞调用,并且Future会调用onFailure

  • 如果在任务完成后取消任务,我希望得到一个状态,表明取消失败,因为任务已经完成。

这有意义吗? 在Scala中是否可以实现? 有没有这样实现的示例?
4个回答

13

scala.concurrent.Future是只读的,因此一个读者不能为其他读者搞砸事情。

看起来你应该能够按照以下方式实现你想要的内容:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)

2
你需要添加一个同步变量,在计算开始时将当前线程设置为锁定状态,然后在结束时获取锁并清除该变量。如果有任何线程被设置,则取消操作将获取锁并调用中断,否则将退出。 - Viktor Klang
1
sourcedelica:AtomicBoolean是可变的,所以可能不是最好的选择。使用Future的原因是它已经分配了内存,并且不会以任何方式干扰。 - Viktor Klang
嗨@ViktorKlang,输出中的未来是否可能不是有趣的部分?我的意思是你是否指的是val f = Future(fun(p.future)) ; p tryCompleteWith f - Francisco López-Sancho
1
@FranciscoLópez-Sancho 在代码中,返回的 Future 将包含计算结果或 CancellationException。 - Viktor Klang
@FranciscoLópez-Sancho,不用谢,我很高兴有这么多开发人员使用Future & Promise。 :) - Viktor Klang
显示剩余6条评论

11

根据Victor的评论,这里是可中断版本的他的代码(如果我误解了,请纠正我)。

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}

使用Thread.sleep(6000),输出结果如下:
latch timed out
42
wasCancelled: false

使用Thread.sleep(1000),输出如下:
wasCancelled: true
class java.util.concurrent.CancellationException

谢谢。这绝对更干净了。 - sourcedelica
根据Victor的gist更新。 - sourcedelica
1
为什么@ViktorKlang的答案没有更新以反映他自己的评论和主旨?!看起来有两个答案互相竞争,而只有一个作者:( - Jacek Laskowski
1
它们是两种不同的变体,它们彼此之间没有竞争。Viktor使用cancel()方法来取消操作,而我使用Thread.interrupt - sourcedelica

6

它仍然有效吗?你链接到了主分支,所以代码从那时起就发生了变化。 - Basil
哦,亲爱的...看起来raise可能已经替换了这个功能:https://twitter.github.io/util/docs/com/twitter/util/Future.html#raise(interrupt:Throwable):Unit - gzm0

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接