我通常会采用纯函数式、非akka技术来解决这样的问题,然后将这些函数“提升”到akka结构中。我的意思是尝试只使用scala相关内容,并在后续尝试将其包装在akka内部...
文件创建
从基于“随机生成名称”的FileOutputStream
创建开始:
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator : () => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
状态
需要有一种方式来存储当前文件的“状态”,例如已经写入的字节数:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
文件写入
首先,我们需要确定使用给定的 ByteString
是否会超出最大文件大小阈值:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
我们接下来需要编写一个函数,如果当前状态已经达到容量上限,则创建一个新的
FileState
,否则返回当前状态。该函数的代码如下:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
现在唯一剩下的就是向 FileOutputStream
写入内容:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
对于任何GenTraversableOnce进行折叠操作
在Scala中,GenTraversableOnce
是指具有折叠运算符的任何集合,可以是并行或非并行的。这些包括Iterator、Vector、Array、Seq、Scala Stream等......最终的writeToChunkedFile
函数完全匹配GenTraversableOnce#fold的签名:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
最后一个未解决的问题是:最后一个 FileOutputStream
也需要关闭。由于这个文件夹只会生成最后一个 FileState
,因此我们可以关闭它:
closeFileInState(finalFileState)
Akka流
Akka Flow从fold
中获取,该方法来自于FlowOps#fold,与GenTraversableOnce
签名类似。因此,我们可以将正常函数转化为流值,方式类似于我们使用Iterable
fold:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
处理问题时使用常规函数的好处是,它们可以在流之外的其他异步框架中使用,例如Futures或Actors。您还不需要在单元测试中使用akka
ActorSystem
,只需使用常规语言数据结构即可。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
你可以使用这个
Sink
来处理从
HttpRequest
返回的
HttpEntity
。
GraphStageLogic
中保持状态并手动创建输出流的逻辑有什么区别吗?例如(基于您的评论+我之前提供的链接)http://pastebin.com/tzLFAmzk?小加号是它允许在文件创建后立即流式传输(但代码更长,容易出错)。 - VuziGraphStageLogic
之间的主要区别是测试。使用我的函数,所有测试都可以在没有ActorSystem
、ActorMaterializer
和ExecutionContext
的情况下完成。如果您将逻辑放入akka流构造中,则需要所有akka框架来测试该逻辑。祝你愉快。 - Ramón J Romero y Vigil