在Java中,如何高效优雅地流式传输树节点的后代?

22
假设我们有一个对象集合,这些对象由唯一的字符串标识,以及一个定义它们层次结构的类Tree。该类使用从节点(通过它们的ID表示)到其各自子节点ID的Collection的Map实现。请注意,保留html标记。
class Tree {
  private Map<String, Collection<String>> edges;

  // ...

  public Stream<String> descendants(String node) {
    // To be defined.
  }
}

我希望能够启用节点后代的流式传输。一个简单的解决方法是:

private Stream<String> children(String node) {
    return edges.getOrDefault(node, Collections.emptyList()).stream();
}

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        children(node).flatMap(this::descendants)
    );
}

在继续之前,我想对这个解决方案做出以下断言。(我的理解是否正确?)

  1. descendants返回的Stream会消耗资源(时间和内存) - 相对于树的大小而言 - 其复杂度与手工编写递归相同。特别是,表示迭代状态的中间对象(StreamSpliterator等)形成堆栈,因此在任何给定时间的内存需求与树的深度的复杂度相同。

  2. 据我理解this,一旦我在从descendants返回的Stream上执行终止操作,根级别对flatMap的调用将立即导致实现所有包含的Stream - 每个(递归)调用descendants都有一个。因此,结果Stream仅在第一层递归时是惰性的,但不会超过此层。 (根据Tagir Valeevs answer进行编辑。)

如果我正确理解这些要点,我的问题是:如何定义descendants以使得生成的Stream是惰性的?
我希望解决方案尽可能优雅,也就是说,我更喜欢一种将迭代状态隐式留下的解决方案。(为了澄清我的意思:我知道我可以编写一个Spliterator来遍历树,同时在每个级别上维护一个显式堆栈,但我想避免这种情况。)
(在Java中是否可能以生产者-消费者工作流程的方式表达这一点,就像Julia和Go等语言中可以使用的方式?)

1
一个自定义的Spliterator似乎是你想要的... - fge
1
非常相似的问题 - https://dev59.com/5FwY5IYBdhLWcg3waXEF - ZhongYu
3
我会认真考虑使用类似Guava的TreeTraverser,然后将其包装为一个Stream - Louis Wasserman
1
这个回答包含了一些增加懒惰度的建议,也许可以适用于你的情况。 - the8472
5个回答

18
对我来说,您的解决方案已经非常优雅,而其有限的懒惰并非您的错。最简单的解决方案是等待JRE开发人员修复它。通过Java 10已经解决了这个问题。
然而,如果今天实现的有限懒惰确实是一个问题,那么也许是时候以一种普遍的方式解决这个问题了。好吧,它确实涉及到实现一个Spliterator,但不是针对您的任务。相反,它是重新实现flatmap操作,适用于所有原始实现的有限懒惰问题的情况:
public class FlatMappingSpliterator<E,S> extends Spliterators.AbstractSpliterator<E>
implements Consumer<S> {

    static final boolean USE_ORIGINAL_IMPL
        = Boolean.getBoolean("stream.flatmap.usestandard");

    public static <T,R> Stream<R> flatMap(
        Stream<T> in, Function<? super T,? extends Stream<? extends R>> mapper) {

        if(USE_ORIGINAL_IMPL)
            return in.flatMap(mapper);

        Objects.requireNonNull(in);
        Objects.requireNonNull(mapper);
        return StreamSupport.stream(
            new FlatMappingSpliterator<>(sp(in), mapper), in.isParallel()
        ).onClose(in::close);
    }

    final Spliterator<S> src;
    final Function<? super S, ? extends Stream<? extends E>> f;
    Stream<? extends E> currStream;
    Spliterator<E> curr;

    private FlatMappingSpliterator(
        Spliterator<S> src, Function<? super S, ? extends Stream<? extends E>> f) {
        // actually, the mapping function can change the size to anything,
        // but it seems, with the current stream implementation, we are
        // better off with an estimate being wrong by magnitudes than with
        // reporting unknown size
        super(src.estimateSize()+100, src.characteristics()&ORDERED);
        this.src = src;
        this.f = f;
    }

    private void closeCurr() {
        try { currStream.close(); } finally { currStream=null; curr=null; }
    }

    public void accept(S s) {
        curr=sp(currStream=f.apply(s));
    }

    @Override
    public boolean tryAdvance(Consumer<? super E> action) {
        do {
            if(curr!=null) {
                if(curr.tryAdvance(action))
                    return true;
                closeCurr();
            }
        } while(src.tryAdvance(this));
        return false;
    }

    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        if(curr!=null) {
            curr.forEachRemaining(action);
            closeCurr();
        }
        src.forEachRemaining(s->{
            try(Stream<? extends E> str=f.apply(s)) {
                if(str!=null) str.spliterator().forEachRemaining(action);
            }
        });
    }

    @SuppressWarnings("unchecked")
    private static <X> Spliterator<X> sp(Stream<? extends X> str) {
        return str!=null? ((Stream<X>)str).spliterator(): null;
    }

    @Override
    public Spliterator<E> trySplit() {
        Spliterator<S> split = src.trySplit();
        if(split==null) {
            Spliterator<E> prefix = curr;
            while(prefix==null && src.tryAdvance(s->curr=sp(f.apply(s))))
                prefix=curr;
            curr=null;
            return prefix;
        }
        FlatMappingSpliterator<E,S> prefix=new FlatMappingSpliterator<>(split, f);
        if(curr!=null) {
            prefix.curr=curr;
            curr=null;
        }
        return prefix;
    }
}

如果您想使用它,只需要在您的代码中添加import static flatMap方法,并将形如stream.flatmap(function)的表达式更改为flatmap(stream, function)

也就是说,在您的代码中:

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        flatMap(children(node), this::descendants)
    );
}

然后您就拥有完全的惰性行为。我甚至测试了无限流……

请注意,我添加了一个切换选项,允许返回到原始实现,例如在命令行上指定-Dstream.flatmap.usestandard=true


1
有趣的解决方案。尝试使用 flatMap(IntStream.range(0, 1000000).boxed(), Stream::of).parallel().collect(Collectors.summingInt(Integer::intValue)) 并与JDK版本进行比较。你的版本非常慢(大约慢30-50倍)。 - Tagir Valeev
3
请注意,当像这样提高每个项目的开销时,如 flatMap(IntStream.range(0, 1000).boxed().parallel(), Stream::of) .map(i->{ LockSupport.parkNanos(1); return i; }) .collect(Collectors.summingInt(Integer::intValue)); 执行实际上从并行执行中获益,并且两种实现方式效果相当。 - Holger
4
我明白了,我的测试真正的问题在于我应该在你的flatMap之前进行并行处理,因为添加无害的 boxed() 已经使源分裂器无法分割。 下面这个可以正常工作:flatMap(IntStream.range(0, 1000000).boxed().parallel(), Stream::of).collect(Collectors.summingInt(Integer::intValue)) - Tagir Valeev
4
API规定Long.MAX_VALUE为“未知大小”,而非“非常大的大小”。如果流实现错误地解释了它,那么就是一个bug。由于该函数可以返回从空流到无限流的任何内容,因此我认为估计任何大小都是无效的。这种推理是错误的。如果子任务由于异常或短路而更快地完成,它们不应该获得新任务,因为整个操作应该结束。但出于实际目的,我添加了它。 - Holger
2
也许你可以看一下这个; 它可能会对你有所帮助。 - fge
显示剩余6条评论

5

你稍微有点错误地说flatMap流不是惰性的。它有一定的惰性,但它的惰性真的很有限。让我们使用一些自定义的Collection来跟踪在你的Tree类中请求的元素:

private final Set<String> requested = new LinkedHashSet<>();

private class MyList extends AbstractList<String> implements RandomAccess
{
    private final String[] data;

    public MyList(String... data) {
        this.data = data;
    }

    @Override
    public String get(int index) {
        requested.add(data[index]);
        return data[index];
    }

    @Override
    public int size() {
        return data.length;
    }
}

现在让我们使用一些树数据对您的类进行预初始化:
public Tree() {
    // "1" is the root note, contains three immediate descendants
    edges.put("1", new MyList("2", "3", "4"));
    edges.put("2", new MyList("5", "6", "7"));
    edges.put("3", new MyList("8", "9", "10"));
    edges.put("8", new MyList("11", "12"));
    edges.put("5", new MyList("13", "14", "15"));
    edges.put("7", new MyList("16", "17", "18"));
    edges.put("6", new MyList("19", "20"));
}

最后,让我们检查在不同的限制值下实际请求了多少元素:
public static void main(String[] args) {
    for(int i=1; i<=20; i++) {
        Tree tree = new Tree();
        tree.descendants("1").limit(i).toArray();
        System.out.println("Limit = " + i + "; requested = (" + tree.requested.size()
                + ") " + tree.requested);
    }
}

输出结果如下:
Limit = 1; requested = (0) []
Limit = 2; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 3; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 4; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 5; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 6; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 7; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 8; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 9; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 10; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 11; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 12; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 13; requested = (12) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18]
Limit = 14; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 15; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 16; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 17; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 18; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 19; requested = (18) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10]
Limit = 20; requested = (19) [2, 5, 13, 14, 15, 6, 19, 20, 7, 16, 17, 18, 3, 8, 11, 12, 9, 10, 4]

因此,当只请求根节点时,不执行对子节点的访问(因为Stream.concat很聪明)。当请求第一个直接子节点时,即使不需要处理整个子节点的子树,也会处理它。然而,第二个直接子节点在第一个完成之前不会被处理。这可能对短路场景有问题,但在大多数情况下,您的终端操作不是短路的,因此这仍然是一种不错的方法。
至于您对内存消耗的担忧:是的,它会根据树深度消耗内存(更重要的是它会占用堆栈)。如果您的树有成千上万个嵌套级别,您将遇到问题,因为您可能会在默认的-Xss设置下遇到StackOverflowError。对于几百个深度级别,它将正常工作。
我们在应用程序的业务逻辑层中使用类似的方法,对我们来说效果很好,尽管我们的树很少超过10个级别的深度。

换句话说,Stream 在第一层上是惰性的,但在其后不是。我会编辑问题以反映这一点。然而,根本问题仍然存在,对吧? - user4235730
1
@user4235730,基本上是的。但很多事情取决于如何定义“惰性”。如果你想将结果收集到列表中(带有一些过滤、映射等),或者使用forEach终端操作,那么下一个元素在前一个元素被处理之前不会从树中请求。非惰性只能在短路时看到,或者直接使用stream.iterator()/stream.spliterator()。 - Tagir Valeev

1

这不是一个真正的答案,只是一个想法:

如果你查看值集合并在下一步“解析”最后一个已看到的值为新的值集合以递归相同方式返回下一个值,则无论如何实现,它都将以某种形式的“指针”结束当前元素在值集合中的“层”上,并且还具有某种保存所有这些“指针”的堆栈。

这是因为您需要树中较高级别的信息(堆栈)和当前级别上当前元素的“指针”。在这种情况下,一个引起另一个。

当然,您可以将其实现为持有迭代器堆栈(指向相应的值集合)的Spliterator,但我想,即使它隐藏在Java的flatMap(或相关)临时对象中,每个深度级别都会存在“指针”状态。

作为一种替代方案:使用一个带有指向其父节点的引用的“真实”树如何?此外,添加一个映射到树根的地图,其中包含对所有单个节点的引用,以简化对子子子孙的访问。我猜想Spliterator的实现将会非常容易,因为它只需要一个当前节点的引用来遍历和一个停止条件(初始节点值)来防止在树中走得太高。

我知道迭代状态必须存在于某个地方,但我希望它是隐式的。如果我手动实现递归,那么也会有一个堆栈,但从迭代的角度来看,它在运行时环境中是隐含的。如果我使用高阶迭代原语例如 flatMap,迭代状态则隐藏在该原语中,这对我而言也是可以接受的。我只是想避免定义自己的对象来保存迭代状态。关于你提出的替代方案:我不知道这会给我们带来什么好处。能否请您详细说明一下如何更轻松地实现 Spliterator - user4235730
顺便说一句,如果这不是“真正的答案”,也许它应该是一个或多个评论? - user4235730
注释太长了,无法放在注释字段中,所以我选择回答。当关注Spliterator.tryAdvance时,只需要记住子节点的索引和当前节点;树内部的深度由于每个节点都知道其父节点而自动维护。不需要使用堆栈。但这将是一种与您的数据结构通常不同的数据结构。 - jCoder
只是为了确保我理解您的意思:您在谈论“左儿子,右兄弟”树表示法吗?如果是这样的话:是的,只需要当前节点。但是“子节点索引”是什么意思呢? - user4235730
根据实现方式并假设一个父母可以有一个或两个以上的孩子,那么可能需要一个指示下一个孩子的索引。但是我猜这种方法在你的情况下走得太远了,所以最好坚持其他想法之一 ;) - jCoder

0

让我们首先通过提供技术讨论来回答这个问题--

  1. TreeNode 可以持有一个用户对象的引用,其使用取决于用户。通过调用 toString() 方法来获取 TreeNode 的字符串表示形式将返回其用户对象的字符串表示形式。
  2. 树节点最多可以有一个父节点和 0 或多个子节点。 TreeNode 提供了操作来检查和修改节点的父节点和子节点,还提供了操作来检查节点所在的树。节点的树是从该节点开始并沿着所有可能的链接到父节点和子节点的路径到达的所有节点的集合。没有父节点的节点是其树的根节点;没有子节点的节点是叶子节点。一棵树可以由许多子树组成,每个节点都充当其自己子树的根节点。
  3. Java 8 中现有的 DefaultMutableTrrNode 已被修改。
  4. 此类提供了枚举,以有效地遍历树或子树以不同的顺序或跟随两个节点之间的路径。
  5. 这不是一个线程安全的类。如果您打算在多个线程中使用 TreeNode(或 TreeNodes 树),则需要进行自己的同步。采用的一个好习惯是在树的根节点上同步。
  6. 此类的序列化对象将与未来的 Swing 发布不兼容。当前的序列化支持适用于短期存储或在运行相同版本 Swing 的应用程序之间进行 RMI。从 1.4 开始,已将所有 JavaBeans™ 的长期存储支持添加到 java.beans 包中。

请查看在Git上贡献的TreeNode的修改版本-TreeNode


你尝试过在Git中贡献的TreeNode的新修改方法了吗? - Vaibhav Atray

0

我建议的东西实际上与你不想要的相似,但比直接维护堆栈更容易和更优雅地实现。

public class TreeIterator {
    private Tree tree;
    private List<String> topLevelNodes;

    public TreeIterator(Tree t, String node) {
        topLevelNodes = new List();
        topLevelNodes.add(node);
        tree = t;
    }

    public String next() {
        if (topLevelNodes.size() > 0) {
            int last = topLevelNodes.size() - 1;
            String result = topLevelNodes.get(last);
            topLevelNodes.remove(last);
            topLevelNodes.addAll(tree.get(result));
            return result;
        }
        return null;
    }
}

对于使用 new List() 和其他不正确的方法,我很抱歉,只是想分享一下这个想法。


虽然你称它为“List”,但这里的topLevelNodes实际上是一个栈:你只在末尾添加和删除。与迭代器堆栈相比,这会在每个级别上实例化迭代下的对象,因此需要比必要更多的内存。(复杂度的顺序不再是高度,而是高度乘以节点度数。)很抱歉,但我并不认为这更加“优雅”。 - user4235730
是的,我同意你的观点。 - sbeliakov
2
为什么要使用 get(last) 后跟 remove(last)?这是不必要的步骤。只需使用 String result = topLevelNodes.remove(last);。除此之外,使用 ArrayDeque 可以有效地从头部删除,以保持节点的正确顺序... - Holger
感谢对remove的备注。然而,从头部删除并插入到尾部会增加内存消耗,因此list(或只是stack)更适合。 - sbeliakov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接