在Akka-Streams中流内的分割

3

我正在尝试想出一种解决方案,将我接收到的字符串拆分为多个字符串。我已经进行了研究,看起来在之前的Akka-Streams版本中有一个名为Transformer的类,你可以扩展它来完成这种转换。

在我使用的版本(RC2)中,有Stage,但我不太确定如何实现拆分模式。

Source.actorPublisher[String](MyActor.props).
.XXXXX(_.split("\n"))
.map(...)
.to(Sink(...))

我正在寻找一个组件XXXXX,它可以允许我输入一个String并返回一系列String,并将每个String发送到其余的流中。


2
如果结果元素始终仅依赖于单个输入元素,则可以使用mapConcat。如果依赖关系更复杂,则可以使用(有状态的)阶段。 - jrudolph
2
作为一般性的补充,mapConcat可以被视为flatMap的一种形式。名称不同是因为某些单子律不成立。 - almendar
2个回答

5

我同意 @jrudolph 的观点,mapConcat 可能是你想要的。下面是一个快速示例,展示了这种方法的用法:

  val strings = List(
  """hello
     world
     test
     this""",
     """foo
     bar
     baz
     """

  )

  implicit val system = ActorSystem("test")
  implicit val mater = ActorFlowMaterializer()
  Source(strings).
    mapConcat(_.split("\n").map(_.trim).toList).
    runForeach(println)

如果您运行此代码,您将看到以下内容打印出来:
hello
world
test
this
foo     
bar
baz

2
Akka提供了Framing辅助函数来解决这种类型的问题。
假设您的字符集为UTF-8,您可以编写一个函数,该函数接受定界String值的最大大小,并返回可以执行拆分的Flow
import akka.stream.scaladsl.Framing
import akka.util.ByteString

val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
  (maxLineSize) =>
    Flow[String]
      .map(ByteString.apply)
      .via(Framing delimiter (ByteString("\n"), maxLineSize))
      .via(Flow[ByteString] map (_.utf8String))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接