如何使用Project Reactor的调度器与基于Executor的库?

12
Project Reactor提供了一种很好的方法来定义代码运行的线程池,即通过定义一个Scheduler。它还通过Mono.fromFuture(..)提供了与使用CompletableFuture的库之间的桥梁。
AWS的DynamoDB异步客户端会在java.util.concurrent.Executor上执行从API调用返回的CompletableFuture。默认情况下,它创建了一个由线程池支持的Executor。结果是,即使是已定义Scheduler的流(如Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())),也会在库创建的线程池中执行,而不是在Schedulers.boundedElastic()中的线程上执行。因此,我们看到的线程名称类似于sdk-async-response-0-2,而不是类似于boundedElastic-1的名称。
幸运的是,该库允许我们提供自己的Executor,就像这里所示的那样,因此我的问题是:

如何构建一个Executor,以便在运行时使用来自流的Scheduler中的线程?

用例 我们有一个存储库类,具有findById方法,我们需要调用者能够控制要运行的Scheduler,因为它在以下明显不同的上下文中使用:
  1. Schedulers.boundedElastic()调度程序上运行的API响应。
  2. 处理按顺序执行的Kafka消息,每个分区都在定义的调度程序上的单个线程上执行,如Reactor Kafka docs所示。
尝试 我们尝试使用Schedulers.immediate()Runnable::run来定义Executor,但两者都会导致在Netty事件循环线程(例如名称:aws-java-sdk-NettyEventLoop-0-2)上执行,而不是在定义的Scheduler的线程上执行。
DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        runnable -> Schedulers.immediate().schedule(runnable)
    ))
    .build();

DynamoDbAsyncClient.builder()
    .asyncConfiguration(builder -> builder.advancedOption(
        SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
        Runnable::run
    ))
    .build();

1
你为什么要尝试使用即时调度程序?那不会切换线程。试着使用其他调度程序,比如并行或弹性调度程序。 - Martin Tarjányi
我尝试使用即时调度器,因为我不希望它从流中定义的线程切换。关于使用并行或弹性调度器的建议,利用其中一个在执行器中会导致它始终被使用并忽略在流中设置的调度器。在我们的用例中,我们需要它根据流中定义的内容在不同的线程上运行。 - peterl
如果我理解正确,您想要切换事件循环线程。但是,这种机制不可能实现。我怀疑这根本不可能。如果您在项目的GitHub存储库中打开一个问题,您可能会有更好的机会得到答案。 - Martin Tarjányi
不,我不想切换事件循环线程,也不想在它们上运行应用程序逻辑。我希望在运行时将DynamoDB逻辑运行在作为_active_调度程序的一部分定义的线程上。因此,如果活动线程是弹性线程池中的一个线程,或者是为该流程部分定义的单个线程,则希望在其上执行DynamoDB逻辑。 - peterl
1个回答

33

第一部分 观察 vs. 订阅

看到这个问题,我认为需要在特定线程上执行后观察元素。 确切地说,在这个上下文中,观察的意思是*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个称为这样的适当运算符,但在 Project Reactor 中,我们将相同的操作称为publishOn

因此, *如果要处理数据*Schedulers.boundedElastic()上,那么应该使用以下结构

Mono.fromFuture(..)
    .publishOn(Schedulers.boundedElastic())

等等,.subscribeOn 也可以起作用吗???

看到前面的构造,你可能会感到担忧,因为你100%确定

Mono.fromRunnable(..)
    .subscribeOn(Schedulers.boundedElastic())

发送onNext到线程boundedElastic-1,那么同样的fromFuture有什么问题呢?
这里有一个技巧:

永远不要在Futures / CompletableFuture或任何可以使用自己的异步机制的东西中使用subscribeOn

如果我们看一下subscribeOn背后发生了什么,你会发现类似以下的内容:
//  Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Scheduler scheduler;
    Publisher<T> parent;
    scheduler.schedule(() -> parent.subscribe(actual));
}

这基本上意味着父级的 subscribe 方法将在单独的线程上被调用。
这种技术适用于 fromRunnablefromSupplierfromCallable,因为它们的逻辑发生在 subscribe 方法中。
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Operators.MonoSubscriber<T, T>
    sds = new Operators.MonoSubscriber<>(actual);

    actual.onSubscribe(sds);
    // skiped some parts 
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
}

这意味着它几乎等同于。
scheduler.schedule(() -> {
    T t = supplier.get();
    if (t == null) {
        sds.onComplete();
    }
    else {
        sds.complete(t);
    }
})

相比之下,fromFuture 的工作更加复杂。

一个简短的问题:

我们在哪个线程上可以观察到值?(假设执行在主线程上,并且任务正在ForkJoinPool上执行)
var future = CompletableFuture
.supplyAsync(() -> {
  return value;
})
... // some code here, does not metter just code

future.thenAccept(value -> {
  System.out.println(Thread.currentThread())
});

正确答案可能是Thread Main,也可能是来自ForkJoinPool的线程...因为它存在竞争条件...在我们消费值的时候,该值可能已经被传递,所以我们只需在读取线程(主线程)上读取volatile字段,否则,线程Main将会设置acceptor,因此acceptor稍后将在ForkJoinPool线程上调用。

没错,这就是为什么当您使用fromFuturesubscribeOn时,不能保证subscribeOn线程会观察给定CompletableFuture的值。

这就是为什么publishOn是确保值处理发生在所需线程上的唯一方法。

好吧,我应该一路使用publishOn吗?

是和不是。这取决于情况。

如果您使用Mono - 在99%的情况下,如果要确保数据处理在特定线程上进行-始终使用publishOn

不要担心潜在的开销,即使您意外地使用了它,Project Reactor也会替您处理。 Project Reactor具有几种优化方式,可以在运行时将您的publishOn替换为subscribeOn(如果安全而不会破坏行为),以便您获得最佳效果。

第二部分。掉进Schedulers的兔子洞

永远不要使用Schedulers.immediate()

它几乎是没有操作的调度程序,基本上只做了...

Schedulers.immediate().scheduler(runnable) {
   runnable.run()
}

没错,它对反应堆用户没有任何用处,我们仅在内部需要时使用它。

好的,那么我如何在命令式世界中使用调度程序作为执行器呢?

有两个选项:

快速路径:逐步指南

1.a) 创建有界的 Executor。(例如 Executors.fixed...
1.b) 如果您想获得周期任务和延迟任务的能力,则创建有界的 ScheduledExecutorService
2) 使用 Schedulers.fromExecutorXXX API 从您的执行器创建一个 Scheduler
3) 在命令式世界中使用您的有界 Executor,在反应式世界中使用包装有界执行器的 Scheduler

长路径

即将推出...

第三部分。如何序列化执行。

即将推出


2
嘿@Oleh,回答得非常好!对于初学者来说,这是超级不明显的东西,老实说,在文档中应该更清楚地说明。 - borowis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接