Cats-effect和异步IO的特点

26

最近我一直在琢磨cats-effect和IO,但我觉得自己对这个effect有些误解,或者说我没有理解它的重点。

  1. 首先,如果IO能够替代Scala的Future,那么我们如何创建一个异步IO任务呢?使用 IO.shift?使用 IO.asyncIO.delay是同步还是异步的?我们能否使用以下代码创建通用的异步任务 Async[F].delay(...)?或者只有当我们使用unsafeToAsyncunsafeToFuture调用IO时才会发生异步操作?
  2. cats-effect中的Async和Concurrent有什么意义?为什么将它们分开?
  3. IO是绿色线程吗?如果是,为什么cats-effect中会有Fiber对象?正如我所理解的那样,Fiber就是绿色线程,但文档却声称我们可以把IO视为绿色线程。

我希望能就其中任何一个问题得到澄清,因为我未能理解cats-effect文档,并且互联网也没有提供太多帮助...

1个回答

66

如果IO可以替换Scala的Future,我们如何创建异步IO任务?

首先,我们需要澄清什么是“异步任务”。通常,“异步”意味着“不会阻塞操作系统线程”,但由于提到了Future,这有点模糊。比如,如果我写:

Future { (1 to 1000000).foreach(println) }

由于这是一个阻塞循环和阻塞输出,所以它不会是异步的,但可能会在另一个操作系统线程上执行,由隐式的ExecutionContext管理。cats-effect代码的等效部分如下:

for {
  _ <- IO.shift
  _ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()

所以,

  • IO.shift 用于可能更改线程/线程池。每次操作Future都会这样做,但从性能角度来看,它并不免费。
  • IO.delay { ... }(又名IO{ ... })不会使任何内容异步化,也不会切换线程。它用于从同步的副作用 API 创建简单的 IO 值。

现在,让我们回到真正的异步部分。这里需要理解的是:

     每个异步计算都可以表示为一个接受回调函数的函数。

无论您是使用返回Future或Java的CompletableFuture的API,还是类似于NIO的CompletionHandler,都可以将其转换为回调。这就是IO.async的用途:您可以将任何接受回调函数的函数转换为IO。在这种情况下:

for {
  _ <- IO.async { ... }
  _ <- IO(println("Done"))
} yield ()

只有在计算中调用回调函数时,Done 才会被打印。您可以将其视为阻止绿色线程,但不是操作系统线程。
所以,
  • IO.async 用于将任何已经异步化的计算转换为IO
  • IO.delay 用于将任何完全同步的计算转换为IO
  • 具有真正异步计算的代码的行为就像阻塞了一个绿色线程。
使用Future时最接近的类比是创建scala.concurrent.Promise并返回p.future

或者当我们使用unsafeToAsync或unsafeToFuture调用IO时会发生什么?

有点像。使用IO,除非您调用其中之一(或使用IOApp),否则什么也不会发生。但是,除非您使用IO.shiftIO.async显式请求,否则IO不能保证您会在不同的操作系统线程上执行,甚至是异步执行。
您可以随时使用例如(IO.shift *> myIO).unsafeRunAsyncAndForget()来保证线程切换。这是可能的,因为myIO只有在被要求时才会执行,无论您将其作为val myIO还是def myIO
但是,您不能神奇地将阻塞操作转换为非阻塞操作。这既不适用于Future,也不适用于IO

cats-effect中的Async和Concurrent的意义是什么?为什么它们被分开?

AsyncConcurrent(以及Sync)都是类型类。它们的设计是让程序员避免被锁定到cats.effect.IO,并可以给您支持您选择的任何其他API,例如monix Task或Scalaz 8 ZIO,甚至是OptionT[Task, *something*]之类的单子变换器类型。像fs2、monix和http4s这样的库利用它们来为您提供更多的选择。 ConcurrentAsync之上添加了额外的功能,其中最重要的是.cancelable.start。这些与Future没有直接的类比,因为它根本不支持取消。

.cancelable.async 的一种版本,它允许您指定一些逻辑来取消您正在封装的操作。一个常见的例子是网络请求 - 如果您不再对结果感兴趣,您可以立即中止它们,而不必等待服务器响应,并且不会浪费任何套接字或处理时间来读取响应。您可能永远不会直接使用它,但它有它的地方。

但如果您不能取消可取消的操作,那么这些操作有什么用呢?关键观察结果是,您无法从其自身内部取消操作。某个人必须做出决定,并且这将与操作本身同时进行(这就是类型类命名的原因)。这就是 .start 的用处。简而言之,

      .start 是绿色线程的显式 fork。

执行 someIO.start 就像执行 val t = new Thread(someRunnable); t.start() 一样,除了现在是绿色的。而 Fiber 实质上是 Thread API 的简化版本:您可以使用 .join,这类似于 Thread#join(),但它不会阻塞操作系统线程;和 .cancel,这是 .interrupt() 的安全版本。


请注意,还有其他方法可以 fork 绿色线程。例如,执行并行操作:

val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)

将所有ID都处理为绿色线程,然后将它们全部加入。或者使用.race:
val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???

val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)

将会并行执行获取操作,给出第一个完成的结果并自动取消较慢的获取操作。因此,使用.startFiber不是分叉更多绿色线程的唯一方法,只是最明确的一种方式。这回答了以下问题:

IO是否是绿色线程?如果是,为什么cats-effect中还有Fiber对象?我理解的是Fiber是绿色线程,但文档声称我们可以将IO视为绿色线程。

  • IO就像一个绿色线程,这意味着您可以同时运行许多IO操作而无需OS线程的开销,并且在for-comprehension中的代码的行为就好像它正在等待结果计算。

  • Fiber是一个工具,用于显式控制分叉(等待完成或取消)的绿色线程。


你能详细说明一下你所说的第一个例子不是异步的含义吗?当使用类似于以下代码进行本地测试时:Future { (1 to 1000).foreach(println) }; Thread.sleep(1); println("Hello"),看起来事情正在异步执行,但也许我对我们在这里使用的异步定义还不太清楚。 - waffles
2
@uMdRupert 事情可能看起来会以非确定性的方式交错进行,但这种外观可以通过拥有足够的操作系统线程来调度 Futures 来实现。然而,操作系统线程具有相当大的开销,如果只有一个线程,则不会看到任何交错。我的意思是,未来的主体将在循环开始时声明一个线程,并且在循环完成之前不允许在该线程上执行任何其他工作。与 Future.traverse(1 to 1000)(a => Future(println(a))) 相比-这个应该会不时地释放一个线程,然后为更多的工作声明一个新线程。 - Oleg Pyzhcov
所有JVM线程都应该是绿色的吗?(不是操作系统发出的)你是指纤程吗? - tribbloid

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接