如何在 Reactor 中进行多线程文件处理

3
我正在尝试使用Reactor的Flux并行处理多个文件。主要工作负载发生在对flatMap的调用中,然后对Flux进行转换和过滤。
每当我尝试订阅结果Flux时,主线程在我接收到任何值之前退出。
Flux.fromStream(Files.list(Paths.get("directory"))
    .flatMap(path -> { 
        return Flux.create(sink -> {
            try (
                RandomAccessFile file = new RandomAccessFile(new File(path), "r");
                FileChannel fileChannel = file.getChannel()
            ) {
                // Process file into tokens
                sink.next(new Token(".."));
            } catch (IOException e) {
                sink.error(e);
            } finally {
                sink.complete();
            }
        }).subscribeOn(Schedulers.boundedElastic());
    })
    .map(token -> /* Transform tokens */)
    .filter(token -> /* Filter tokens*/)
    .subscribe(token -> /* Store tokens in list */)


我期望在列表中找到处理流程的输出,但程序立即退出。首先我想知道我是否正确使用了Flux类,其次我应该如何等待subscribe调用完成?
1个回答

6
我希望在我的列表中找到处理管道的输出,但程序立即退出。
你提供的代码在主线程上设置了反应链,然后在主线程上没有做任何其他工作。因此,主线程已经完成了它的工作。由于boundedElastic()线程是守护线程,所以没有其他线程阻止程序退出,因此程序退出。
你可以通过一个非常简单的例子看到相同的行为:
Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
            .delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);

你当然可以调用 newBoundedElastic("name", false) 来创建一个非 守护线程 后台任务调度器,但这样你就需要追踪它,并在使用完后手动释放,因此这只是将问题反转(程序一直运行直到你释放了调度器)。

快速且简单的解决方案是,在你的程序中作为最后一行对 Flux 的最后一个元素进行阻塞 -- 因此如果我们添加:

f.blockLast();

然后程序会等待最后一个元素发出后才退出,这就是我们想要的行为。对于一个简单的概念验证来说,这样是可以的。然而在“生产”代码中并不理想。首先,"无阻塞" 是响应式代码的一般规则,因此如果您像这样有阻塞调用,很难确定它是有意还是偶然。如果您添加其他链,并且也希望它们完成,您必须为每个链添加阻塞调用。这很混乱,而且不可持续。一个更好的解决方案是使用 CountDownLatch:
CountDownLatch cdl = new CountDownLatch(1);

Flux.just(1, 2, 3, 4, 5)
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> cdl.countDown())
        .subscribe(System.out::println);

cdl.await();

这种方法的优点是不需要显式地阻塞,也可以处理多个发布者(如果将初始值设置为大于1)。通常,这也是我看到的推荐方法 - 所以如果您想要最广泛接受的解决方案,那可能就是它。

然而,在所有需要等待多个发布者而不仅仅是一个的示例中,我更倾向于使用Phaser - 它的工作方式类似于CountdownLatch,但也可以动态地register()deregister()。这意味着您可以创建一个单一的 Phaser,然后在需要时轻松地将多个发布者注册到其中,而无需更改初始值,例如:

Phaser phaser = new Phaser(1);

Flux.just(1, 2, 3, 4, 5)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

phaser.arriveAndAwaitAdvance();

(如果需要,您当然可以将onSubscribedoFinally逻辑封装在一个单独的方法中。)

2
我真的看不出使用CountDownLatch与调用block有什么不同。我的意思是它还是会阻塞,不是吗?我曾经检查过Mono.block()的内部实现,据我记得,它也在里面使用了CountDownLatch。所以,为什么要引入另一种复杂性到我的代码中,而不是选择简单的阻塞呢? - Martin Tarjányi
@MartinTarjányi 如果你想等待多个发布者,它允许你从一个较大的数字开始倒计时,但除此之外,在功能上没有任何区别。这纯粹是一种风格选择。我倾向于同意;对于一个简单的例子,如果需要的话,我倾向于直接阻塞,而对于一个需要阻塞的多个发布者的更复杂的例子,我会直接选择使用phaser。但无论如何,这绝对是一个我见过的选项,不管好坏。 - Michael Berry

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接