如何在Scala中使用ARM和Futures?

10
我希望实现ARM(自动化资源管理)模式,其中资源是异步使用的。
问题:
假设我的资源看起来像:
class MyResource { 
  def foo() : Future[MyResource] = ???
  // Other methods returning various futures
  def close() : Unit = ???
}
object MyResource { 
  def open(name: String): Future[MyResource] = ???
} 

希望使用模式如下:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
  r.foo() /* map ... */ andThen {
    case _ => r.close()
  }
})

被省略的映射函数可以很复杂,包括分支和链接Futures,这些Futures反复调用返回Futures的r方法。
我想确保在所有未来的延续完成(或失败)后调用r.close()。在每个调用点手动执行此操作容易出错。这需要一个ARM解决方案。
尝试的解决方案
scala-arm库通常是同步的。由于close()将在块内部的Futures完成之前被调用,因此此代码不会执行正确操作:
for (r <- managed(MyResource.open("name"))) {
  r map (_.foo()) // map ...
}

我考虑使用这个包装器:

def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
  opener flatMap { 
    myr => body(myr) andThen { case _ => myr.close() } }

那么调用站点将会是这样的:
usingAsync(MyResource.open("name")) ( myr => {
  myr.foo // map ...
})

但是,块内的代码将负责返回一个 Future,在所有由该块创建的其他 futures 完成时完成。如果它意外地没有这样做,那么在所有使用它的 futures 完成之前,资源将再次被关闭。而且没有静态验证来捕获此错误。例如,这将是一个运行时错误:

usingAsync(MyResource.open("name")) ( myr => {
  myr.foo() // Do one thing
  myr.bar() // Do another
})

如何解决这个问题?

显然,我可以使用scala-arm的限定 continuation 支持(CPS)。但它看起来有点复杂,我担心做错了。而且它需要启用编译器插件。此外,我的团队非常新手 scala,我不想要求他们使用 CPS。

CPS是唯一的前进方式吗?是否有一个更简单的库或设计模式可以通过 Futures 实现这一点,或者一个使用 scala-arm 的例子?


检查Monix Task,它具有完成触发器。https://monix.io/docs/3x/eval/task.html#clean-up-resources-on-finish - WeiChing 林煒清
1个回答

2
反应式扩展(Rx)可能是一种替代解决方案。这种编程范例正在不断发展,现在许多语言包括Scala都可以使用。
Rx的基础是创建一个Observable,它是异步事件的源。Observable可以以复杂的方式链接,这是它的优势。你订阅Observable来监听onNext、onError和onComplete事件。你还会收到一个Subscription,允许你取消订阅。
我认为你可能会添加一个resource.close()调用,在onCompleted和/或onError处理程序中。
请查看RxScala文档:
Observable.subscribe(
    onNext: (T) ⇒ Unit, 
    onError: (Throwable) ⇒ Unit, 
    onCompleted: () ⇒ Unit): Subscription

更多信息:


我们已经使用了play-iteratee,它似乎类似甚至更优于Observable。我不明白它们两者如何有所帮助。一个具有多个返回Futures和close()方法的资源并不等同于可观察的流。你能详细说明一下吗? - danarmak
我会使用迭代器。干杯。 - reggoodwin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接