我正在尝试使用Project Reactor和响应式MongoDB存储库。我有以下代码:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
@Id
Integer id;
String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
并且主要的 @SpringBootApplication
类:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {
private final ReactivePersonRepository reactivePersonRepository;
public static void main(String[] args) {
SpringApplication.run(ReactiveDatabaseApplication.class, args);
}
@PostConstruct
public void postConstruct() {
Scheduler single = Schedulers.newSingle("single-scheduler");
IntStream.range(0, 10).forEach(i ->
Flux.just(Person.builder()
.id(i)
.name("PersonName")
.build())
.flatMap(personToSave -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.save(personToSave);
})
//.publishOn(single)
.flatMap(savedPerson -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.findById(savedPerson.getId());
})
//.publishOn(single)
.flatMap(foundPerson -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.deleteById(foundPerson.getId());
})
//.publishOn(single)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName()))));
}
}
Flux::subscribeOn
方法描述如下:放置此操作符任何位置,都将影响链中开始部分到下一个{@link publishOn(Scheduler)publishOn}出现之间的onNext / onError / onComplete信号的执行上下文。
这有点令人困惑,因为当处理链中没有指定
publishOn
时,线程名称的打印值为:保存人员来自single-scheduler-1线程 - 如预期
从Thread-13查找人员
从Thread-6查找人员
从Thread-15查找人员
在Thread-6和Thread-5和Thread-4中删除人员
我不明白为什么会这样。
subscribeOn
方法指定的调度程序不应该用于每个flatMap
执行吗?当我取消注释
publishOn
行时,所有内容都由给定的单个调度程序执行,这是预期的。有人能解释一下为什么没有
publishOn
时,单个调度程序未被flatMap
操作使用吗?