如何创建一个源,通过方法调用后能接收元素?

62

我想创建一个Source,然后像下面这样推送元素:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

请问怎样做是推荐的方法?

谢谢!


2
这可能与以下链接相同:https://dev59.com/9F4b5IYBdhLWcg3wdxnG#29077212 - cmbaxter
1
@cmbaxter确实。虽然我更多地是想以一种方式通过向某个actor发送消息来提供流,而不需要实例化或拥有此actor的类。我相信可以使用Source.actorRef功能实现,就像您链接的帖子和这里所示的那样:https://dev59.com/cF0a5IYBdhLWcg3wCE3O。非常感谢) - ale64bit
您可能会发现这个答案有帮助: https://dev59.com/hFkS5IYBdhLWcg3wHTNy#44605821 - dimart.sp
3个回答

103

可以通过以下三种方式实现:

1. 使用SourceQueue进行后处理

您可以使用Source.queue将Flow材料化为SourceQueue

case class Weather(zipCode : String, temperature : Double, raining : Boolean)

val bufferSize = 100

//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val queue = Source.queue(bufferSize, overflowStrategy)
                  .filter(!_.raining)
                  .to(Sink foreach println)
                  .run() // in order to "keep" the queue Materialized value instead of the Sink's

queue offer Weather("02139", 32.0, true)

2. 使用Actor进行后处理

类似的问题和答案可以在这里找到,要点是将流作为ActorRef实例化并向该引用发送消息:

val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                .filter(!_.raining)
                .to(Sink foreach println )
                .run() // in order to "keep" the ref Materialized value instead of the Sink's

ref ! Weather("02139", 32.0, true)

3. 通过Actor进行预物化

同样地,您可以显式地创建一个包含消息缓冲区的Actor,使用该Actor创建一个Source,并按照这里的回答向该Actor发送消息:

object WeatherForwarder {
  def props : Props = Props[WeatherForwarder]
}

//see provided link for example definition
class WeatherForwarder extends Actor {...}

val actorRef = actorSystem actorOf WeatherForwarder.props 

//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true) 

//stream already has 1 Weather value to process which is sitting in the 
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}

3
@Loic的评论没有让我明白“带队列的预材料化”是第四种可能的解决方案。事实上,它是其中一种解决方案。我在这里找到了一个好的资源:https://dev59.com/V5bfa4cB1Zd3GeqPzt_0#37117205 - akauppi
@akauppi 在你发布的链接中,如果你使用 mapMaterializedValue,它会创建另一个源。他使用 Future 来获取他想要返回的源的队列。 - Guillaume Massé
1
@zt1983811,我从未尝试过你所指定的用例。 - Ramón J Romero y Vigil
我该如何做到这一点 https://stackoverflow.com/questions/44316740/akka-stream-sliding-window-to-control-reduce-emit-to-sink/44317588?noredirect=1#comment75647475_44317588 - zt1983811
因为我也使用队列将数据发送到源,但是一旦使用它,我就无法进行缩减。 - zt1983811
显示剩余2条评论

19
自Akka 2.5以来,Source有一个preMaterialize方法。
根据文档,这似乎是执行您要求的操作的指定方式:
引用如下: 在某些情况下,您需要在将Source连接到图的其余部分之前获得Source物化值。这在“物化值驱动”的源(例如Source.queueSource.actorRefSource.maybe)的情况下特别有用。
以下是使用SourceQueue的示例。在物化之前和之后以及从Flow内推送元素到队列:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()


val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()

// Adding element before actual materialization
sourceMat.offer("pre materialization element")

val flow = Flow[String].map { e =>
  if(!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}

// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()

// Adding element after materialization
sourceMat.offer("post materialization element")

输出:

Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow

我一直在努力创建一个队列的前置和后置材化。这个答案非常有用。感谢您的解释。 - codeviper
太棒了,这正是我想要的。谢谢! - fritteli

3

在尝试和寻找解决方案后,我发现了这个干净、简单且适用于材料化前后的解决方案。

https://dev59.com/cF0a5IYBdhLWcg3wCE3O#32553913
  val (ref: ActorRef, publisher: Publisher[Int]) =
    Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both).run()

  ref ! 1 //before

  val source = Source.fromPublisher(publisher)

  ref ! 2 //before
  Thread.sleep(1000)
  ref ! 3 //before

  source.runForeach(println)

  ref ! 4 //after
  Thread.sleep(1000)
  ref ! 5 //after

输出:

1
2
3
4
5

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接