使用scalaz-stream计算摘要

4

我想知道如何使用scalaz-stream和java.security.MessageDigest生成文件的摘要?

我希望使用固定的内存缓冲区大小(例如4KB)。我认为我知道如何开始读取文件,但我不知道如何:

1) 对每个4KB调用digest.update(buf),这实际上是对Java MessageDigest实例进行的副作用,我猜应该在scalaz-stream框架中发生。

2) 最后调用digest.digest()来从scalaz-stream框架中以某种方式接收计算出的摘要?

我大概知道如何开始:

import scalaz.stream._
import java.security.MessageDigest

val f = "/a/b/myfile.bin"
val bufSize = 4096

val digest = MessageDigest.getInstance("SHA-256")

Process.constant(bufSize).toSource
  .through(io.fileChunkR(f, bufSize))

但是我现在卡住了!有什么提示吗?我猜肯定也可以将摘要对象的创建、更新、检索(实际摘要计算)和销毁包装在 scalaz-stream Sink 中,然后调用 .to() 并传递该 Sink?如果我使用了错误的术语,请原谅,我完全是新手。我已经尝试了一些示例,但仍然感到困难。

2个回答

3
自版本0.4起,scalaz-stream包含计算摘要的进程。它们在hash模块中提供,并在底层使用java.security.MessageDigest。下面是一个最简例子,展示如何使用它们:
import scalaz.concurrent.Task
import scalaz.stream._

object Sha1Sum extends App {
  val fileName = "testdata/celsius.txt"
  val bufferSize = 4096

  val sha1sum: Task[Option[String]] =
    Process.constant(bufferSize)
      .toSource
      .through(io.fileChunkR(fileName, bufferSize))
      .pipe(hash.sha1)
      .map(sum => s"${sum.toHex}  $fileName")
      .runLast

  sha1sum.run.foreach(println)
}
< p > update()digest() 调用都包含在 hash.sha1Process1 中。< /p >

那非常酷啊 :-)所以如果这个还不存在的话,看起来我需要创建一个进程(Process),然后在其上调用.pipe而不是创建一个Sink。我猜这是因为Sink不能发出结果,而Process可以? - adamretter
1
@adamretter Sink[F[_],O]Process[F, O => F[Unit]] 的类型别名,因此它们是返回 Unit 的有副作用函数的来源。正如您所演示的,您可以将输入的 ByteVector 发送到 Sink,但没有办法从中获取最终摘要。hash 中的进程是 Process1:它们可以消耗和发出值。文件的块被导入到 sha1 中以更新摘要,并且当没有更多输入值时,它只会发出最终摘要。 - Frank S. Thomas
感谢@frank-s-thomas的有益解释 :-) - adamretter

0

我已经有一些可用的东西,但可能还可以改进:

import java.io._
import java.security.MessageDigest
import resource._
import scodec.bits.ByteVector
import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.io._

val f = "/a/b/myfile.bin"
val bufSize = 4096

val md = MessageDigest.getInstance("SHA-256")

def _digestResource(md: => MessageDigest): Sink[Task,ByteVector] =
      resource(Task.delay(md))(md => Task.delay(()))(
        md => Task.now((bytes: ByteVector) => Task.delay(md.update(bytes.toArray))))

Process.constant(4096).toSource
    .through(fileChunkR(f.getAbsolutePath, 4096))
    .to(_digestResource(md))
    .run
    .run

md.digest()

然而,我认为应该有一种更清晰的方法来完成这个任务,即通过将MessageDigest的创建移动到scalaz-stream中,并使最终的.run生成md.digest()

欢迎提供更好的答案...


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接