如何将Source[ByteString, Any]转换为InputStream

33

akka-http将使用multipart/form-data编码上传的文件表示为Source[ByteString, Any]。我需要使用Java库对其进行解组,该库需要一个InputStream

如何将Source[ByteString, Any]转换为InputStream


我没有点踩,但我猜测这个问题似乎没有经过什么实质性的工作。你遇到了一个障碍。你尝试过什么来克服它?很高兴看到有人先花时间和精力进行研究,在来这里之前耗尽所有选项,并让我们知道你已经尝试过什么。 - cmbaxter
1
请注意,关于此事在akka用户列表中有一些讨论,网址为https://groups.google.com/forum/#!topic/akka-user/4WvOrFtewQY,同时也有一个相关的开放案例 https://github.com/akka/akka/issues/17338 - Biswanath
4
我也对踩票很好奇。我认为这个问题是合理的,因为库本身没有提供解决方案,文档中也没有描述。答案也很有帮助,希望能帮到其他人。 - kostya
3个回答

25

然而,截至2.0.2,它已经损坏:https://github.com/akka/akka/issues/19392,并且来自已接受答案的解决方案可能会因融合而死锁(发生了一个不太可能的情况!;) - kostya
@kostya 感谢您指出这个问题,但是在修复后语法似乎仍然保持不变:https://github.com/akka/akka/pull/19575 - Bennie Krijger
如果您想要一个Array[Byte]作为结果输出,您会使用InputStream吗?还是有其他方法? - lisak
2
@lisak 如果是这样,你可以使用 data.runFold(ByteString.empty)(_ ++ _).toArray,其中 data 的类型为 Source[ByteString, Any]。 - Bennie Krijger
3
在实例化时,它会将整个数据集加载到内存中吗? - maks
显示剩余2条评论

7

您可以尝试使用OutputStreamSink写入PipedOutputStream并将其提供给PipedInputStream作为输入流,以便其他代码使用。这只是一个大致的想法,但可能会起作用。代码如下:

import akka.util.ByteString
import akka.stream.scaladsl.Source
import java.io.PipedInputStream
import java.io.PipedOutputStream
import akka.stream.io.OutputStreamSink
import java.io.BufferedReader
import java.io.InputStreamReader
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer

object PipedStream extends App{
  implicit val system = ActorSystem("flowtest")
  implicit val mater = ActorFlowMaterializer()

  val lines = for(i <- 1 to 100) yield ByteString(s"This is line $i\n")
  val source = Source(lines)

  val pipedIn = new PipedInputStream()
  val pipedOut = new PipedOutputStream(pipedIn)      
  val flow = source.to(OutputStreamSink(() => pipedOut))
  flow.run()

  val reader = new BufferedReader(new InputStreamReader(pipedIn))
  var line:String = reader.readLine
  while(line != null){
    println(s"Reader received line: $line")
    line = reader.readLine
  }           
}

顺便提一句,PipedOutputStream 的 javadocs 提到,在同一线程中尝试使用 PipedOutputStreamPipedInputStream 不被推荐,因为这可能会导致线程死锁。在 akka 中是否有任何方法可以确保读写发生在不同的线程中? - kostya
我认为你很少会在同一个线程上结束。HTTP系统正在使用“ExecutionContext”来处理具有多部分表单的请求。该请求是从EC的一个线程服务的。如果您使用相同的EC启动另一个“RunnableFlow”,则会获得另一个线程,因此不会出现死锁情况。如果您真的很担心,那么可以让读取表单数据的流使用完全不同的“ExecutionContext”。 - cmbaxter
谢谢您的解释,尽管我有点担心您回答中的“不太可能”这个词。它要么会发生(并且一定会发生),要么不会发生。 - kostya
2
我说“不太可能”,因为如果设置正确,这种情况是不会发生的。但是举个例子,假设你正在进行测试,并且你正在使用CallingThreadDispatcher来处理所有事情,那么它就会出现死锁。 - cmbaxter

2
您可以从ByteString中提取一个迭代器,然后获取InputStream。类似这样的代码(伪代码):
source.map { data: ByteString =>
  data.iterator.asInputStream
}

更新

一个更详细的示例,从Multipart.FormData开始

def isSourceFromFormData(formData: Multipart.FormData): Source[InputStream, Any] = 
 formData.parts.map { part => 
   part.entity.dataBytes
   .map(_.iterator.asInputStream)
}.flatten(FlattenStrategy.concat)

现在的问题是如何将多个输入流转换为单个输入流。 - kostya
你需要在这里处理源代码。正如你在问题中提到的,你正在将一些二进制文件作为多部分表单的一部分上传。所以,你所要做的就是将每个文件的ByteString源代码(每个文件一个源代码)转换为唯一的Source[ByteString]。在akka流中进行此操作应该是一个扁平化操作。类似于这样的(伪代码):{source of sources}.flatten(FlattenStrategy.concat)。希望这可以帮助到你。 - Juan José Vázquez Delgado
也许你需要多次压平?不管怎样,我认为你已经拥有了完成这个任务所需的所有信息。 - Juan José Vázquez Delgado
我问你这些问题的原因是因为我认为你的建议不会起作用。你无法将Source[ByteString]压缩成InputStream。先尝试实现你的建议,你自己就会明白困难在哪里。 - kostya
1
考虑到您的情况,我认为您应该选择source.runWith(StreamConverters.asInputStream),并将该输入流提供给您的Java API。 - Viktor Klang
显示剩余9条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接