Is it safe to pass an RX Observable to an actor (Scala)?

7

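I have been using the Scala bindings for RX Java for a while now, and I am thinking about combining them with Akka Actors. I would like to know whether it is safe/possible to pass RX Observables between Akka Actors. For example, a program that prints the squares of the even integers up to 20 (one per second):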

/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe( onNext => println )
  }

  worker ! toTwenty
}


/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] => 
       sender ! o filter { _ % 2 == 0 } map { _^2 }
  }
}

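(Please treat this as pseudocode; it does not compile.) Note that I am sending Observables from one actor to another. I would like to understand:

  • Will Akka and RX automatically synchronise access to the Observable?
  • An Observable cannot be sent over a distributed system, since it is a reference to an object in local memory. However, would it work locally?
  • Suppose that in this trivial example the work is scheduled on the subscribe call in the Producer. Can I split the work up so that each actor does its own part separately?

Digression: I have seen some projects that try to combine RX and Actors: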

http://jmhofer.johoop.de/?p=507
https://github.com/jmhofer/rxjava-akka

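But these differ from simply passing the Observable as a message between actors. They first call subscribe() to obtain the values, then send those values to an actor's mailbox, and create a new Observable from them. Or am I mistaken?

3 Answers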

10

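Your approach is not a good idea. The main idea behind Akka is that messages are sent to an actor's mailbox and the actor processes them sequentially (on one thread at a time). That way two threads can never access an actor's state at the same time, so concurrency problems cannot arise.

In your case you call subscribe on the Observable. Your onNext callback will most likely execute on another thread. Suddenly it is possible for two threads to access your actor's state, so you have to be very careful about what you do inside the callback. This is the reason behind your last observation about the other implementations: they seem to take the value in onNext and send it as a message. You must not change the actor's internal state inside such a callback. Send a message to the same actor instead; that restores the guarantee of sequential, single-threaded processing.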
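
To make the advice above concrete, here is a minimal sketch, assuming classic Akka actors and the RxScala bindings are on the classpath. The message types Tick, StreamFailed and StreamDone and the actor name Summing are hypothetical and exist only for this illustration. The callbacks do nothing except send messages to the actor, and the mutable total is touched only inside receive.

```scala
import akka.actor.Actor
import rx.lang.scala.{Observable, Subscription}
import scala.concurrent.duration._

// Hypothetical message types, introduced only for this illustration.
final case class Tick(n: Long)
final case class StreamFailed(cause: Throwable)
case object StreamDone

class Summing extends Actor {
  private var total = 0L                                  // actor state: touched only in receive
  private var subscription: Option[Subscription] = None

  override def preStart(): Unit = {
    val me = self                                         // ActorRef is immutable, safe to capture
    subscription = Some(
      Observable.interval(100.millis).take(5).subscribe(
        (n: Long) => me ! Tick(n),                        // may run on an Rx thread: only send
        (e: Throwable) => me ! StreamFailed(e),
        () => me ! StreamDone
      )
    )
  }

  // Stop the stream when the actor stops, so no callbacks outlive it.
  override def postStop(): Unit = subscription.foreach(_.unsubscribe())

  def receive: Receive = {
    case Tick(n)         => total += n                    // sequential, one message at a time
    case StreamFailed(e) => println(s"stream failed: $e"); context.stop(self)
    case StreamDone      => println(s"total = $total"); context.stop(self)
  }
}
```

The actor can be started with system.actorOf(Props(new Summing)). The point of the design is that the Rx threads and the actor never share mutable state: everything crosses the boundary as an immutable message.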


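Thanks for the reply. Would you mind looking at the second link I posted (GitHub) and telling me what the first code example in the README is doing? It somehow synchronises the observable with the actor so that you can use it in receive. Does that let you use an Observable inside an actor (even if you cannot pass it as a message)? - Luciano
Hey, do you have an example of what you want to do with the Observable? The second GitHub link shows an integration, but the example is too simple: it only logs and causes no side effects. - mavilein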

5
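I spent some time experimenting and found that you can use Observables in Akka. In fact, since an Observable can be thought of as a multi-valued extension of a Future, you can follow the same guidelines as for combining Actors and Futures. Using Futures in Akka is supported/encouraged in both the official documentation and textbooks (e.g. Akka Concurrency, Wyatt 2013), but with lots of caveats.
First the positives:
  • Like a Future, an Observable is immutable, so in theory it should be safe to pass one in a message.
  • Observables let you specify an execution context, much like Futures. This is done with Observable.observeOn(scheduler). You can create a scheduler from Akka's execution context by passing an Akka dispatcher (e.g. system.dispatcher or context.dispatcher) to the rx.lang.scala.schedulers.ExecutorScheduler constructor. This should ensure that they are synchronised.
  • Related to the above, there is an enhancement planned for an upcoming rx-scala release (https://github.com/Netflix/RxJava/issues/815#issuecomment-38793433) that would allow the scheduler of an observable to be specified implicitly.
  • Futures fit well with the ask pattern. A similar pattern can be used for Observables (see the end of this post). This also solves the problem of sending messages to remote observables.
Now the caveats:
  • They share the same problems as Futures. See, for example, the bottom of this page: http://doc.akka.io/docs/akka/2.3.2/general/jmm.html. See also the chapter on Futures in Wyatt 2013.
  • As noted in @mavilein's answer, this means that Observable.subscribe() should not use the enclosing scope of the Actor to access its internal state. For example, you should not call sender inside a subscription. Instead, store it in a val and access that val, as in the example below.
  • The scheduler used by Akka has a different resolution from the one used by Rx. Its default resolution is 100 milliseconds (Wyatt 2013). If anyone has experience of the problems this might cause, please comment below!
Finally, I have implemented an equivalent of the ask pattern for Observables. It uses toObservable or ?? to return an Observable asynchronously, backed by a temporary Actor and a PublishSubject. Note that the messages sent by the source are of type rx.lang.scala.Notification, produced with materialize(), so that the completed and error states of the Observable contract can be satisfied; otherwise there would be no way of signalling those states to the sink. However, nothing stops you from sending messages of arbitrary type; those simply call onNext(). The Observable has a timeout, and it stops with a timeout exception if no message is received within a certain interval.
It is used like this: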
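import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.RX._   // wildcard import brings the implicit toRx conversions (and thus ??) into scope
import rx.lang.scala.Observable
import scala.concurrent.duration._
implicit val timeout = akka.util.Timeout(10 seconds)
case object Req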

val system = ActorSystem("test")
val source = system.actorOf(Props[Source],"thesource")

class Source() extends Actor {
  def receive : Receive = {
     case Req =>
       val s = sender()
       Observable.interval(1 second).take(5).materialize.subscribe{s ! _}
  }
}

val obs = source ?? Req
obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe((l : Any) => println ("onnext : " + l.toString),
              (error : Throwable) => { error.printStackTrace ; system.shutdown() },
              () => { println("completed, shutting system down"); system.shutdown() })

This produces the following output:
onnext : 0
onnext : 1
onnext : 2
onnext : 3
onnext : 4
completed, shutting system down

Below is the source code. It is a modified version of AskSupport.scala.

package akka.pattern

/*
 * File : RxSupport.scala
 * This package is a modified version of 'AskSupport' to provide methods to 
 * support RX Observables.
 */

import rx.lang.scala.{Observable,Subject,Notification}
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor._
import scala.concurrent.ExecutionContext
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.sysmsg._

class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
  def this(message: String) = this(message, null: Throwable)
  override def getCause(): Throwable = cause
}

trait RxSupport {
  implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef)
  def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message
  implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection)
  def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message
}

final class RxActorRef(val actorRef : ActorRef) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match {
    case ref : InternalActorRef if ref.isTerminated =>
      actorRef ! message
      Observable.error(new RxTimeoutException(s"Recipient[$actorRef] has already been terminated."))
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}

final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match {
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString)
        actorSel.tell(message, a)
         a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
    case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}


private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef {
  import RxSubjectActorRef._
  import AbstractRxActorRef.stateOffset
  import AbstractRxActorRef.watchedByOffset

  /**
   * As an optimization for the common (local) case we only register this RxSubjectActorRef
   * with the provider when the `path` member is actually queried, which happens during
   * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
   *
   * Defined states:
   * null                  => started, path not yet created
   * Registering           => currently creating temp path and registering it
   * path: ActorPath       => path is available and was registered
   * StoppedWithPath(path) => stopped, path available
   * Stopped               => stopped, path not yet created
   */
  @volatile
  private[this] var _stateDoNotCallMeDirectly: AnyRef = _

  @volatile
  private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet

  @inline
  private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]

  @inline
  private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)

  @tailrec // Returns false if the subject is already completed
  private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
    case null => false
    case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher)
  }

  @tailrec
  private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
    case null => ()
    case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
  }

  @tailrec
  private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
    case null => ActorCell.emptyActorRefSet
    case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
  }

  @inline
  private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)

  @inline
  private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)

  @inline
  private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)

  override def getParent: InternalActorRef = provider.tempContainer

  def internalCallingThreadExecutionContext: ExecutionContext =
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext

  /**
   * Contract of this method:
   * Must always return the same ActorPath, which must have
   * been registered if we haven't been stopped yet.
   */
  @tailrec
  def path: ActorPath = state match {
    case null =>
      if (updateState(null, Registering)) {
        var p: ActorPath = null
        try {
          p = provider.tempPath()
          provider.registerTempActor(this, p)
          p
        } finally { setState(p) }
      } else path
    case p: ActorPath       => p
    case StoppedWithPath(p) => p
    case Stopped =>
      // even if we are already stopped we still need to produce a proper path
      updateState(Stopped, StoppedWithPath(provider.tempPath()))
      path
    case Registering => path // spin until registration is completed
  }

  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath => provider.deadLetters ! message
    case _ =>
      if (message == null) throw new InvalidMessageException("Message is null")
      else
        message match {
          case n : Notification[Any] => n.accept(result)
          case other                 => result.onNext(other)
        }
  }

  override def sendSystemMessage(message: SystemMessage): Unit = message match {
    case _: Terminate                      => stop()
    case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
    case Watch(watchee, watcher) =>
      if (watchee == this && watcher != this) {
        if (!addWatcher(watcher))
           // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
      } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
    case Unwatch(watchee, watcher) =>
      if (watchee == this && watcher != this) remWatcher(watcher)
      else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
    case _ =>
  }

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match {
    case Stopped | _: StoppedWithPath => true
    case _                            => false
  }

  @tailrec
  override def stop(): Unit = {
    def ensureCompleted(): Unit = {
      result.onError(new ActorKilledException("Stopped"))
      val watchers = clearWatchers()
      if (!watchers.isEmpty) {
        watchers foreach { watcher =>
          // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.asInstanceOf[InternalActorRef]
            .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
        }
      }
    }
    state match {
      case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
        if (updateState(null, Stopped)) ensureCompleted() else stop()
      case p: ActorPath =>
        if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
      case Stopped | _: StoppedWithPath => // already stopped
      case Registering                  => stop() // spin until registration is completed before stopping
    }
  }
}

private[akka] object RxSubjectActorRef {
  private case object Registering
  private case object Stopped
  private final case class StoppedWithPath(path : ActorPath)

  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = {
    val result = Subject[Any]()
    new RxSubjectActorRef(provider, result)
    /*timeout logic moved to RxActorRef/Sel*/
  }
}
/*
 * This doesn't work, need to create as a Java class for some reason ...
final object AbstractRxActorRef {
    final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly"))
    final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly"))
}*/

package object RX extends RxSupport

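Update 2015-09-10

I thought I would add some simpler code here that implements the ?? operator. It differs slightly from the above in that it does not support data sent over the network, and it returns Observable[Observable[A]], which makes it easier to synchronise responses. The advantage is that it does not interfere with Akka's internals: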

object TypedAskSupport {
  import scala.concurrent.Future
  import akka.actor.{ActorRef,ActorSelection}
  import scala.reflect.ClassTag

  implicit class TypedAskableActorRef(actor : ActorRef) {
    val converted : akka.pattern.AskableActorRef = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
   def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] =
      Observable.from { this.?[R](topic)(timeout) }
  }

  implicit class TypedAskableActorSelection(actor : ActorSelection) {
    val converted : akka.pattern.AskableActorSelection = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
  }
}
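
The snippet above refers to Subscribe[R] and Request[R] without defining them. A minimal sketch of what such a protocol could look like follows, assuming they are plain marker traits whose type parameter records the expected reply type. These definitions, and the example messages GetCount and PriceFeed, are hypothetical and not taken from the answer. (The snippet would also need import rx.lang.scala.Observable.)

```scala
// Hypothetical message protocol; none of these definitions come from the original answer.

// A message whose reply is expected to be a single value of type R.
trait Request[R]

// A message whose reply is expected to be an Observable[R].
trait Subscribe[R]

// Example messages: R is a phantom type that only drives the
// return type of ? and ?? at the call site.
case object GetCount extends Request[Int]
final case class PriceFeed(symbol: String) extends Subscribe[BigDecimal]
```

With definitions like these, actor ?? GetCount would be typed as Observable[Int] and actor ?? PriceFeed("ACME") as Observable[Observable[BigDecimal]]. Nothing checks at runtime that the actor really replies with that type, so a mismatch surfaces as a failed cast in the stream.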

4
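rx-java and akka have come a long way since I posted the original question.
There is currently a release candidate of Akka Streams (middle of the page), which I think attempts, to some extent, to provide primitives similar to rx-java's Observable.
There is also the Reactive Streams initiative, which looks set to provide interoperability between the different primitives of this kind through toPublisher and toSubscriber methods; Akka Streams implements this API, and rx-java has an extension that provides the interface. An example of converting between the two can be found in this blog post; an excerpt follows: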

// create an observable from a simple list (this is in rxjava style)
val first = Observable.from(text.split("\\s").toList.asJava);
// convert the rxJava observable to a publisher
val publisher = RxReactiveStreams.toPublisher(first);
// based on the publisher create an akka source
val source = PublisherSource(publisher);

You can then safely pass these around inside an actor.
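
The excerpt above uses a pre-release Akka Streams API. As a rough sketch of the same conversion against later versions, the code below assumes Akka 2.6+ (where an implicit ActorSystem is enough to run a stream), Scala 2.13, RxJava 1.x and the rxjava-reactive-streams adapter. Source.fromPublisher is used in place of PublisherSource; the object and method names RxToAkkaStreams and upperCaseWords are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import rx.RxReactiveStreams
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

object RxToAkkaStreams {
  // Splits the text into words with an RxJava Observable, then
  // upper-cases them in an Akka Streams pipeline.
  def upperCaseWords(text: String)(implicit system: ActorSystem): Future[Seq[String]] = {
    // RxJava 1.x Observable built from a Java list, as in the excerpt.
    val first: rx.Observable[String] =
      rx.Observable.from(text.split("\\s").toList.asJava)

    // Observable -> Reactive Streams Publisher.
    val publisher: Publisher[String] = RxReactiveStreams.toPublisher(first)

    // Publisher -> Akka Streams Source, run to completion into a Seq.
    Source.fromPublisher(publisher)
      .map(_.toUpperCase)
      .runWith(Sink.seq)
  }
}
```

The Publisher is the only thing that crosses between the two libraries, so back-pressure is negotiated through the Reactive Streams protocol and neither side depends on the other's threading model.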

