Scala:将InputStream转换为Array[Byte]

44

在Scala中,从InputStream读取到byte数组的最佳方法是什么?

我看到你可以将InputStream转换为char数组。

Source.fromInputStream(is).toArray()
12个回答

50
如下所示:

怎么样:

Stream.continually(is.read).takeWhile(_ != -1).map(_.toByte).toArray

更新:使用LazyList代替Stream(因为在Scala 3中已经弃用了Stream

LazyList.continually(is.read).takeWhile(_ != -1).map(_.toByte).toArray

1
你能解释一下这个和问题中的变量之间的区别吗? - Jus12
12
那不会创建一个巨大的链表,然后再将其转换为数组吗?这看起来在时间和内存方面都不是很高效。 - Marcus Downing
1
看起来这并没有创建一个链表。Stream.continually 生成一个迭代器,而 takeWhilemap 似乎将迭代器转换为迭代器。例如,在 Scala 2.9.3 REPL 中评估 Array(1, 2, 3, 4, -1).iterator.takeWhile(-1 !=).map(_.toByte) 将给出 Iterator[Byte] = non-empty iterator - mikhail_b
1
这似乎会导致OOM错误。虽然最终进行了垃圾回收,但这些峰值超出了我的服务器处理能力。 - James Ward
@JamesWard 你可以使用KevinWright的方法。 - Eastsun
显示剩余2条评论

46

刚刚通过替换,我们的服务器代码中的瓶颈被移除了

Stream.continually(request.getInputStream.read()).takeWhile(_ != -1).map(_.toByte).toArray

org.apache.commons.io.IOUtils.toByteArray(request.getInputStream)

或者用纯Scala:

def bytes(in: InputStream, initSize: Int = 8192): Array[Byte] = {
  var buf = new Array[Byte](initSize)
  val step = initSize
  var pos, n = 0
  while ({
    if (pos + step > buf.length) buf = util.Arrays.copyOf(buf, buf.length << 1)
    n = in.read(buf, pos, step)
    n != -1
  }) pos += n
  if (pos != buf.length) buf = util.Arrays.copyOf(buf, pos)
  buf
}

不要忘记无论如何都要关闭已打开的输入流:

val in = request.getInputStream
try bytes(in) finally in.close()

这是org.apache.commons.io.IOUtils.toByteArray,以防有人想知道。 - Haakon
这绝对感觉更快了。有人用大文件做过基准测试吗? - EdgeCaseBerg
谢谢。我在使用Apache Spark时遇到了GC Overhead错误的巨大问题,其中90%的时间我的任务都花费在GC上。将其替换为“toByteArray”大大加快了速度。 - hermansc
重要的是要指出这种解决方案如何真正地比其他替代方案表现出色,其中你需要像 map(_.toByte) 这样逐字节迭代输入... 如果你正在处理大数据,那么就要这样做! - dividebyzero

20

与Eastsun的回答类似...我本来想以评论的形式发表,但最后变得有点长了!

如果持有头元素的引用,我建议不要使用Stream,因为流很容易消耗大量内存。

考虑到您只会读取文件一次,那么Iterator是一个更好的选择:

def inputStreamToByteArray(is: InputStream): Array[Byte] =
  Iterator continually is.read takeWhile (-1 !=) map (_.toByte) toArray

14
import scala.tools.nsc.io.Streamable
Streamable.bytes(is)

我不记得这有多久了:可能是以天为单位。如果回到2.8,情况更像是

new Streamable.Bytes { def inputStream() = is } toByteArray

1
使用scala.tools包中的内容是否安全?它们是否属于标准库的一部分? - Y.H Wong
不行。但如果您想知道如何编写它,这就是代码。 - psp
2
现在似乎已经转移到更标准的 scala.reflect.io 包中了。 - Thilo
1
scala.reflect.io.Streamable.bytes - WestCoastProjects

11

使用 Scala IO 库,这应该能够工作:

def inputStreamToByteArray(is: InputStream): Array[Byte] = 
   Resource.fromInputStream(in).byteArray

7

3
better.files 应该只在标准库中存在,它更好用。另外,如果你想要 Array[Byte],你需要使用 is.byteArray - Julian Pieles

3

Source.fromInputStream(is).map(_.toByte).toArray


3
这种方法无法处理二进制/伪编码的文本文件:https://dev59.com/f2rXa4cB1Zd3GeqPB7RG。 - Sebastian J.

2

如何使用流以及ByteArrayOutputStream的缓冲版本,以最小化围绕最终数组增长的样板文件?

Original Answer翻译成"最初的回答"

val EOF: Int = -1

def readBytes(is: InputStream, bufferSize: Int): Array[Byte] = {
  val buf = Array.ofDim[Byte](bufferSize)
  val out = new ByteArrayOutputStream(bufferSize)

  Stream.continually(is.read(buf)) takeWhile { _ != EOF } foreach { n =>
    out.write(buf, 0, n)
  }

  out.toByteArray
}

1
自JDK 9以来:
is.readAllBytes()

1
这是一种使用scalaz-stream的方法:

import scalaz.concurrent.Task
import scalaz.stream._
import scodec.bits.ByteVector

def allBytesR(is: InputStream): Process[Task, ByteVector] =
  io.chunkR(is).evalMap(_(4096)).reduce(_ ++ _).lastOr(ByteVector.empty)

可能没有理由减少,那样会破坏流的增量特性。 - OlegYch
原因是问题要求一个字节数组。 - Chris Martin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接