akka-http将使用multipart/form-data编码上传的文件表示为Source[ByteString, Any]
。我需要使用Java库对其进行解组,该库需要一个InputStream
。
如何将Source[ByteString, Any]
转换为InputStream
?
akka-http将使用multipart/form-data编码上传的文件表示为Source[ByteString, Any]
。我需要使用Java库对其进行解组,该库需要一个InputStream
。
如何将Source[ByteString, Any]
转换为InputStream
?
从2.x版本开始,您可以使用以下代码实现此功能:
import akka.stream.scaladsl.StreamConverters
...
val inputStream: InputStream = entity.dataBytes
.runWith(
StreamConverters.asInputStream(FiniteDuration(3, TimeUnit.SECONDS))
)
注意:此功能在版本2.0.2中出现故障,已于2.4.2中修复。
您可以尝试使用OutputStreamSink
写入PipedOutputStream
并将其提供给PipedInputStream
作为输入流,以便其他代码使用。这只是一个大致的想法,但可能会起作用。代码如下:
import akka.util.ByteString
import akka.stream.scaladsl.Source
import java.io.PipedInputStream
import java.io.PipedOutputStream
import akka.stream.io.OutputStreamSink
import java.io.BufferedReader
import java.io.InputStreamReader
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
object PipedStream extends App{
implicit val system = ActorSystem("flowtest")
implicit val mater = ActorFlowMaterializer()
val lines = for(i <- 1 to 100) yield ByteString(s"This is line $i\n")
val source = Source(lines)
val pipedIn = new PipedInputStream()
val pipedOut = new PipedOutputStream(pipedIn)
val flow = source.to(OutputStreamSink(() => pipedOut))
flow.run()
val reader = new BufferedReader(new InputStreamReader(pipedIn))
var line:String = reader.readLine
while(line != null){
println(s"Reader received line: $line")
line = reader.readLine
}
}
PipedOutputStream
的 javadocs 提到,在同一线程中尝试使用 PipedOutputStream
和 PipedInputStream
不被推荐,因为这可能会导致线程死锁。在 akka 中是否有任何方法可以确保读写发生在不同的线程中? - kostyasource.map { data: ByteString =>
data.iterator.asInputStream
}
更新
一个更详细的示例,从Multipart.FormData开始
def isSourceFromFormData(formData: Multipart.FormData): Source[InputStream, Any] =
formData.parts.map { part =>
part.entity.dataBytes
.map(_.iterator.asInputStream)
}.flatten(FlattenStrategy.concat)