Scala有哪些自动资源管理的替代方案?

105

我在网上看到很多关于Scala中ARM(自动资源管理)的例子。似乎编写一个这样的例子是成长过程中必经的,但大多数看起来几乎都一样。不过,我确实看到了一个使用continuations的相当酷的例子。

无论如何,许多代码都存在某种类型的缺陷,因此我认为在Stack Overflow上有一个参考是个好主意,我们可以投票支持最正确和适当的版本。


如果这个问题不是社区维基,会产生更多的答案吗?不确定社区维基中投票的答案是否会奖励声望... - huynhjl
2
唯一的引用可以为ARM添加另一层安全性,以确保在调用close()之前将资源的引用返回给管理器。http://thread.gmane.org/gmane.comp.lang.scala/19160/focus=19168 - retronym
@retronym,我认为唯一性插件将是一个相当大的革命,比续体更加重要。实际上,我认为这是Scala中最有可能在不久的将来被移植到其他语言的东西之一。当这个插件发布时,让我们确保相应地编辑答案。 :-) - Daniel C. Sobral
1
因为我需要能够嵌套多个java.lang.AutoCloseable实例,每个实例都依赖于先前的实例成功实例化,所以我最终想出了一个对我非常有用的模式。我将其写成了类似StackOverflow问题的答案:https://dev59.com/qG445IYBdhLWcg3w_O8m#34277491 - chaotic3quilibrium
10个回答

75

Chris Hansen在他于2009年3月26日发布的博客文章“Scala中的ARM块:再探讨”中谈到了Martin Odersky的FOSDEM演讲的第21张幻灯片。下面这个代码块是直接从第21张幻灯片上复制而来(经过允许):

def using[T <: { def close() }]
    (resource: T)
    (block: T => Unit) 
{
  try {
    block(resource)
  } finally {
    if (resource != null) resource.close()
  }
}

--结束引用--

然后我们可以这样调用:

using(new BufferedReader(new FileReader("file"))) { r =>
  var count = 0
  while (r.readLine != null) count += 1
  println(count)
}

这种方法有什么缺点呢?这个模式似乎可以解决95%的自动资源管理需求...
编辑:添加了代码片段
---
编辑2:扩展设计模式 - 受Python的 “with” 语句启发,解决以下问题:
- 块前运行的语句 - 根据受控资源重新抛出异常 - 一次使用一个 using 语句处理两个资源 - 提供隐式转换和 Managed 类以实现资源特定处理
此代码适用于Scala 2.8。
trait Managed[T] {
  def onEnter(): T
  def onExit(t:Throwable = null): Unit
  def attempt(block: => Unit): Unit = {
    try { block } finally {}
  }
}

def using[T <: Any](managed: Managed[T])(block: T => Unit) {
  val resource = managed.onEnter()
  var exception = false
  try { block(resource) } catch  {
    case t:Throwable => exception = true; managed.onExit(t)
  } finally {
    if (!exception) managed.onExit()
  }
}

def using[T <: Any, U <: Any]
    (managed1: Managed[T], managed2: Managed[U])
    (block: T => U => Unit) {
  using[T](managed1) { r =>
    using[U](managed2) { s => block(r)(s) }
  }
}

class ManagedOS(out:OutputStream) extends Managed[OutputStream] {
  def onEnter(): OutputStream = out
  def onExit(t:Throwable = null): Unit = {
    attempt(out.close())
    if (t != null) throw t
  }
}
class ManagedIS(in:InputStream) extends Managed[InputStream] {
  def onEnter(): InputStream = in
  def onExit(t:Throwable = null): Unit = {
    attempt(in.close())
    if (t != null) throw t
  }
}

implicit def os2managed(out:OutputStream): Managed[OutputStream] = {
  return new ManagedOS(out)
}
implicit def is2managed(in:InputStream): Managed[InputStream] = {
  return new ManagedIS(in)
}

def main(args:Array[String]): Unit = {
  using(new FileInputStream("foo.txt"), new FileOutputStream("bar.txt")) { 
    in => out =>
    Iterator continually { in.read() } takeWhile( _ != -1) foreach { 
      out.write(_) 
    }
  }
}

2
有其他选择,但我并不意味着那有什么问题。我只是想把所有的答案都放在 Stack Overflow 上。 :-) - Daniel C. Sobral
5
你知道标准API中是否有类似的东西吗?每次都要自己写好像很麻烦。 - Daniel Darabos
这篇文章发布已经有一段时间了,但第一个解决方案如果out构造函数抛出异常就不会关闭内部流,虽然在这里可能不会发生,但在其他情况下可能会很糟糕。close()方法也可能会抛出异常。此外,没有区分致命异常。第二个解决方案到处都是代码异味,并且与第一个解决方案相比没有任何优势。你甚至失去了实际类型,所以对于像ZipInputStream这样的东西来说是无用的。 - steinybot
如果该块返回一个迭代器,您建议如何处理? - Jorge Machado
返回某物: def using[R, T <: {def close(): Unit}](resource: T)(fnc: T => R): R = { ... } - surfealokesea
返回某物: def using[R, T <: {def close(): Unit}](resource: T)(fnc: T => R): R = { ... } - undefined

63

我最近刚刚部署了scala-arm库,用于自动资源管理。你可以在这里找到文档:https://github.com/jsuereth/scala-arm/wiki

目前,该库支持三种使用方式:

  1. 命令式/for表达式:

    import resource._
    for(input <- managed(new FileInputStream("test.txt"))) {
    // 使用input作为FileInputStream的代码
    }
    
  2. 单子风格

    import resource._
    import java.io._
    val lines = for { 
                    input <- managed(new FileInputStream("test.txt"))
                    val bufferedReader = new BufferedReader(new InputStreamReader(input)) 
                    line <- makeBufferedReaderLineIterator(bufferedReader)
                  } yield line.trim()
    lines foreach println
    
  3. 分界限延续风格

这是一个“回声”TCP服务器:
import java.io._
import util.continuations._
import resource._
def each_line_from(r : BufferedReader) : String @suspendable =
  shift { k =>
    var line = r.readLine
    while(line != null) {
      k(line)
      line = r.readLine
    }
  }
reset {
  val server = managed(new ServerSocket(8007)) !
  while(true) {
    // This reset is not needed, however the  below denotes a "flow" of execution that can be deferred.
    // One can envision an asynchronous execuction model that would support the exact same semantics as below.
    reset {
      val connection = managed(server.accept) !
      val output = managed(connection.getOutputStream) !
      val input = managed(connection.getInputStream) !
      val writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(output)))
      val reader = new BufferedReader(new InputStreamReader(input))
      writer.println(each_line_from(reader))
      writer.flush()
    }
  }
}

代码使用了一个资源类型特性,因此它能够适应大多数资源类型。它还具备对具有close或dispose方法的类进行结构化类型检查的回退功能。请查看文档,并让我知道是否有任何有用的功能需要添加。

1
是的,我看到了。我想查看代码,看看你是如何完成一些事情的,但我现在太忙了。无论如何,既然问题的目标是提供可靠的ARM代码参考,我将把这个答案作为被接受的答案。 - Daniel C. Sobral

18

以下是James Iry使用continuations的解决方案:

// standard using block definition
def using[X <: {def close()}, A](resource : X)(f : X => A) = {
   try {
     f(resource)
   } finally {
     resource.close()
   }
}

// A DC version of 'using' 
def resource[X <: {def close()}, B](res : X) = shift(using[X, B](res))

// some sugar for reset
def withResources[A, C](x : => A @cps[A, C]) = reset{x}

以下是带有continuations和不带continuations的解决方案供比较:

def copyFileCPS = using(new BufferedReader(new FileReader("test.txt"))) {
  reader => {
   using(new BufferedWriter(new FileWriter("test_copy.txt"))) {
      writer => {
        var line = reader.readLine
        var count = 0
        while (line != null) {
          count += 1
          writer.write(line)
          writer.newLine
          line = reader.readLine
        }
        count
      }
    }
  }
}

def copyFileDC = withResources {
  val reader = resource[BufferedReader,Int](new BufferedReader(new FileReader("test.txt")))
  val writer = resource[BufferedWriter,Int](new BufferedWriter(new FileWriter("test_copy.txt")))
  var line = reader.readLine
  var count = 0
  while(line != null) {
    count += 1
    writer write line
    writer.newLine
    line = reader.readLine
  }
  count
}

以下是 Tiark Rompf 提出的改进建议:

trait ContextType[B]
def forceContextType[B]: ContextType[B] = null

// A DC version of 'using'
def resource[X <: {def close()}, B: ContextType](res : X): X @cps[B,B] = shift(using[X, B](res))

// some sugar for reset
def withResources[A](x : => A @cps[A, A]) = reset{x}

// and now use our new lib
def copyFileDC = withResources {
 implicit val _ = forceContextType[Int]
 val reader = resource(new BufferedReader(new FileReader("test.txt")))
 val writer = resource(new BufferedWriter(new FileWriter("test_copy.txt")))
 var line = reader.readLine
 var count = 0
 while(line != null) {
   count += 1
   writer write line
   writer.newLine
   line = reader.readLine
 }
 count
}

使用(new BufferedWriter(new FileWriter("test_copy.txt")))不会在BufferedWriter构造函数失败时遇到问题吗?每个资源都应该包装在using块中... - Jaap
@Jaap 这是Oracle建议的风格BufferedWriter不会抛出已检查异常,因此如果抛出任何异常,则不应从中恢复程序。 - Daniel C. Sobral

17

目前,Scala 2.13 终于支持了 try with resources,使用Using进行操作 :), 示例:

val lines: Try[Seq[String]] =
  Using(new BufferedReader(new FileReader("file.txt"))) { reader =>
    Iterator.unfold(())(_ => Option(reader.readLine()).map(_ -> ())).toList
  }

或者使用Using.resource来避免使用Try

val lines: Seq[String] =
  Using.resource(new BufferedReader(new FileReader("file.txt"))) { reader =>
    Iterator.unfold(())(_ => Option(reader.readLine()).map(_ -> ())).toList
  }

你可以在Using文档中找到更多示例。

一个用于执行自动资源管理的工具。它可用于使用资源执行操作,执行完成后按照创建的相反顺序释放资源。


请您也添加 Using.resource 变体好吗? - Daniel C. Sobral
@DanielC.Sobral,好的,已经添加了。 - chengpohi
你会如何在Scala 2.12中编写这个代码?这里有一个类似的using方法: (block: A => B): B = try block(resource) finally resource.close()``` - Mike Slinn

7

我认为使用Scala进行ARM的演变可以分为4个步骤:

  1. 不使用ARM:脏乱
  2. 只使用闭包:更好,但是有多个嵌套块
  3. Continuation Monad:使用For语句来扁平化嵌套,但在2个块之间有不自然的分离
  4. 直接风格的continuations:Nirava,aha!这也是最安全的选择:在withResource块之外的资源将会产生类型错误。

1
请注意,Scala中的CPS通过单子实现。 :-) - Daniel C. Sobral
1
  1. 你可以在不是 continuation monad 的 monad 中进行资源管理。
  2. 使用我的 withResources/resource delimited continuations 代码进行资源管理与 "using" 相比,它的类型安全性并没有更高(也没有更低)。仍然有可能忘记管理需要管理的资源。
比较:using(new Resource()) { first => val second = new Resource() //oops! // use resources } // only first gets closed withResources { val first = resource(new Resource()) val second = new Resource() // oops! // use resources... } // only first gets closed
- James Iry
2
Daniel,Scala中的CPS就像任何函数式语言中的CPS一样。它使用一个单子来限定延续。 - James Iry
James,感谢你的清晰解释。身在印度的我只能幻想着我能参加你的BASE演讲。期待着你将那些幻灯片放到网上 :) - Mushtaq Ahmed

6

在better-files中包含了轻量级(10行代码)的ARM。详情请参考:https://github.com/pathikrit/better-files#lightweight-arm

import better.files._
for {
  in <- inputStream.autoClosed
  out <- outputStream.autoClosed
} in.pipeTo(out)
// The input and output streams are auto-closed once out of scope

以下是如果你不想使用整个库而实现它的方法:

  type Closeable = {
    def close(): Unit
  }

  type ManagedResource[A <: Closeable] = Traversable[A]

  implicit class CloseableOps[A <: Closeable](resource: A) {        
    def autoClosed: ManagedResource[A] = new Traversable[A] {
      override def foreach[U](f: A => U) = try {
        f(resource)
      } finally {
        resource.close()
      }
    }
  }

这很不错。我采用了类似的方法,但是为CloseableOps定义了mapflatMap方法,而不是使用foreach,以便for推导不会产生可遍历对象。 - EdgeCaseBerg

1

另一种选择是Choppy的懒惰TryClose monad。它在数据库连接方面表现不错:

val ds = new JdbcDataSource()
val output = for {
  conn  <- TryClose(ds.getConnection())
  ps    <- TryClose(conn.prepareStatement("select * from MyTable"))
  rs    <- TryClose.wrap(ps.executeQuery())
} yield wrap(extractResult(rs))

// Note that Nothing will actually be done until 'resolve' is called
output.resolve match {
    case Success(result) => // Do something
    case Failure(e) =>      // Handle Stuff
}

而且使用流:

val output = for {
  outputStream      <- TryClose(new ByteArrayOutputStream())
  gzipOutputStream  <- TryClose(new GZIPOutputStream(outputStream))
  _                 <- TryClose.wrap(gzipOutputStream.write(content))
} yield wrap({gzipOutputStream.flush(); outputStream.toByteArray})

output.resolve.unwrap match {
  case Success(bytes) => // process result
  case Failure(e) => // handle exception
}

更多信息请点击此处:https://github.com/choppythelumberjack/tryclose


1
如何使用类型类?
trait GenericDisposable[-T] {
   def dispose(v:T):Unit
}
...

def using[T,U](r:T)(block:T => U)(implicit disp:GenericDisposable[T]):U = try {
   block(r)
} finally { 
   Option(r).foreach { r => disp.dispose(r) } 
}

0

这里是@chengpohi的答案,修改后可以在Scala 2.8+中使用,而不仅仅是Scala 2.13(是的,它也可以在Scala 2.13中使用):

def unfold[A, S](start: S)(op: S => Option[(A, S)]): List[A] =
  Iterator
    .iterate(op(start))(_.flatMap{ case (_, s) => op(s) })
    .map(_.map(_._1))
    .takeWhile(_.isDefined)
    .flatten
    .toList

def using[A <: AutoCloseable, B](resource: A)
                                (block: A => B): B =
  try block(resource) finally resource.close()

val lines: Seq[String] =
  using(new BufferedReader(new FileReader("file.txt"))) { reader =>
    unfold(())(_ => Option(reader.readLine()).map(_ -> ())).toList
  }

0

虽然使用Using也可以,但我更喜欢资源组合的单子样式。Twitter Util的Managed非常好,除了它的依赖和不太光滑的API。

为此,我已经发布了https://github.com/dvgica/managerial,适用于Scala 2.12、2.13和3.0.0。主要基于Twitter Util Managed代码,无依赖项,一些API改进受到cats-effect Resource的启发。

下面是一个简单的例子:

import ca.dvgi.managerial._
val fileContents = Managed.from(scala.io.Source.fromFile("file.txt")).use(_.mkString)

但是该库的真正强大之处在于通过for comprehensions组合资源

让我知道你的想法!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接