Scheduler
。它还通过Mono.fromFuture(..)
提供了与使用CompletableFuture
的库之间的桥梁。AWS的DynamoDB异步客户端会在
java.util.concurrent.Executor
上执行从API调用返回的CompletableFuture
。默认情况下,它创建了一个由线程池支持的Executor
。结果是,即使是已定义Scheduler
的流(如Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())
),也会在库创建的线程池中执行,而不是在Schedulers.boundedElastic()
中的线程上执行。因此,我们看到的线程名称类似于sdk-async-response-0-2
,而不是类似于boundedElastic-1
的名称。幸运的是,该库允许我们提供自己的
Executor
,就像这里所示的那样,因此我的问题是:
用例 我们有一个存储库类,具有如何构建一个
Executor
,以便在运行时使用来自流的Scheduler
中的线程?
findById
方法,我们需要调用者能够控制要运行的Scheduler
,因为它在以下明显不同的上下文中使用:
- 在
Schedulers.boundedElastic()
调度程序上运行的API响应。 - 处理按顺序执行的Kafka消息,每个分区都在定义的调度程序上的单个线程上执行,如Reactor Kafka docs所示。
Schedulers.immediate()
和Runnable::run
来定义Executor
,但两者都会导致在Netty事件循环线程(例如名称:aws-java-sdk-NettyEventLoop-0-2
)上执行,而不是在定义的Scheduler
的线程上执行。DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
runnable -> Schedulers.immediate().schedule(runnable)
))
.build();
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Runnable::run
))
.build();