作为 Andreas 指出的,Consumer::andThen 是一个可交换函数,虽然产生的 consumer 可能具有不同的内部结构,但仍然是等价的。
但是让我们来进行调试。
public static void main(String[] args) {
performAllTasks(IntStream.range(0, 10)
.mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
private final Consumer<Object> first, second;
private final boolean leaf;
DebuggableConsumer(String name) {
this(x -> System.out.println(name), x -> {}, true);
}
DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
first = a; second = b;
leaf = l;
}
public void accept(Object t) {
first.accept(t);
second.accept(t);
}
@Override public Consumer<Object> andThen(Consumer<? super Object> after) {
return new DebuggableConsumer(this, after, false);
}
public @Override String toString() {
if(leaf) return first.toString();
return toString(new StringBuilder(200), 0, 0).toString();
}
private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
int myHandle = sb.length()-2;
sb.append(leaf? first: "combined").append('\n');
if(!leaf) {
int nPreS=sb.length();
((DebuggableConsumer)first).toString(
sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
nPreS=sb.length();
sb.append(sb, preS, preEnd);
int lastItemHandle=sb.length();
((DebuggableConsumer)second).toString(sb.append(" "), nPreS, sb.length());
sb.setCharAt(lastItemHandle, '\u2514');
}
if(myHandle>0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle+1, '\u2500');
}
return sb;
}
}
将会打印出来
0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a
将减少代码更改为
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}
在我的机器上打印
0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
├─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
└─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d
这段内容阐述了Andreas'回答的观点,但也突出了一个完全不同的问题。通过在示例代码中使用IntStream.range(0, 100)
等方法,你可以达到最大值。
与顺序评估创建不平衡树相比,并行评估的结果实际上更好。当接受任意一组消费者流时,这可能是一个真正的性能问题,或者甚至会导致在试图评估生成的消费者时发生StackOverflowError
。
对于任何数量不少的消费者,实际上您需要一个平衡的消费者树,但使用并行流不是正确的解决方案,因为a) Consumer::andThen
是一项廉价操作,没有从并行评估中获得真正的好处,b) 平衡取决于不相关的属性,例如流来源的性质和CPU核心数,这决定了规约何时退回到顺序算法。
当然,最简单的解决方案是
private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
consumers.forEachOrdered(c -> c.accept(data));
}
当您想构建一个用于重复使用的复合Consumer
时,可以使用以下方法:
private static final int ITERATION_THRESHOLD = 16;
public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
if(consumerList.isEmpty()) return t -> {};
if(consumerList.size() == 1) return consumerList.get(0);
if(consumerList.size() < ITERATION_THRESHOLD)
return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
if(end-start>2) {
int mid=(start+end)>>>1;
return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
}
T t = l.get(start++);
if(start<end) t = f.apply(t, l.get(start));
assert start==end || start+1==end;
return t;
}
代码将提供单一的
Consumer
,只需使用循环来处理超过阈值的消费者数量。 这是针对大量消费者最简单和最有效的解决方案。实际上,即使在较小的数字上,您也可以放弃所有其他方法,并仍然获得合理的性能...
请注意,如果流的构造确实从中受益,则这仍不会妨碍消费者的并行处理。
Stream<Consumer<T>> consumerStream
,这会带来一个UNORDERED
流的复杂性。但是,我仍然可以将流更改为遵循相遇顺序吗?(我希望我能够更好地解释) - NamanStream<Consumer<T>>
,即使有人传入了一个parallel
流或者显式调用了unordered()
方法,我仍然能够按照接收到的顺序精确地执行这些消费者。 - Eugene