Java 8 Stream,如何获取头部和尾部

24

Java 8 引入了一个类似于 Scala 的 StreamStream 类,这是一个强大的延迟构造,使用它可以非常简洁地完成如下操作:

def from(n: Int): Stream[Int] = n #:: from(n+1)

def sieve(s: Stream[Int]): Stream[Int] = {
  s.head #:: sieve(s.tail filter (_ % s.head != 0))
}

val primes = sieve(from(2))

primes takeWhile(_ < 1000) print  // prints all primes less than 1000

我在想是否可以在Java 8中实现这个,所以我写了类似这样的代码:

IntStream from(int n) {
    return IntStream.iterate(n, m -> m + 1);
}

IntStream sieve(IntStream s) {
    int head = s.findFirst().getAsInt();
    return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0)));
}

IntStream primes = sieve(from(2));

这很简单,但它会产生 java.lang.IllegalStateException: stream has already been operated upon or closed 的异常,因为findFirst()skip() 都是终端操作,只能执行一次。

实际上我不需要两次使用流,因为我只需要流中的第一个数字和其余部分作为另一个流,即相当于Scala的Stream.headStream.tail。在Java 8的Stream中是否有我可以使用的方法来实现这一点?

谢谢。


2
如果你想要以那种方式操作流,最好的方式可能是包装它的 iterator()。(更不用说你的实现实际上并不是一个适当的质数筛选器;参见例如这篇论文。) - Louis Wasserman
我尝试过提取第一个数字并重构另一个流,例如 IntStream.generate(() -> it.next()),但迭代器的 hasNext() 方法会急切地工作并导致无限递归。 - lyomi
是的,那行不通。实际上,这根本无法与Stream干净地配合使用。 - Louis Wasserman
请参阅此博客文章,了解如何在Java中自己构建惰性序列的解释。 - kiritsuku
我最近使用JDK8编写了这个程序(https://gist.github.com/edalorzo/5135612),但是我构建了自己的流,因为我认为JDK 8的流与Scala的惰性流并不完全相同。 - Edwin Dalorzo
10个回答

11

即使您没有无法拆分IntStream的问题,您的代码也无法正常工作,因为您递归调用了sieve方法而不是懒惰地调用。 因此,在您查询第一个值之前,您已经进入了无限递归。

IntStream s拆分为头部和尚未使用的尾部IntStream是可能的:

PrimitiveIterator.OfInt it = s.iterator();
int head = it.nextInt();
IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0);

在这里,您需要一个调用尾部惰性筛法的构造。 Stream 不提供此功能;concat 期望现有的流实例作为参数,并且您无法使用 lambda 表达式惰性地创建调用 sieve 的流,因为惰性创建仅适用于可变状态,而 lambda 表达式不支持可变状态。如果您没有隐藏可变状态的库实现,则必须使用可变对象。但是,一旦接受了可变状态的要求,解决方案甚至可以比您的第一种方法更简单:

IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0));

IntPredicate p = x -> true;

IntStream from(int n)
{
  return IntStream.iterate(n, m -> m + 1);
}

这将递归创建一个过滤器,但最终创建的是IntPredicate树还是IntStream树并不重要(例如使用IntStream.concat方法创建的方法)。如果您不喜欢过滤器的可变实例字段,可以将其隐藏在内部类中(但不能使用lambda表达式)。


我认为这不起作用,你不能在lambda表达式中引用非final变量。 - djjeck
1
@djjeck:如果你指的是第二个解决方案,那么这些是字段,而不是局部变量,可以进行修改。第一个示例中的变量是有效地最终的。这两个示例都经过测试并被证明可行。 - Holger
你说得对,那个方法可行(而且你在答案中也提到了它)。感谢你的澄清。 - djjeck

4

我的StreamEx库现在有了headTail()操作,可以解决以下问题:

public static StreamEx<Integer> sieve(StreamEx<Integer> input) {
    return input.headTail((head, tail) -> 
        sieve(tail.filter(n -> n % head != 0)).prepend(head));
}

headTail 方法接受一个 BiFunction,该函数在流终端操作执行期间最多执行一次。因此,这个实现是惰性的:它在遍历开始之前不计算任何内容,并且只计算所请求的质数数量。 BiFunction 接收第一个流元素 head 和其余元素的流 tail,并且可以以任何希望的方式修改 tail。您可以将其与预定义输入一起使用:

sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println);

但是无限流也同样可以工作。
sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000)
     .forEach(System.out::println);
// Not the primes till 1000, but 1000 first primes
sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println);

还有一种替代方案,使用 headTail 和谓词连接:

public static StreamEx<Integer> sieve(StreamEx<Integer> input, IntPredicate isPrime) {
    return input.headTail((head, tail) -> isPrime.test(head) 
            ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head)
            : sieve(tail, isPrime));
}

sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println);

比较递归解法的有趣之处在于:它们能够生成多少个质数。

@John McClean 的解法 (StreamUtils)

John McClean 的解法不是惰性求值的:你不能用无限流来输入它们。因此,我通过试错找到了最大允许的上限(17793)(在那之后会出现 StackOverflowError 错误):

public void sieveTest(){
    sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println);
}

@John McClean的解决方案(可流式处理的

public void sieveTest2(){
    sieve(Streamable.range(2, 39990)).forEach(System.out::println);
}

将上限增加到39990以上会导致StackOverflowError。

@frhack的解决方案(LazySeq

LazySeq<Integer> ints = integers(2);
LazySeq primes = sieve(ints); // sieve method from @frhack answer
primes.forEach(p -> System.out.println(p));

结果:在质数为 53327 时卡住,堆分配和垃圾回收占用超过90%。从53323到53327需要几分钟,因此等待更长时间似乎不切实际。

@vidi的解决方案

Prime.stream().forEach(System.out::println);

结果:质数为134417后出现了StackOverflowError错误。

我的解决方案(StreamEx)

sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println);

结果:在质数为236167后发生了StackOverflowError错误。

@frhack的解决方案(rxjava

Observable<Integer> primes = Observable.from(()->primesStream.iterator());
primes.forEach((x) -> System.out.println(x.toString()));            

结果:质数为367663后出现了StackOverflowError错误。

@Holger的解决方案

IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0));
primes.forEach(System.out::println);

结果:质数为 368089 时出现堆栈溢出错误。

我的解决方案(使用 StreamEx 和谓词连接)

sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println);

结果:当质数为368287时,出现了StackOverflowError错误。


因此,涉及谓词连接的三个解决方案获胜,因为每个新条件只添加了2个堆栈帧。我认为它们之间的差异微不足道,不应该被视为定义获胜者的因素。然而,我更喜欢我的第一个StreamEx解决方案,因为它更类似于Scala代码。


请您能否添加各自解决方案的时间比较?因为它们涉及谓词连接(只是更或者更少明确)和模数操作,所以它们可能得分相等。 - charlie
嘿,伙计,这个库做得很棒。我们在相当大的范围内使用它。MoreCollectors.flatMapping非常有用,现在已经在Java 9中了。你在这里做的很多事情实际上可以成为主要的JDK基线10+的候选项。 - ailveen

3
下面的解决方案不进行状态变化,除了流的头/尾解构。
使用 IntStream.iterate 来获取惰性。Prime 类用于保持生成器状态。
    import java.util.PrimitiveIterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    public class Prime {
        private final IntStream candidates;
        private final int current;

        private Prime(int current, IntStream candidates)
        {
            this.current = current;
            this.candidates = candidates;
        }

        private Prime next()
        {
            PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator();

            int head = it.next();
            IntStream tail = IntStream.generate(it::next);

            return new Prime(head, tail);
        }

        public static Stream<Integer> stream() {
            IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1);

            return Stream.iterate(new Prime(2, possiblePrimes), Prime::next)
                         .map(p -> p.current);
        }
    }

使用方法如下:
Stream<Integer> first10Primes = Prime.stream().limit(10)

1
避免使用模数操作并利用筛法的加性特性会很好。可以尝试类似于IntStream impossiblePrimes = IntStream.iterate(head * head, i -> i + 2 * head);这样的方法。但是,这更多地与质数相关,而不是与头/尾相关。 - charlie

2
您可以基本上按照以下方式实现它:
static <T> Tuple2<Optional<T>, Seq<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it));
}

在上面的示例中,Tuple2Seq是从我们为jOOQ集成测试开发的库jOOλ中借来的类型。如果您不想有任何额外的依赖,那么您也可以自己实现它们:
class Tuple2<T1, T2> {
    final T1 v1;
    final T2 v2;

    Tuple2(T1 v1, T2 v2) {
        this.v1 = v1;
        this.v2 = v2;
    }

    static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
        return new Tuple<>(v1, v2);
    }
}

static <T> Tuple2<Optional<T>, Stream<T>> splitAtHead(Stream<T> stream) {
    Iterator<T> it = stream.iterator();
    return tuple(
        it.hasNext() ? Optional.of(it.next()) : Optional.empty,
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            it, Spliterator.ORDERED
        ), false)
    );
}

1
如果您不介意使用第三方库cyclops-streams,我编写的一个库可能有许多潜在的解决方案。StreamUtils类拥有大量静态方法,可直接用于处理java.util.stream.Streams,包括headAndTail
HeadAndTail<Integer> headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4));
int head = headAndTail.head(); //1
Stream<Integer> tail = headAndTail.tail(); //Stream[2,3,4]

Streamable类表示可重播的Stream,通过构建惰性缓存中间数据结构来工作。由于它具有缓存和可重放性,因此可以直接分别实现头部和尾部。

Streamable<Integer> replayable=  Streamable.fromStream(Stream.of(1,2,3,4));
int head = repayable.head(); //1
Stream<Integer> tail = replayable.tail(); //Stream[2,3,4]

cyclops-streams 还提供了一个顺序Stream扩展,这个扩展会继承jOOλ,同时还提供了基于Tuple(来自jOOλ)和基于域对象(HeadAndTail)的解决方案,用于头部和尾部的提取。

SequenceM.of(1,2,3,4)
         .splitAtHead(); //Tuple[1,SequenceM[2,3,4]

SequenceM.of(1,2,3,4)
         .headAndTail();

根据Tagir的要求更新 -> 使用SequenceM编写的Scala筛法的Java版本

public void sieveTest(){
    sieve(SequenceM.range(2, 1_000)).forEach(System.out::println);
}

SequenceM<Integer> sieve(SequenceM<Integer> s){

    return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head())
                            .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(SequenceM.of());
}

另一种版本通过Streamable
public void sieveTest2(){
    sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
}

Streamable<Integer> sieve(Streamable<Integer> s){

    return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                                    .appendStreamable(sieve(s.tail()
                                                                    .filter(n -> n % s.head() != 0)));
}

注意 - 既StreamableSequenceM都没有Empty实现 - 因此对Streamable进行大小检查并使用headAndTailOptional
最后,使用普通的java.util.stream.Stream版本。
import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional;

public void sieveTest(){
    sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
}

Stream<Integer> sieve(Stream<Integer> s){

    return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                            ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                    .orElse(Stream.of());
}

另一个更新 - 基于@Holger的版本,采用对象而不是基本类型进行懒惰迭代(注意,也可以使用基本类型的版本)

  final Mutable<Predicate<Integer>> predicate = Mutable.of(x->true);
  SequenceM.iterate(2, n->n+1)
           .filter(i->predicate.get().test(i))
           .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0)))
           .limit(100000)
           .forEach(System.out::println);

希望能够看到使用您的库解决质数问题的完整解决方案。 - Tagir Valeev
队友打算使用Streamable / SequenceM实现Eratosthenes筛法,我们会及时向您报告。 - John McClean
1
final List<Integer> prime = new ArrayList<>(); SequenceM<Integer> primeNumbers = SequenceM.generate(() -> { synchronized (prime) { if (prime.isEmpty()) { prime.add(2); } else { int last = prime.get(prime.size() - 1); do { int candidate = last; if (!prime.stream().parallel().anyMatch(c -> (candidate) % c == 0)) { prime.add(candidate); break; } last++; } while (true); } return prime.get(prime.size() - 1); } }); - Nikita Sapozhnikov
看起来这两个解决方案都不是懒惰的。例如,尝试使用sieve(Stream.iterate(2, x -> x+1)).limit(1000)作为源代码。 - Tagir Valeev
是的,每次都会立即评估头部,尾部则是惰性处理。不过,将HeadAndTail中的head()方法改为惰性评估是一个相当琐碎的修改,我们很快就会添加这个功能。 - John McClean
你可能是指提供的解决方案不是Lazy的吗?是的,没错,我会更新一个Lazy版本(尽管问题最初规定了一个相当低的限制)。 - John McClean

1

这里提供了许多有趣的建议,但如果有人需要一个不依赖第三方库的解决方案,我想到了这个:

    import java.util.AbstractMap;
    import java.util.Optional;
    import java.util.Spliterators;
    import java.util.stream.StreamSupport;

    /**
     * Splits a stream in the head element and a tail stream.
     * Parallel streams are not supported.
     * 
     * @param stream Stream to split.
     * @param <T> Type of the input stream.
     * @return A map entry where {@link Map.Entry#getKey()} contains an
     *    optional with the first element (head) of the original stream
     *    and {@link Map.Entry#getValue()} the tail of the original stream.
     * @throws IllegalArgumentException for parallel streams.
     */
    public static <T> Map.Entry<Optional<T>, Stream<T>> headAndTail(final Stream<T> stream) {
        if (stream.isParallel()) {
            throw new IllegalArgumentException("parallel streams are not supported");
        }
        final Iterator<T> iterator = stream.iterator();
        return new AbstractMap.SimpleImmutableEntry<>(
                iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(),
                StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
        );
    }

0
要获取头和尾,您需要使用Lazy Stream实现。Java 8 stream或RxJava不适用。
例如,您可以使用LazySeq如下。

惰性序列始终从开头遍历,使用非常便宜的first/rest分解(head()和tail())

LazySeq实现了java.util.List接口,因此可以在各种地方使用。此外,它还实现了Java 8对集合的增强,即流和收集器


package com.company;

import com.nurkiewicz.lazyseq.LazySeq;

public class Main {

    public static void main(String[] args) {

        LazySeq<Integer> ints = integers(2);
        LazySeq primes = sieve(ints);
        primes.take(10).forEach(p -> System.out.println(p));

    }

    private static LazySeq<Integer> sieve(LazySeq<Integer> s) {
        return LazySeq.cons(s.head(), () -> sieve(s.filter(x -> x % s.head() != 0)));
    }

    private static LazySeq<Integer> integers(int from) {
        return LazySeq.cons(from, () -> integers(from + 1));
    }

}

0

这里有另一个使用Holger建议的方法的示例。 它使用RxJava仅仅是为了添加使用take(int)方法和其他许多方法的可能性。

package com.company;

import rx.Observable;

import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {

        final IntPredicate[] p={(x)->true};
        IntStream primesStream=IntStream.iterate(2,n->n+1).filter(i -> p[0].test(i)).peek(i->p[0]=p[0].and(v->v%i!=0)   );

        Observable primes = Observable.from(()->primesStream.iterator());

        primes.take(10).forEach((x) -> System.out.println(x.toString()));


    }

}

0

这个也应该适用于并行流:

public static <T> Map.Entry<Optional<T>, Stream<T>> headAndTail(final Stream<T> stream) {
    final AtomicReference<Optional<T>> head = new AtomicReference<>(Optional.empty());
    final var spliterator = stream.spliterator();
    spliterator.tryAdvance(x -> head.set(Optional.of(x)));
    return Map.entry(head.get(), StreamSupport.stream(spliterator, stream.isParallel()));
}

-2
如果你想获取流的头部,只需:
IntStream.range(1, 5).first();

如果你想获取流的尾部,只需:
IntStream.range(1, 5).skip(1);

如果您想获取流的头部和尾部,请使用以下代码:

stream.head() + stream.tail()

IntStream s = IntStream.range(1, 5);
int head = s.head();
IntStream tail = s.tail();

如果你想找到质数,只需要:
LongStream.range(2, n)
   .filter(i -> LongStream.range(2, (long) Math.sqrt(i) + 1).noneMatch(j -> i % j == 0))
   .forEach(N::println);

如果想了解更多信息,请前往获取abacus-common

声明:我是abacus-common的开发者。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接