使用Rx Java,可以通过多种方式解决需求,同时仍然使用JDK中的DirectoryStream。
以下组合将给您所需的效果,我将按顺序解释它们:
方法1。使用flatMap()和defer()操作符进行递归处理
方法2。使用flatMap()和fromCallable操作符进行递归处理
注意:如果您将flatMap()的使用替换为concatMap(),则目录树导航将必须以深度优先搜索(DFS)的方式进行。使用flatMap(),不能保证DFS效果。
方法1:使用flatMap()和defer()
private Observable<Path> recursiveFileSystemNavigation_Using_Defer(Path dir) {
return Observable.<Path>defer(() -> {
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
.flatMap(p -> !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_Using_Defer(p), Runtime.getRuntime().availableProcessors());
} catch (IOException e) {
return Observable.<Path>empty();
}
});
}
这种方法是通过查找给定目录的子文件和子目录,然后将它们作为Observables发出。如果子项是文件,则立即可供订阅者使用;否则,在
第X行上的flatMap()将递归调用该方法,并将每个子目录作为参数传递。对于每个这样的子目录,flatMap将在同一时间内内部订阅它们的所有子级。这就像一个需要控制的连锁反应。
因此,使用
Runtime.getRuntime().availableProcessors()设置了flatMap()的
最大并发级别,并防止其同时订阅所有子文件夹。如果不设置并发级别,想象一下当一个文件夹有1000个子项时会发生什么。
使用
defer()可以防止过早地创建DirectoryStream,并确保只有在真正订阅以查找其子文件夹时才会创建。
最后,该方法返回一个
Observable < Path >,以便客户端可以订阅并对结果执行有用的操作,如下所示:
//
// Using the defer() based approach
//
recursiveDirNavigation.recursiveFileSystemNavigation_Using_Defer(startingDir)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(p -> System.out.println(p.toUri()))
使用defer()的缺点是,如果其参数函数抛出已检查异常,则无法很好地处理。因此,即使DirectoryStream (实现了Closeable)在try-resource块中创建,我们仍然需要捕获IOException,因为自动关闭DirectoryStream会抛出该已检查异常。
在使用基于Rx的样式时,为了处理错误,使用catch()块听起来有点奇怪,因为即使错误也作为事件发送到反应式编程中。因此,为什么不使用一种操作符将这些错误公开为事件呢。
Rx Java 2.x添加了一种更好的替代方案,名为fromCallable()。第二种方法展示了它的使用。
方法2.使用flatMap()和fromCallable操作符
此方法使用fromCallable()操作符,该操作符将Callable作为参数。由于我们希望采用递归方法,因此来自可调用对象的期望结果是给定文件夹的子级的Observable。由于我们希望订阅者在结果可用时接收结果,因此需要从此方法返回一个Observable。由于内部可调用对象的结果是Observables子级列表,因此净效果是Observables Observables。
private Observable<Observable<Path>> recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(Path dir) {
return Observable.<Observable<Path>> fromCallable(() -> traverse(dir))
.onErrorReturnItem(Observable.<Path>empty());
}
private Observable<Path> traverse(Path dir) throws IOException {
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
.flatMap(p -> ( !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(p).blockingSingle())
,Runtime.getRuntime().availableProcessors());
}
}
订阅者将需要按照以下方式展开结果流:
recursiveDirNavigation.recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(startingDir)
.subscribeOn(Schedulers.io())
.flatMap(p -> p)
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(filePath -> System.out.println(filePath.toUri()));
在traverse()方法中,为什么X行使用阻塞Get?
因为递归函数返回一个Observable ,但是flatmap在该行需要一个可订阅的Observable。
两种方法中,Y行都使用了concatMap()。
因为如果我们不想在flatmap()内部订阅时并行执行,那么就可以舒适地使用concatMap()。
在两种方法中,方法isFolder的实现如下:
private boolean isFolder(Path p){
if(p.toFile().isFile()){
return false;
}
return true;
}
Java RX 2.0的Maven坐标
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.3</version>
</dependency>
Java文件中的导入
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;