使用流(函数式安全方式)对多个函数调用进行建模

4
假设有一个函数 A => IO[B](也称为 Kleisli[IO, A, B]),该函数旨在被多次调用,并具有副作用,例如更新数据库。如何将其多次调用委托给流(我猜测是 Pipe[IO, A, B])(fs2、monix observable/iterant)?这样做的原因是为了能够累积状态,在时间窗口内批量调用等。
更具体地说,http4s 服务器需要一个 Request => IO[Response],因此我正在寻找如何在流上操作(以获得上述好处),但最终向 http4s 提供这样的函数。
我怀疑它需要一些幕后关联 ID,我对此没问题,我更感兴趣的是如何从 FP 视角安全且正确地执行它。
最终,我期望的签名可能是: Pipe[IO, A, B] => (A => IO[B]),这样 Kleisli 的调用就会通过管道进行传递。
作为一个附加想法,是否有可能进行背压控制?

为什么 StateT 类型的变换器不适用?它们确切地提供了一种有状态计算的能力。 - Some Name
不,我考虑过 StateT 但它既不能满足需要具有透明状态的 Kleisli,也不能访问用于窗口操作的后续调用。 - V-Lamp
我觉得有点奇怪,你会积累状态以批处理HTTP服务的请求,并因此延迟每个请求。也许我没有理解正确的上下文,但是为什么要这样做呢? - Yuval Itzchakov
如果有帮助的话,akka http也将服务器建模为请求-响应的BidiFlow。这样建模有各种动机。 - V-Lamp
请纠正我,但是 Kleisli 的签名是 A => IO[B],但您想要的是 A => IO[A](链接)或 Seq[A] => IO[Seq[B]](批处理)。后者仅在效果实际支持 Seq 时才有用。 - Markus Appel
显示剩余2条评论
1个回答

1

一个想法是使用MPSC(多发布者单消费者)进行建模。我将以Monix为例进行说明,因为我比较熟悉它,但即使您使用FS2,这个想法也是一样的。

object MPSC extends App {

  sealed trait Event
  object Event {
    // You'll need a promise in order to send the response back to user
    case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
  }

  // For backpressure, take a look at `PublishSubject`.
  val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)

  def pushEvent(num: Int) = {
    for {
      promise <- Deferred[Task, Int]
      _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
    } yield promise
  }

  // You get a list of events now since it is buffered
  // Monix has a lot of buffer strategies, check the docs for more details
  def processEvents(items: Seq[Event]): Task[Unit] = {
    Task.delay(println(s"Items: $items")) >>
      Task.traverse(items) {
        case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
      }.void
  }

  val app = for {
    // Start the stream in the background
    _ <- cs
      .bufferTimed(3.seconds) // Buffer all events within 3 seconds
      .filter(_.nonEmpty)
      .mapEval(processEvents)
      .completedL
      .startAndForget

    _ <- Task.sleep(1.second)
    p1 <- pushEvent(10)
    p2 <- pushEvent(20)
    p3 <- pushEvent(30)

    // Wait for the promise to complete, you'll do this for each request
    x <- p1.get
    y <- p2.get
    z <- p3.get

    _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
  } yield ()

  app.runSyncUnsafe()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接