scala.concurrent.Promise的用例有哪些?

97

我正在阅读SIP-14Future的概念非常容易理解。但是我有两个关于Promise的问题:

  1. The SIP says Depending on the implementation, it may be the case that p.future == p. How can this be? Are Future and Promise not two different types?

  2. When should we use a Promise? The example producer and consumer code :

    import scala.concurrent.{ future, promise }
    val p = promise[T]
    val f = p.future
    
    val producer = future {
        val r = produceSomething()
        p success r
        continueDoingSomethingUnrelated()
    }
    val consumer = future {
        startDoingSomething()
        f onSuccess {
            case r => doSomethingWithResult()
        }
    }
    

虽然这种写法容易阅读,但我们真的需要这样写吗?我尝试使用Future而不是Promise来实现它,代码如下:

val f = future {
   produceSomething()
}

val producer = future {
   continueDoingSomethingUnrelated()
}

startDoingSomething()

val consumer = future {
  f onSuccess {
    case r => doSomethingWithResult()
  }
}

这个问题的区别在于使用Promise是必要的吗?

在第一个例子中,continueDoingSomethingUnrelated() 在同一线程中的 produceSomething() 之后进行评估。 - senia
1
回答问题1,是的FuturePromise是两种不同的类型,但正如您从https://github.com/scala/scala/blob/master/src/library/scala/concurrent/impl/Promise.scala中看到的那样,这个特定的`Promise`实现也扩展了`Future`。 - Dylan
1个回答

124
承诺和未来是互补的概念。未来是一个值,将在未来某个时候检索到,当该事件发生时,您可以对其进行操作。因此,它是计算的读取或输出端点-您从中检索值的东西。
类比地说,Promise是计算的写入端。您创建一个Promise,这是放置计算结果的位置,并从该Promise获取一个Future,将用于读取放入Promise中的结果。当您完成Promise时,无论是失败还是成功,都会触发附加到相关Future的所有行为。
关于您的第一个问题,如何才能使承诺p的p.future == p。您可以将其想象成单项缓冲区-最初为空的容器,您可以在之后存储一个值,该值将永远成为其内容。现在,根据您的观点,这既是Promise也是Future。对于打算在缓冲区中写入值的人来说,这是Promise。对于等待该值放入缓冲区的人来说,它是未来。
具体来说,对于Scala并发API,如果你查看此处中的Promise特质,你可以看到Promise伴生对象中的方法是如何实现的:
object Promise {

  /** Creates a promise object which can be completed with a value.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def apply[T](): Promise[T] = new impl.Promise.DefaultPromise[T]()

  /** Creates an already completed Promise with the specified exception.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def failed[T](exception: Throwable): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))

  /** Creates an already completed Promise with the specified result.
   *  
   *  @tparam T       the type of the value in the promise
   *  @return         the newly created `Promise` object
   */
  def successful[T](result: T): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))

}

现在,Promise的实现,DefaultPromise和KeptPromise可以在这里找到。它们都扩展了一个基本的小特性,恰好具有相同的名称,但位于不同的包中:
private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
  def future: this.type = this
}

因此,您可以看到他们通过p.future == p的含义。

DefaultPromise是我上面提到的缓冲区,而KeptPromise是从其创建之初就放入值的缓冲区。

关于您的示例,您在那里使用的future块实际上在幕后创建了一个promise。让我们来看一下这里future的定义:

def future[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = Future[T](body)

通过按照方法的链式调用,您最终会进入 impl.Future
private[concurrent] object Future {
  class PromiseCompletingRunnable[T](body: => T) extends Runnable {
    val promise = new Promise.DefaultPromise[T]()

    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.execute(runnable)
    runnable.promise.future
  }
}

因此,正如您所看到的,从producer block获得的结果会被注入到一个promise中。

后续编辑:

关于实际应用:大部分时间里,您不会直接处理promises。如果您使用的是执行异步计算的库,则只需使用该库方法返回的futures即可。在这种情况下,promises是由库创建的-您只是使用这些方法所做的读取端口。

但是,如果您需要实现自己的异步API,那么您必须开始使用它们。 假设您需要在Netty上实现一个异步HTTP客户端。那么您的代码将如下所示:

    def makeHTTPCall(request: Request): Future[Response] = {
        val p = Promise[Response]
        registerOnCompleteCallback(buffer => {
            val response = makeResponse(buffer)
            p success response
        })
        p.future
    }

3
Promise 的使用案例应该在实现代码中。Future 是一种不错的只读对象,可以暴露给客户端代码。另外,Future.future{...} 语法有时可能很繁琐。 - Dylan
11
你可以这样看待它:没有承诺就没有未来。如果一开始没有完成承诺,未来就无法返回价值。承诺不是可选的,它们是未来必须遵循的强制书写规则。仅使用未来是不够的,因为没有人提供它们的返回值。 - Marius Danila
4
我明白你所说的“真实世界的应用”是什么意思:我已经更新了我的回答并给出了一个例子。 - Marius Danila
2
@Marius: 考虑到给出的现实世界示例,如果 makeHTTPCall 实现如下: def makeHTTPCall(request: Request): Future[Response] = { Future { registerOnCompleteCallback(buffer => { val response = makeResponse(buffer) response }) } } - puneetk
1
@puneetk 然后你将拥有一个未来,它会在 registerOnCompleteCallback() 完成后立即完成。此外,它不会返回 Future[Response],而是返回 Future[registerOnCompleteCallback() 返回类型] - Evgeny Veretennikov
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接