Akka流 - 将字节串流拆分为多个文件

4
我正在尝试将接收到的Akka字节流(来自http请求体,但也可以来自文件)拆分为多个预定义大小的文件。
例如,如果我上传10GB的文件,它会创建大约10个1GB的文件。这些文件将具有随机生成的名称。我的问题是我不知道从哪里开始,因为所有的响应和示例都是将整个数据块存储在内存中,或者使用基于字符串的分隔符。除了我不能真正拥有1GB的“块”,然后将它们写入磁盘。
是否有任何容易执行此操作的方式? 我唯一的想法是使用类似于http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings 的内容,但转换为类似于FlowShape[ByteString, File],将收到的字节字符串写入文件块直到达到最大文件大小,然后创建新文件等...,并且流回已创建的文件。 然而,这看起来像是一个不适当地使用Akka的可怕想法。
提前谢谢!
3个回答

7

我通常会采用纯函数式、非akka技术来解决这样的问题,然后将这些函数“提升”到akka结构中。我的意思是尝试只使用scala相关内容,并在后续尝试将其包装在akka内部...

文件创建

从基于“随机生成名称”的FileOutputStream创建开始:

def randomFileNameGenerator : String = ??? //not specified in question

import java.io.FileOutputStream

val randomFileOutGenerator : () => FileOutputStream = 
  () => new FileOutputStream(randomFileNameGenerator)

状态

需要有一种方式来存储当前文件的“状态”,例如已经写入的字节数:

case class FileState(byteCount : Int = 0, 
                     fileOut : FileOutputStream = randomFileOutGenerator())

文件写入

首先,我们需要确定使用给定的 ByteString 是否会超出最大文件大小阈值:

import akka.util.ByteString

val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes

我们接下来需要编写一个函数,如果当前状态已经达到容量上限,则创建一个新的FileState,否则返回当前状态。该函数的代码如下:
val closeFileInState : FileState => Unit = 
  (_ : FileState).fileOut.close()

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
  (state, byteString, maxBytes) =>
    if(isEndOfChunk(maxBytes, state, byteString)) {
      closeFileInState(state)
      FileState()
    }
    else
      state

现在唯一剩下的就是向 FileOutputStream 写入内容:

val writeToFileAndReturn(FileState, ByteString) => FileState = 
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }

//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =    
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)    

对于任何GenTraversableOnce进行折叠操作

在Scala中,GenTraversableOnce是指具有折叠运算符的任何集合,可以是并行或非并行的。这些包括Iterator、Vector、Array、Seq、Scala Stream等......最终的writeToChunkedFile函数完全匹配GenTraversableOnce#fold的签名:

val anyIterable : Iterable = ???

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))

最后一个未解决的问题是:最后一个 FileOutputStream 也需要关闭。由于这个文件夹只会生成最后一个 FileState,因此我们可以关闭它:

closeFileInState(finalFileState)

Akka流

Akka Flow从fold中获取,该方法来自于FlowOps#fold,与GenTraversableOnce签名类似。因此,我们可以将正常函数转化为流值,方式类似于我们使用Iterable fold:

import akka.stream.scaladsl.Flow

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))

处理问题时使用常规函数的好处是,它们可以在流之外的其他异步框架中使用,例如Futures或Actors。您还不需要在单元测试中使用akka ActorSystem,只需使用常规语言数据结构即可。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)

你可以使用这个 Sink 来处理从 HttpRequest 返回的 HttpEntity

哇,谢谢,这是一个非常详细的回答!我没有想到过。这种方法与在GraphStageLogic中保持状态并手动创建输出流的逻辑有什么区别吗?例如(基于您的评论+我之前提供的链接)http://pastebin.com/tzLFAmzk?小加号是它允许在文件创建后立即流式传输(但代码更长,容易出错)。 - Vuzi
@Vuzi 不用谢。我在“现实世界”中看到的我的方法和GraphStageLogic之间的主要区别是测试。使用我的函数,所有测试都可以在没有ActorSystemActorMaterializerExecutionContext的情况下完成。如果您将逻辑放入akka流构造中,则需要所有akka框架来测试该逻辑。祝你愉快。 - Ramón J Romero y Vigil

1

将一个ByteString流拆分成多个文件的惯用方法是使用Alpakka的LogRotatorSink。来自文档:

此sink将采取一个函数作为参数,该函数返回一个Bytestring => Option[Path]函数。如果生成的函数返回路径,则sink将旋转文件输出到这个新路径,并且实际的ByteString也将写入这个新文件。通过这种方法,用户可以定义自定义的有状态文件生成实现。

以下fileSizeRotationFunction也来自文档:

val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    {
      if (size + element.size > max) {
        val path = Files.createTempFile("out-", ".log")
        size = element.size
        Some(path)
      } else {
        size += element.size
        None
      }
    }
}

它的使用示例:

val source: Source[ByteString, _] = ???
source.runWith(LogRotatorSink(fileSizeRotationFunction))

1
你可以编写自定义图形阶段。你的问题类似于在上传到Amazon S3期间在Alpakka中面临的问题。(谷歌Alpakka S3连接器..他们不允许我发布超过2个链接)。
由于某种原因,s3连接器DiskBuffer会将整个字节串的传入源写入文件,然后再发出块以进行进一步的流处理。
我们想要的是类似于将字节字符串来源限制为特定长度的东西。在这个例子中,他们通过维护一个内存缓冲区,将传入的Source[ByteString, _]限制为固定大小的byteStrings源。我将其用于与文件一起使用。 优点是您可以使用专用线程池来执行此阶段的阻塞IO操作。对于良好的反应流,您希望将阻塞IO保留在actor系统中的单独线程池中。 PS:这并不试图制作精确大小的文件..因此,如果在100MB文件中读取了额外的2KB..我们会将这些额外的字节写入当前文件,而不是尝试实现精确的大小。
import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.channels.FileChannel
import java.nio.file.Path

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream._
import akka.util.ByteString

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int)
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes.
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int)
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] {

  assert(maxSize > partSize, "Max size should be larger than part size. ")

  val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in")
  val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out")

  override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

      var partNumber: Int = 0
      var length: Int = 0
      var currentBuffer: Option[PartBuffer] = None

      override def onPull(): Unit =
        if (isClosed(in)) {
          emitPart(currentBuffer, length)
        } else {
          pull(in)
        }

      override def onPush(): Unit = {
        val elem = grab(in)
        length += elem.size
        val currentPart: PartBuffer = currentBuffer match {
          case Some(part) => part
          case None =>
            val newPart = createPart(partNumber)
            currentBuffer = Some(newPart)
            newPart
        }
        currentPart.fileChannel.write(elem.asByteBuffer)
        if (length > partSize) {
          emitPart(currentBuffer, length)
          //3. .increment part number, reset length.
          partNumber += 1
          length = 0
        } else {
          pull(in)
        }
      }

      override def onUpstreamFinish(): Unit =
        if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer.

      private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match {
        case Some(part) =>
          //1. flush the part buffer and truncate the file.
          part.fileChannel.force(false)
          //          not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do.
//                    val ch = new FileOutputStream(part.path.toFile).getChannel
//          try {
//            println(s"truncating to size $size")
//            ch.truncate(size)
//          } finally {
//            ch.close()
//          }
          //2emit the part
          val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber)
          push(out, chunk)
          part.fileChannel.close() // TODO: probably close elsewhere.
          currentBuffer = None
          //complete stage if in is closed.
          if (isClosed(in)) completeStage()
        case None => if (isClosed(in)) completeStage()
      }

      private def createPart(partNum: Int): PartBuffer = {
        val path: Path = partFile(partNum)
        //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies.
        PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel)
      }

      /**
       * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber
       * @param partNumber the part number in multipart upload.
       * @return
       * TODO:add unique id to the file name. for multiple
       */
      private def partFile(partNumber: Int): Path =
        tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin")
      setHandlers(in, out, this)
    }

  case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO:  see if you need mapped byte buffer. might be ok with just output stream / channel.

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接