Project Reactor的flatMap中关于线程的混淆

5

我正在尝试使用Project Reactor和响应式MongoDB存储库。我有以下代码:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}

并且主要的 @SpringBootApplication 类:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}
Flux::subscribeOn方法描述如下:
放置此操作符任何位置,都将影响链中开始部分到下一个{@link publishOn(Scheduler)publishOn}出现之间的onNext / onError / onComplete信号的执行上下文。
这有点令人困惑,因为当处理链中没有指定publishOn时,线程名称的打印值为:
保存人员来自single-scheduler-1线程 - 如预期
从Thread-13查找人员
从Thread-6查找人员
从Thread-15查找人员
在Thread-6和Thread-5和Thread-4中删除人员
我不明白为什么会这样。subscribeOn方法指定的调度程序不应该用于每个flatMap执行吗?
当我取消注释publishOn行时,所有内容都由给定的单个调度程序执行,这是预期的。
有人能解释一下为什么没有publishOn时,单个调度程序未被flatMap操作使用吗?

在flatMap中,您会订阅由Reactive Mongo仓库提供的新Mono,该仓库可能在不同于其订阅的线程上发布数据。另一方面,publishOn强制确定数据向下传递的线程。这就是为什么在这种情况下它能够按'预期'工作的原因。你想切换线程的原因是什么?除非您执行某些阻塞操作,否则您实际上根本不需要担心这个问题。 - Martin Tarjányi
1个回答

5
这个人工制造的例子可能会更加清晰明了:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
        "Subscription from thread %s", Thread.currentThread().getName())));

这将会得到类似于以下的结果:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

换句话说,您的响应式存储库没有在相同的调度程序上发布,这就是为什么您会看到当前行为。"直到下一个 publishOn() 发生"并不意味着下一次 您的 代码调用 publishOn() - 它也可能出现在您任何一个 flatMap() 调用的任何出版者中,而您将无法控制它们。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接