如果IO可以替换Scala的Future,我们如何创建异步IO任务?
首先,我们需要澄清什么是“异步任务”。通常,“异步”意味着“不会阻塞操作系统线程”,但由于提到了Future
,这有点模糊。比如,如果我写:
Future { (1 to 1000000).foreach(println) }
由于这是一个阻塞循环和阻塞输出,所以它不会是异步的,但可能会在另一个操作系统线程上执行,由隐式的ExecutionContext管理。cats-effect代码的等效部分如下:
for {
_ <- IO.shift
_ <- IO.delay { (1 to 1000000).foreach(println) }
} yield ()
所以,
IO.shift
用于可能更改线程/线程池。每次操作Future
都会这样做,但从性能角度来看,它并不免费。
IO.delay
{ ... }(又名IO{ ... }
)不会使任何内容异步化,也不会切换线程。它用于从同步的副作用 API 创建简单的 IO
值。
现在,让我们回到真正的异步部分。这里需要理解的是:
每个异步计算都可以表示为一个接受回调函数的函数。
无论您是使用返回Future
或Java的CompletableFuture
的API,还是类似于NIO的CompletionHandler
,都可以将其转换为回调。这就是IO.async
的用途:您可以将任何接受回调函数的函数转换为IO
。在这种情况下:
for {
_ <- IO.async { ... }
_ <- IO(println("Done"))
} yield ()
只有在计算中调用回调函数时,
Done
才会被打印。您可以将其视为阻止绿色线程,但不是操作系统线程。
所以,
IO.async
用于将任何已经异步化的计算转换为IO
。
IO.delay
用于将任何完全同步的计算转换为IO
。
- 具有真正异步计算的代码的行为就像阻塞了一个绿色线程。
使用
Future
时最接近的类比是创建
scala.concurrent.Promise
并返回
p.future
。
或者当我们使用unsafeToAsync或unsafeToFuture调用IO时会发生什么?
有点像。使用
IO
,除非您调用其中之一(或使用
IOApp
),否则
什么也不会发生。但是,除非您使用
IO.shift
或
IO.async
显式请求,否则IO不能保证您会在不同的操作系统线程上执行,甚至是异步执行。
您可以随时使用例如
(IO.shift *> myIO).unsafeRunAsyncAndForget()
来保证线程切换。这是可能的,因为
myIO
只有在被要求时才会执行,无论您将其作为
val myIO
还是
def myIO
。
但是,您不能神奇地将阻塞操作转换为非阻塞操作。这既不适用于
Future
,也不适用于
IO
。
cats-effect中的Async和Concurrent的意义是什么?为什么它们被分开?
Async
和
Concurrent
(以及
Sync
)都是类型类。它们的设计是让程序员避免被锁定到
cats.effect.IO
,并可以给您支持您选择的任何其他API,例如monix Task或Scalaz 8 ZIO,甚至是
OptionT[Task, *something*]
之类的单子变换器类型。像fs2、monix和http4s这样的库利用它们来为您提供更多的选择。
Concurrent
在
Async
之上添加了额外的功能,其中最重要的是
.cancelable
和
.start
。这些与
Future
没有直接的类比,因为它根本不支持取消。
.cancelable
是 .async
的一种版本,它允许您指定一些逻辑来取消您正在封装的操作。一个常见的例子是网络请求 - 如果您不再对结果感兴趣,您可以立即中止它们,而不必等待服务器响应,并且不会浪费任何套接字或处理时间来读取响应。您可能永远不会直接使用它,但它有它的地方。
但如果您不能取消可取消的操作,那么这些操作有什么用呢?关键观察结果是,您无法从其自身内部取消操作。某个人必须做出决定,并且这将与操作本身同时进行(这就是类型类命名的原因)。这就是 .start
的用处。简而言之,
.start
是绿色线程的显式 fork。
执行 someIO.start
就像执行 val t = new Thread(someRunnable); t.start()
一样,除了现在是绿色的。而 Fiber
实质上是 Thread
API 的简化版本:您可以使用 .join
,这类似于 Thread#join()
,但它不会阻塞操作系统线程;和 .cancel
,这是 .interrupt()
的安全版本。
请注意,还有其他方法可以 fork 绿色线程。例如,执行并行操作:
val ids: List[Int] = List.range(1, 1000)
def processId(id: Int): IO[Unit] = ???
val processAll: IO[Unit] = ids.parTraverse_(processId)
将所有ID都处理为绿色线程,然后将它们全部加入。或者使用.race:
val fetchFromS3: IO[String] = ???
val fetchFromOtherNode: IO[String] = ???
val fetchWhateverIsFaster = IO.race(fetchFromS3, fetchFromOtherNode).map(_.merge)
将会并行执行获取操作,给出第一个完成的结果并自动取消较慢的获取操作。因此,使用.start
和Fiber
不是分叉更多绿色线程的唯一方法,只是最明确的一种方式。这回答了以下问题:
IO是否是绿色线程?如果是,为什么cats-effect中还有Fiber对象?我理解的是Fiber是绿色线程,但文档声称我们可以将IO视为绿色线程。
Future { (1 to 1000).foreach(println) }; Thread.sleep(1); println("Hello")
,看起来事情正在异步执行,但也许我对我们在这里使用的异步定义还不太清楚。 - wafflesFuture.traverse(1 to 1000)(a => Future(println(a)))
相比-这个应该会不时地释放一个线程,然后为更多的工作声明一个新线程。 - Oleg Pyzhcov