Java流 - 带有前置操作和后置操作的forEach

3
使用Stream.forEach()时,我在想是否有可能在流不为空时添加预动作和后动作。例如,在打印列表时,可以在流为空时添加一些内容或写入其他内容。
现在我想到了以下解决方案:
private static <T> void forEach(Stream<T> stream, Consumer<? super T> action,
    Runnable preAction, Runnable postAction, Runnable ifEmpty) {
    AtomicBoolean hasElements = new AtomicBoolean(false);
    Consumer<T> preActionConsumer = x -> {
        if (hasElements.compareAndSet(false, true)) {
            preAction.run();
        }
    };
    stream.forEach(preActionConsumer.andThen(action));
    if (hasElements.get()) {
        postAction.run();
    } else {
        ifEmpty.run();
    }
}

对于顺序流,这应该可以工作,不是吗?这种方法是否正确,是否有“好主意”使用这种方法或者是否有任何注意事项?

对于并行流,这种方法不起作用,因为preAction可能比执行action的另一个线程慢,但是正确实现它而不借助于synchronized或其他破坏并行流目的的并发工具可能不容易...

编辑:添加用例。使用正则表达式从文件中读取搜索整数,并将它们写入另一个文件。使用这种方法,我不必在内存中创建一个字符串,然后将其写入某个文件。(显然,对于我的实际任务,我使用更复杂的正则表达式。)

public static void main(String[] args) throws IOException {
    Stream<String> lines = Files.lines(Paths.get("foo.txt"));

    Pattern findInts = Pattern.compile("(\\d+)");
    Path barFile = Paths.get("bar.txt");
    try (BufferedWriter writer = Files.newBufferedWriter(barFile , StandardOpenOption.CREATE_NEW)) {
        lines.flatMap(x -> findInts.matcher(x).results())
                .forEach(x-> convertCheckedIOException(() ->  {
                            writer.write(x.group(1));
                            writer.newLine();
                        })
                );
    }
}

public static void convertCheckedIOException(Run r) {
    try {
        r.run();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

interface Run {
    void run() throws IOException;
}

2
看起来不错,除了 isEmpty 如果 Stream 不为空,则为 true... - tobias_k
好的,我想我会更改它,因为变量名不是问题的重点。 - user140547
@MalteHartwig:目标是仅在流不为空时执行预操作。否则,也可以在流外运行。 - user140547
1
装饰 forEach 明显是走错了方向。还有许多其他终端操作可能适合您的特定实际任务,例如 Collectors.joining 允许指定前缀和后缀。如果您能想象到的唯一适合实际任务的操作是 forEach,则流 API 可能不是该特定工作的正确工具。 - Holger
我明白了,我忽略了那个。我在下面的答案中添加了一个尝试,仅当元素存在时才执行 preAction。你可以尝试看看它是否适用于你。 - Malte Hartwig
显示剩余2条评论
2个回答

2

使用适合您工作的工具。该任务不受流API的益处。

Pattern intPattern = Pattern.compile("\\d+");
try(Scanner scanner = new Scanner(Paths.get("foo.txt"));
    BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) {

    String s = scanner.findWithinHorizon(intPattern, 0);
    if(s == null) {
        // perform empty action
    } else {
        // perform pre action
        do {
            writer.append(s);
            writer.newLine();
        } while( (s=scanner.findWithinHorizon(intPattern, 0)) != null);
        // perform post action
    }
}

您仍然可以使用流操作,例如:
Pattern intPattern = Pattern.compile("\\d+");
try(Scanner scanner = new Scanner(Paths.get("foo.txt"));
    BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) {

    String firstLine = scanner.findWithinHorizon(intPattern, 0);
    if(firstLine == null) {
        // perform empty action
    } else {
        // perform pre action
        Stream.concat(Stream.of(firstLine),
                      scanner.findAll(intPattern).map(MatchResult::group))
            .forEach(line -> convertCheckedIOException(() ->  {
                    writer.write(line);
                    writer.newLine();
                })
            );
        // perform post action
    }
}

但是,必须处理已检查的 IOException 只会使代码变得更加复杂,却没有任何好处。


1
我很喜欢有这样一个工具的想法。起初我认为使用第二个标志,由preaction设置/取消并停止操作可能已经足够了。但更复杂的是,每次调用操作时都会将preAction放在第一位,而不仅仅是第一次调用。

我提出了一个同步解决方案,强制执行顺序

pre
actions
post/empty
。一个注意点是,在第一个批处理平行线程中,它们中的第一个将需要等待完成,因为它们将遇到:。
private static <T> void forEach(Stream<T> stream, Consumer<? super T> action, Runnable preAction, Runnable postAction, Runnable ifEmpty)
{
    AtomicBoolean hasElements = new AtomicBoolean(false);

    stream.forEach(new Consumer<T>()
    {
        private Consumer<? super T> delegate = new Consumer<T>()
        {
            private Consumer<? super T> delegate2 = new Consumer<T>()
            {
                @Override
                public void accept(T x)
                {
                    System.out.println("check");
                    hasElements.set(true);
                    preAction.run();
                    action.accept(x);
                    delegate2 = action; // rest of first batch won't run preAction anymore
                    delegate = action; // next batches won't even synchronize anymore
                }
            };

            @Override
            public void accept(T x)
            {
                synchronized (this)
                {
                    delegate2.accept(x);
                }
            }
        };

        @Override
        public void accept(T x)
        {
            delegate.accept(x);
        }
    });

    if (hasElements.get()) { postAction.run(); } else { ifEmpty.run(); }
}

public static void main(String[] args)
{
    Stream<Integer> s = Stream.generate(() -> 1).limit(1000).parallel();
    forEach(s, i -> System.out.println(Thread.currentThread().getId()), () -> System.out.println("pre"),
            () -> System.out.println("post"), () -> System.out.println("empty"));
}

Output:
check
pre
...
many thread IDs
...
post

在创建如此复杂的解决方案之前,我希望能够从 OP 那里获得更多关于实际任务的澄清。例如,如果需要并行支持,请注意 forEach 不会按顺序运行,而 forEachOrdered 使同步变得过时。顺序执行也是如此。但是,如果以不同的顺序运行不是问题,则基于 Collector 的解决方案将更简单。 - Holger
我可以看出收集器如何在完成器中处理postActionempty,但是它如何处理preAction呢?它不也需要保持类似的状态吗? - Malte Hartwig
也许我需要再考虑一段时间,但首先要问的问题是是否真的值得花那么多时间去思考它,因为并不清楚这样的解决方案是否合适(或者根本不需要)。 - Holger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接