Scala并发:为Java并发Future提供的包装器

23

我正在使用Play Framework 2.1.1与一个产生java.util.concurrent.Future结果的外部Java库。我使用scala future而不是Akka,我认为这是在Play 2.1中正确的做法。如何将java.util.concurrent.Future包装到scala.concurrent.Future中,同时仍保持代码非阻塞?

def geConnection() : Connection = {
  // blocking with get
  connectionPool.getConnectionAsync().get(30000, TimeUnit.MILLISECONDS)
}

以上代码返回一个连接,但使用了get方法,因此它变成了阻塞式。

def getConnectionFuture() : Future[Connection] = {
  future {
    // how to remove blocking get and return a scala future?
    connectionPool.getConnectionAsync().get(30000, TimeUnit.MILLISECONDS)
  }
}
理想情况下,我想要一个Scala函数,像上面的代码一样返回连接作为future,但是不会通过get阻塞代码。我需要在函数中加入什么才能使它非阻塞。
任何提示都将是极好的。

你使用的是哪个版本的Scala?从2.10.x开始,Scala采用了Akka的Future作为自己的一部分。 - Randall Schulz
Play 2.1.1在底层使用Scala 2.10.0。 - Mark Sivill
4个回答

21
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{Future => SFuture}

由于在 SFutureonComplete)中存在回调函数,而在 JFuture 中只有阻塞的 get 函数,因此您无法使用 SFuture 包装 JFuture 而不会发生阻塞。

您可以创建一个附加的线程并使用 get 阻塞它,然后使用 get 的结果来完成 Promise

val jfuture: JFuture[T] = ???
val promise = Promise[T]()
new Thread(new Runnable { def run() { promise.complete(Try{ jfuture.get }) }}).start
val future = promise.future

你可以在无限循环中检查isDone,但我认为阻塞的效果更好。

3
同意。非常遗憾的是,Java 的未来不支持某种完成监听器 / 观察者来处理回调。 - cmbaxter
@cmbaxter 如果你使用Guava,那里有ListenableFuture - fge
所以我在考虑Java部分的回调函数,当完成后,将其结果放入Scala Future中。周围有Java实现回调函数的例子(http://technology.amis.nl/2009/02/19/asynchronous-processing-in-java-applications-leveraging-those-multi-cores/),但不确定如何将其集成到Play 2.1中。我很乐观地希望有一个简单的包装器,但似乎不可行,将Java回调函数放在Scala函数内似乎是可行的方法。 - Mark Sivill
我很好奇,future { jfuture.get } 有什么问题吗?为什么你要使用额外的线程结合 Promise - Petr
4
如果你在线程池中使用此函数,它将会阻塞线程。如果你已经为这样的future配置了ExecutionContext,那么没问题,但默认的ExecutionContext包含与处理器数量相同的线程数 - senia
1
@senia,我已经在stackoverflow上发布了一个问题,询问为什么future { jfuture.get }不够用。 - Dominykas Mostauskis

7
Future {
  blocking {
    jfuture.get
  }
}

这样可以让ExecutionContext知道你正在进行的操作将会阻塞,从而有机会分配更多的线程。如果不包含blocking { },则可能会耗尽线程。


你可能想提供有关这个的优缺点的详细信息。它是否与上面讨论的完全相同(例如来自@senia的评论)? - akauppi
1
抱歉,已添加一条注释来解释“blocking”的使用。 - Dr.Haribo
3
如果你使用“阻塞”操作,可能会导致高负载下出现数万个线程。要小心,因为“阻塞”并非神器! - folex

2
     import java.util.concurrent.{Future => JFuture}
     import scala.concurrent.ExecutionContext.Implicits.global
     import scala.concurrent.Future
     import scala.util.Try

     object JFuture2SFuture {
        val jFuture: JFuture[Int] = ???
        val promise = Promise[Int]()
        Future { promise.complete(Try(jFuture.get)) } //it is non blocking 
        val sFuture:Future[Int] = promise.future

     }

1
注释说“它是非阻塞的”,但在我看来,这似乎会阻塞执行线程。Try.apply是一个按名称调用,但这并不意味着它会异步执行。Promise.complete也是阻塞的。这确实将Java Future的.get包装在异步Scala Future中,但它仍然会阻塞执行线程,类似于Future { Thread.sleep(10000); } - drobert
是的,它会阻塞执行线程,但不会阻塞调用线程。这是有意为之,不会阻塞调用线程。 - Sky

-1

scala-java8-compat库提供了Java8和Scala Futures之间的转换器。

具体来说,您可以使用{{link2:FutureConverters.toScala(connectionPool.getConnectionAsync())}}将java.util.concurrent.Future转换为scala.concurrent.Future


2
这个答案是误导性的。scala-java8-compat可以将java.util.concurrent.CompletionStage转换为scala.concurrent.Future。没有一种“正确”的方法可以将Java Future转换为Scala Future。 - fcs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接