使用流和消费者链条,保证订单的顺序

6

目前的情况是,我们有以下列出的一组API:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();

在这些任务中,我们的一个调度程序执行诸如以下任务:
private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}

在审查此内容时,我考虑采用更灵活的任务执行实现方式 1 ,它可能如下所示:

// NOOP in the context further stands for  'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}

现在我想到的重点是,Javadoc 明确说明了:

accumulator - 一个用于组合两个值的关联的、非干扰的、无状态函数

接下来我在考虑如何保证 Java8 流的处理顺序与遇到的顺序相同,可以参考这个问题:How to ensure order of processing in java8 streams?

好吧,从 List 生成的流会是有序的,除非在 reduce 之前将该流变为 parallel 流,否则下面的实现将可行。2

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}

问:这个假设2是否成立?执行消费者的顺序是否总是按照原始代码中的顺序执行?

问:有没有可能以某种方式向调用方公开1,以执行任务?


1
“2”这个假设不应该包含“除非流被并行处理”的短语,因为解决方案只能是正确或错误的。依赖于顺序执行将是不正确的(我并不是说这是不正确的,答案更加复杂)。但是,“暴露1”是什么意思? - Holger
@Holger 我的意思是,我是否可以在我的服务中创建实现 1,并让其他使用它的服务传递一个 Stream<Consumer<T>> consumerStream,这会带来一个 UNORDERED 流的复杂性。但是,我仍然可以将流更改为遵循相遇顺序吗?(我希望我能够更好地解释) - Naman
3
我认为你想问的是:我的方法是否能接受 Stream<Consumer<T>>,即使有人传入了一个 parallel 流或者显式调用了 unordered() 方法,我仍然能够按照接收到的顺序精确地执行这些消费者。 - Eugene
@Eugene 实际上。谢谢您改进措辞。 - Naman
2
当流是无序的时候,就没有遇到顺序,也没有办法构建一个。假设你在顺序评估中展示的任何顺序都有特殊含义,这种假设基本上是错误的。然而,对于有序流,Andreas已经指出它可以工作。然而,这种方法还存在其他实际问题。我会整理一个答案。 - Holger
2个回答

9
作为 Andreas 指出的,Consumer::andThen 是一个可交换函数,虽然产生的 consumer 可能具有不同的内部结构,但仍然是等价的。
但是让我们来进行调试。
public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}

将会打印出来

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

将减少代码更改为

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}

在我的机器上打印

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│   ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│   └─combined
│     ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│     └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
  ├─combined
  │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
  │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
  └─combined
    ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
    └─combined
      ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
      └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

这段内容阐述了Andreas'回答的观点,但也突出了一个完全不同的问题。通过在示例代码中使用IntStream.range(0, 100) 等方法,你可以达到最大值。

与顺序评估创建不平衡树相比,并行评估的结果实际上更好。当接受任意一组消费者流时,这可能是一个真正的性能问题,或者甚至会导致在试图评估生成的消费者时发生StackOverflowError

对于任何数量不少的消费者,实际上您需要一个平衡的消费者树,但使用并行流不是正确的解决方案,因为a) Consumer::andThen 是一项廉价操作,没有从并行评估中获得真正的好处,b) 平衡取决于不相关的属性,例如流来源的性质和CPU核心数,这决定了规约何时退回到顺序算法。

当然,最简单的解决方案是

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}

当您想构建一个用于重复使用的复合Consumer时,可以使用以下方法:

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}

代码将提供单一的Consumer,只需使用循环来处理超过阈值的消费者数量。 这是针对大量消费者最简单和最有效的解决方案。实际上,即使在较小的数字上,您也可以放弃所有其他方法,并仍然获得合理的性能...
请注意,如果流的构造确实从中受益,则这仍不会妨碍消费者的并行处理。

1
我无法理解你如何能够如此迅速地发布如此全面的答案,以及人们如何能够如此迅速地点赞它(我假设他们理解了它?)。 - Eugene
3
当我发表最后一条评论时,我已经在写答案了。当然,遇到这样的问题会有所帮助。比如说AWTEventMulticaster,即使在Java 1.1中也会遇到太多的监听器而导致问题。后来,在Java 8刚出来的时候,我在构建评估树时遇到了这个问题,所以我已经有了一种平衡规约代码的变体,只需要调整并测试它以用于这个答案。 - Holger
1
@Naman 我的假设是,“他”根本不是人类,而是某种混合型AI。答案非常有趣(开玩笑),从DebuggableConsumer中递归的toString开始,再到那个平衡的Split。我承认,balancedSplit看起来非常熟悉,就像ArrayList Spliterator一样...但是,这确实非常好。 - Eugene
@Holger 在 combineAllTasks 中对未使用的 data 进行了小修改 - Naman
1
@Eugene 我一直都有同样的想法 ;). 我需要一段时间才能完全理解答案的最后部分。像往常一样,非常好的回答。 - Thiyagu

4
即使将Stream<Consumer<T>>并行化,假定以下情况,结果的复合Consumer也会按顺序执行各个消费者:
  • Stream有序的。
    List创建的流是有序的,即使启用并行操作。

  • reduce()函数中传递的accumulator可结合的
    Consumer::andThen是可结合的。

例如,您有一个包含4个消费者的列表[A, B, C, D]。通常,没有并行处理的情况下,会发生以下情况:
x = A.andThen(B);
x = x.andThen(C);
compound = x.andThen(D);

使调用compound.apply()时按照ABCD的顺序进行调用。

如果启用了并行处理,流框架可能会将其拆分为2个线程进行处理,线程1处理[A, B],线程2处理[C, D]

这意味着以下情况将发生:

x = A.andThen(B);
y = C.andThen(D);
compound = x.andThen(y);

结果是先应用x,这意味着先执行A然后B,然后应用y,这意味着先执行C然后D
因此,尽管复合消费者的结构类似于[[A,B],[C,D]]而不是左关联的[[[A,B],C],D],但这4个消费者确实按顺序执行,这是因为Consumer::andThen关联的

非常感谢您的回答。我可能已经知道了顺序和结合部分,但是使用线程处理的示例验证可以更加容易地理解。不过需要指出的一点是,您肯定已经阅读了Holger的答案,我不能标记两个答案。再次非常感谢。 :) - Naman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接