Akka Streams: 流中的状态

21

我希望使用Akka Streams读取多个大文件,并使用每行的内容进行处理。假设每个键由一个(标识符->值)组成。如果找到新的标识符,我想要将它及其值保存在数据库中;否则,如果已经在流处理过程中找到了该标识符,则只保存值。为此,我认为需要某种递归状态流来将已经找到的标识符保存在Map中。我认为这个流会接收一个(newLine, contextWithIdentifiers)的对。

我刚开始学习Akka Streams。我觉得自己可以处理无状态的内容,但不知道如何保留contextWithIdentifiers。如果能指点一下方向就非常感激。


3
谢谢您的提问。这是一个简单的请求,但是找到有意义的答案和示例代码似乎比较复杂。这是我找到的唯一一个! - akauppi
2个回答

30

也许像 statefulMapConcat 这样的函数可以帮到你:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.util.Random._
import scala.math.abs
import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

//encapsulating your input
case class IdentValue(id: Int, value: String)
//some random generated input
val identValues = List.fill(20)(IdentValue(abs(nextInt()) % 5, "valueHere"))

val stateFlow = Flow[IdentValue].statefulMapConcat{ () =>
  //state with already processed ids
  var ids = Set.empty[Int]
  identValue => if (ids.contains(identValue.id)) {
    //save value to DB
    println(identValue.value)
    List(identValue)
  } else {
    //save both to database
    println(identValue)
    ids = ids + identValue.id
    List(identValue)
  }
}

Source(identValues)
  .via(stateFlow)
  .runWith(Sink.seq)
  .onSuccess { case identValue => println(identValue) }

谢谢你提供的代码。由于涉及() => ...工厂,我希望能在中间再添加一些类型信息。你知道为什么没有“.statefulMap”方法吗? - akauppi
1
我的问题1.5年前看起来很幼稚,现在让我回答它们。工厂方式并不会使任何事情变得复杂。它只是意味着代码可能会被调用多次。微不足道。没有“.statefulMap”,因为代码的工作是为每个传入条目(即List)提供0到n个条目,显然这些条目会被连接起来。要么我在'16年有一个糟糕的日子,要么我已经学到了一些东西。 - akauppi
1
哇,statefulMapConcat 看起来非常多才多艺。 - ig-dev

3
几年后,我写了一个实现,如果你只需要1对1的映射(而不是1对N),可以使用此实现:
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

object StatefulMap {
  def apply[T, O](converter: => T => O) = new StatefulMap[T, O](converter)
}

class StatefulMap[T, O](converter: => T => O) extends GraphStage[FlowShape[T, O]] {
  val in = Inlet[T]("StatefulMap.in")
  val out = Outlet[O]("StatefulMap.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    val f = converter
    setHandler(in, () => push(out, f(grab(in))))
    setHandler(out, () => pull(in))
  }
}

测试(和演示):

  behavior of "StatefulMap"

  class Counter extends (Any => Int) {
    var count = 0

    override def apply(x: Any): Int = {
      count += 1
      count
    }
  }

  it should "not share state among substreams" in {
    val result = await {
      Source(0 until 10)
        .groupBy(2, _ % 2)
        .via(StatefulMap(new Counter()))
        .fold(Seq.empty[Int])(_ :+ _)
        .mergeSubstreams
        .runWith(Sink.seq)
    }
    result.foreach(_ should be(1 to 5))
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接