Java 8流:混合两个元素。

9

我有一个类型为 Slot 的对象数组列表。

Slot 类如下所示-

Slot{
   int start;
   int end;
}

将类型为List<Slot>的列表称为slots。这些插槽基于开始时间进行排序。一个插槽的结束时间可能等于下一个插槽的开始时间,但它们永远不会重叠。

是否有可能使用Java 8流迭代此列表,并在一个插槽的结束时间与下一个插槽的开始时间匹配时将它们组合起来并输出到ArrayList中?


3
“记住上一个元素”并不适用于Stream流的操作。更好的解决方案几乎肯定是使用一个简单的循环。 - Louis Wasserman
@LouisWasserman 噢,你是对的。我错过了“Java 8 streams”部分(可能是由于问题中缺少适当的标签而导致的脑暴 - 现在已经修复)。 - Pshemo
3
你可以使用reduce()方法来完成,其中U是另一个List<Slot>类型的对象。但与使用for循环相比,这种方法会更加复杂,除非需要进行并行处理。注意不要改变原意。 - Andreas
流 API 是必需的吗?您看过 Guava Ranges 吗?(https://github.com/google/guava/wiki/RangesExplained) - LoganMzz
7个回答

5

由于这些类型的问题经常出现,我认为写一个收集器来按谓词分组相邻元素可能是一种有趣的练习。

假设我们可以给Slot类添加合并逻辑。

boolean canCombine(Slot other) {
    return this.end == other.start;
}

Slot combine(Slot other) {
    if (!canCombine(other)) {
        throw new IllegalArgumentException();
    }
    return new Slot(this.start, other.end);
}

然后可以使用groupingAdjacent收集器,如下:

List<Slot> combined = slots.stream()
    .collect(groupingAdjacent(
        Slot::canCombine,         // test to determine if two adjacent elements go together
        reducing(Slot::combine),  // collector to use for combining the adjacent elements
        mapping(Optional::get, toList())  // collector to group up combined elements
    ));

另外,第二个参数可以是collectingAndThen(reducing(Slot::combine), Optional::get),第三个参数是toList()

下面是groupingAdjacent的源代码。它能处理null元素并且支持并行操作。如果再花费一点精力,也可以使用Spliterator来实现类似的功能。

public static <T, AI, I, AO, R> Collector<T, ?, R> groupingAdjacent(
        BiPredicate<? super T, ? super T> keepTogether,
        Collector<? super T, AI, ? extends I> inner,
        Collector<I, AO, R> outer
) {
    AI EMPTY = (AI) new Object();

    // Container to accumulate adjacent possibly null elements.  Adj can be in one of 3 states:
    // - Before first element: curGrp == EMPTY
    // - After first element but before first group boundary: firstGrp == EMPTY, curGrp != EMPTY
    // - After at least one group boundary: firstGrp != EMPTY, curGrp != EMPTY
    class Adj {

        T first, last;     // first and last elements added to this container
        AI firstGrp = EMPTY, curGrp = EMPTY;
        AO acc = outer.supplier().get();  // accumlator for completed groups

        void add(T t) {
            if (curGrp == EMPTY) /* first element */ {
                first = t;
                curGrp = inner.supplier().get();
            } else if (!keepTogether.test(last, t)) /* group boundary */ {
                addGroup(curGrp);
                curGrp = inner.supplier().get();
            }
            inner.accumulator().accept(curGrp, last = t);
        }

        void addGroup(AI group) /* group can be EMPTY, in which case this should do nothing */ {
            if (firstGrp == EMPTY) {
                firstGrp = group;
            } else if (group != EMPTY) {
                outer.accumulator().accept(acc, inner.finisher().apply(group));
            }
        }

        Adj merge(Adj other) {
            if (other.curGrp == EMPTY) /* other is empty */ {
                return this;
            } else if (this.curGrp == EMPTY) /* this is empty */ {
                return other;
            } else if (!keepTogether.test(last, other.first)) /* boundary between this and other*/ {
                addGroup(this.curGrp);
                addGroup(other.firstGrp);
            } else if (other.firstGrp == EMPTY) /* other container is single-group. prepend this.curGrp to other.curGrp*/ {
                other.curGrp = inner.combiner().apply(this.curGrp, other.curGrp);
            } else /* other Adj contains a boundary.  this.curGrp+other.firstGrp form a complete group. */ {
                addGroup(inner.combiner().apply(this.curGrp, other.firstGrp));
            }
            this.acc = outer.combiner().apply(this.acc, other.acc);
            this.curGrp = other.curGrp;
            this.last = other.last;
            return this;
        }

        R finish() {
            AO combined = outer.supplier().get();
            if (curGrp != EMPTY) {
                addGroup(curGrp);
                assert firstGrp != EMPTY;
                outer.accumulator().accept(combined, inner.finisher().apply(firstGrp));
            }
            return outer.finisher().apply(outer.combiner().apply(combined, acc));
        }
    }
    return Collector.of(Adj::new, Adj::add, Adj::merge, Adj::finish);
}

5
这种情况可以完美地由我免费的StreamEx库支持,该库增强了标准的Stream API。有一个intervalMap中间操作,可以将多个相邻的流元素合并为单个元素。以下是完整示例:
// Slot class and sample data are taken from @Andreas answer
List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7), 
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

List<Slot> result = StreamEx.of(slots)
        .intervalMap((s1, s2) -> s1.end == s2.start,
                     (s1, s2) -> new Slot(s1.start, s2.end))
        .toList();
System.out.println(result);
// Output: [3-7, 8-13]

intervalMap 方法接受两个参数。第一个是 BiPredicate,它接受来自输入流的两个相邻元素,并在它们必须合并时返回 true(这里的条件是 s1.end == s2.start)。第二个参数是一个 BiFunction,它从合并序列的第一个和最后一个元素中提取结果元素。

请注意,如果您有100个相邻的插槽应该合并为一个,这个解决方案不会创建100个中间对象(就像 @Misha 的答案一样,尽管非常有趣),它会立即跟踪系列中的第一个和最后一个插槽,忘记中间的插槽。当然,这个解决方案是并行友好的。如果您有成千上万的输入插槽,使用 .parallel() 可能会提高性能。

请注意,当前实现将重新创建 Slot,即使它没有与任何东西合并。在这种情况下,BinaryOperator 会两次接收相同的 Slot 参数。如果您想优化此情况,可以进行额外的检查,例如 s1 == s2 ? s1 : ...

List<Slot> result = StreamEx.of(slots)
        .intervalMap((s1, s2) -> s1.end == s2.start,
                     (s1, s2) -> s1 == s2 ? s1 : new Slot(s1.start, s2.end))
        .toList();

1
你让我感到羞愧,因为我没有做出一个正确的实现... - Bohemian

4
您可以使用 reduce() 方法,并使 U 成为另一个 List<Slot>,但除非需要并行处理,否则比使用 for 循环更加复杂。

有关测试设置,请参见答案末尾。

以下是 for 循环实现:

List<Slot> mixed = new ArrayList<>();
Slot last = null;
for (Slot slot : slots)
    if (last == null || last.end != slot.start)
        mixed.add(last = slot);
    else
        mixed.set(mixed.size() - 1, last = new Slot(last.start, slot.end));

输出

[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

这是流reduce的实现:

List<Slot> mixed = slots.stream().reduce((List<Slot>)null, (list, slot) -> {
    System.out.println("accumulator.apply(" + list + ", " + slot + ")");
    if (list == null) {
        List<Slot> newList = new ArrayList<>();
        newList.add(slot);
        return newList;
    }
    Slot last = list.get(list.size() - 1);
    if (last.end != slot.start)
        list.add(slot);
    else
        list.set(list.size() - 1, new Slot(last.start, slot.end));
    return list;
}, (list1, list2) -> {
    System.out.println("combiner.apply(" + list1 + ", " + list2 + ")");
    if (list1 == null)
        return list2;
    if (list2 == null)
        return list1;
    Slot lastOf1 = list1.get(list1.size() - 1);
    Slot firstOf2 = list2.get(0);
    if (lastOf1.end != firstOf2.start)
        list1.addAll(list2);
    else {
        list1.set(list1.size() - 1, new Slot(lastOf1.start, firstOf2.end));
        list1.addAll(list2.subList(1, list2.size()));
    }
    return list1;
});

输出

accumulator.apply(null, 3-5)
accumulator.apply([3-5], 5-7)
accumulator.apply([3-7], 8-10)
accumulator.apply([3-7, 8-10], 10-11)
accumulator.apply([3-7, 8-11], 11-13)
[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

将其更改为并行处理(多线程):

List<Slot> mixed = slots.stream().parallel().reduce(...

输出

accumulator.apply(null, 8-10)
accumulator.apply(null, 3-5)
accumulator.apply(null, 10-11)
accumulator.apply(null, 11-13)
combiner.apply([10-11], [11-13])
accumulator.apply(null, 5-7)
combiner.apply([3-5], [5-7])
combiner.apply([8-10], [10-13])
combiner.apply([3-7], [8-13])
[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

注意事项

如果slots是一个空列表,则for循环版本的结果是一个空列表,而流版本的结果是一个null值。


测试设置

以上所有代码使用了以下Slot类:

class Slot {
    int start;
    int end;
    Slot(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    public String toString() {
        return this.start + "-" + this.end;
    }
}
slots变量被定义为:
List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

使用以下方式打印slots和结果mixed:

System.out.println(slots);
System.out.println(mixed);

我还没有分析过,但根据@Bohemian的说法,它应该要简单得多。 - Puce
@Puce 你说得对,Bohemian的版本更小,但它不支持并行处理。我超过一半的代码都是为了支持并行处理。 - Andreas
感谢您的澄清。 - Puce

3

这是一个两行的代码:

List<Slot> condensed = new LinkedList<>();
slots.stream().reduce((a,b) -> {if (a.end == b.start) return new Slot(a.start, b.end); 
  condensed.add(a); return b;}).ifPresent(condensed::add);

如果slot的字段不可见,您需要将a.end更改为a.getEnd()等等。
一些带有一些边缘情况的测试代码:
List<List<Slot>> tests = Arrays.asList(
        Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
        Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
        Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
        Collections.emptyList());
for (List<Slot> slots : tests) {
    List<Slot> condensed = new LinkedList<>();
    slots.stream().reduce((a, b) -> {if (a.end == b.start) return new Slot(a.start, b.end);
        condensed.add(a); return b;}).ifPresent(condensed::add);
    System.out.println(condensed);
}

输出:

[3-7, 8-13]
[3-7, 8-11, 12-13]
[3-7]
[]

一如既往地,应该注意到这种解决方案违反了reduce方法的契约,并且不适合并行处理。如果在Stream API中实现了foldLeft(accumulator),这种解决方案将更加可靠... - Tagir Valeev
@Tagir在这种情况下,保持顺序至关重要,因此它永远不会被并行化。但是,“违规”折衷似乎是一个很好的交易,可以将任务简化为单个相对琐碎的lambda表达式。我喜欢你的解决方案 - 我之前没有注意到intervalMap() - 很棒! - Bohemian
1
实际上,所有其他解决方案(不仅仅是我的)都支持并行处理。并行流仍然可以保持顺序,您只需要能够将任务分成独立的部分(例如列表的前半部分,然后是后半部分),独立地处理它们,然后将结果合并在一起。我的解决方案虽然最灵活,因为它作为中间操作工作,但实际上在代码下面有非常大的代码块。如果没有第三方库Misha和Holger的解决方案也很好,尽管它们终止了流。你的解决方案可能赢得了“最佳规则滥用”类别的奖项 :-) - Tagir Valeev

3

一种干净(并行安全)的解决方案,不需要任何新方法:

List<Slot> condensed = slots.stream().collect(LinkedList::new,
  (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
    s : new Slot(l.removeLast().start, s.end)),
  (l, l2) -> {if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
    l.add(new Slot(l.removeLast().start, l2.removeFirst().end));} l.addAll(l2);});

在合并 Slots 时,使用更适合的列表实现 LinkedList 可以简化删除和访问列表中最后一个元素的操作。


List<List<Slot>> tests = Arrays.asList(
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
            Collections.emptyList());
for (List<Slot> slots : tests) {
    List<Slot> condensed = slots.stream().collect(LinkedList::new,
      (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
        s : new Slot(l.removeLast().start, s.end)),
      (l, l2) -> {if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
        l.add(new Slot(l.removeLast().start, l2.removeFirst().end));} l.addAll(l2);});
    System.out.println(condensed);
}

输出:

[[3, 7], [8, 13]]
[[3, 7], [8, 11], [12, 13]]
[[3, 7]]
[]

2
如果您在Slot类中添加以下方法:
public boolean join(Slot s) {
    if(s.start != end)
        return false;
    end = s.end;
    return true;
}

您可以使用标准API进行整个操作,方法如下:
List<Slot> result = slots.stream().collect(ArrayList::new,
    (l, s)-> { if(l.isEmpty() || !l.get(l.size()-1).join(s)) l.add(s); },
    (l1, l2)-> l1.addAll(
        l1.isEmpty()||l2.isEmpty()||!l1.get(l1.size()-1).join(l2.get(0))?
        l2: l2.subList(1, l2.size()))
);

这遵循API的契约(不像滥用reduce),因此与并行流无缝协作(尽管您需要非常大的源列表才能从并行执行中受益)。
然而,上面的解决方案使用了Slot的原地连接,仅在您不再需要源列表/项时才可接受。否则,或者如果您仅使用不可变的Slot实例,则必须创建新的Slot实例来表示连接的插槽。
一个可能的解决方案如下:
BiConsumer<List<Slot>,Slot> joinWithList=(l,s) -> {
    if(!l.isEmpty()) {
        Slot old=l.get(l.size()-1);
        if(old.end==s.start) {
            l.set(l.size()-1, new Slot(old.start, s.end));
            return;
        }
    }
    l.add(s);
};
List<Slot> result = slots.stream().collect(ArrayList::new, joinWithList,
    (l1, l2)-> {
        if(!l2.isEmpty()) {
            joinWithList.accept(l1, l2.get(0));
            l1.addAll(l2.subList(1, l2.size()));
        }
    }
);

0
这是一个可以用来帮助进行邻居合并流操作的库函数。
/**
 * Returns a {@link Collector} that returns a list which can combine elements of
 * the incoming stream into merged elements.  (Call stream() again on the list to
 * continue stream processing.)
 * @param testNeighbors Predicate that tests the preceding and next elements, returning
 *  true if they should be combined
 * @param combineNeighbors Operator that combines two neighboring items which should
 *  be combined
 * @param <R> type of the elements
 */
public static <R> Collector<R,LinkedList<R>,LinkedList<R>> combineNeighborsCollector(
        BiPredicate<R,R> testNeighbors,
        BinaryOperator<R> combineNeighbors) {
    return Collector.of(
        LinkedList::new,
        (LinkedList<R> list1, R next) -> {
            // Can't combine if list is empty.
            if (! list1.isEmpty() && testNeighbors.test(list1.getLast(), next)) {
                R lCombined = combineNeighbors.apply(list1.getLast(), next);
                list1.removeLast();
                list1.add(lCombined);
            } else {
                list1.add(next);
            }
        },
        (LinkedList<R> list1, LinkedList<R> list2) -> {
            // Can't combine if either list is empty.
            while (! list1.isEmpty() && ! list2.isEmpty()
                    && testNeighbors.test(list1.getLast(), list2.getFirst())) {
                R last = list1.getLast();
                R next = list2.getFirst();
                list1.removeLast();
                list2.removeFirst();
                list1.add(combineNeighbors.apply(last, next));
            }
            list1.addAll(list2);
            return list1;
        });
}

测试一下:
record Slot(int start, int end) {
    @Override public String toString() { return "[%d,%d]".formatted(start,end); }
}

public static final Collector<Slot, LinkedList<Slot>, LinkedList<Slot>>
SLOT_COMBINING_COLLECTOR = 
    combineNeighborsCollector(
        (Slot slot1, Slot slot2) -> slot1.end == slot2.start, // Test neighbors.
        (Slot slot1, Slot slot2) -> new Slot(slot1.start, slot2.end)); // Combine neighbors.

public static void main(String[] args) {
    System.out.println(Stream.of(
            new Slot(1, 3), new Slot(4, 7), new Slot(7, 10), new Slot(10,12), new Slot(20, 25))
        .collect(SLOT_COMBINING_COLLECTOR)
        .stream()
        .map(Objects::toString)
        .collect(Collectors.joining(", ")));
}

打印:[1,3],[4,12],[20,25]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接