使用JPA Repository创建Java 8流

4
我希望创建一个来自JPA Repository的Stream。目标是将来自Repo的实体(可能超过一百万)映射到其他实体,这些实体将再存储在另一个Repo中。
到目前为止,我已经构建了一个Collector,该Collector将收集给定数量(例如1000)的实体,然后将它们存储到目标Repo中。这将在并行流中工作。现在我需要一种好的方法来从源Repo中获取实体,并在需要时将它们馈送到Stream中。
到目前为止最有前途的是通过generate实现Supplier (http://docs.oracle.com/javase/8/docs/api/java/util/function/Supplier.html) 来构建Stream,但我没找到一种在查询源Repo不提供另一个实体时终止进程的方法。
任何提示?

可能是重复的问题:如何从迭代器创建Java 8 Stream? - a better oliver
供应商仅适用于无限流。您可以编写一个普通的迭代器。 - a better oliver
Spliterator是我的第二个猜测。我希望有一个更直接的解决方案。问题仍然是我不想获取完整的实体集合,仅使用存储库作为迭代器也没有好处。 - Christoph Grimmer
您可以批量加载实体。 - a better oliver
也许我漏掉了什么或者存在误解,但我不明白为什么分页无法满足您的要求。一次加载20个实体或一个接一个地加载并没有太大的区别。在我看来,前者甚至更加高效,并且确实满足了“根据需要”的要求。 - a better oliver
显示剩余3条评论
4个回答

3

嗨Thomas,非常感谢你。实际上我在我的博客中写到了我们见面后的启发(用德语写的):http://blog.flavia-it.de/trefft-euch/ - Christoph Grimmer

2
如果你能够将源代码表达为Supplier的实现,则也可能实现Spliterator。 你需要实现boolean tryAdvance(Consumer),而不是Supplier.get, 如果有新项目则不会返回新值,但会在Consumer上调用accept,否则返回false。 对于大多数情况,与必须处理两个方法hasNextnextIterator相比,这样可以简化实现。
你还需要实现Spliterator的其他几个方法,但幸运的是,有直接的实现方式。
public Spliterator<T> trySplit() {
    return null;// simple answer when splitting is not supported
}
public long estimateSize() {
    return Long.MAX_VALUE; // the value which should be used for UNKNOWN
}
public int characteristics() {
    return 0; // no flags but check out whether some flags fit
}

对于 characteristics 方法,有必要查找 可能的值,如果它们符合您的源特征,则可以改善流处理。

一旦您拥有了 Spliterator,就可以从中创建一个流:

Stream<T> s=StreamSupport.stream(sp, false);

如果您的源代码更适合hasNext/next模式,则可以实现普通的Iterator并让JRE创建一个Spliterator,如“如何从迭代器创建Java 8 Stream?”中所述。


非常感谢您的输入! - Christoph Grimmer

0
一个简单的例子可能是:
  @Repository
  public class MyEntityRepository extends CrudRepository<MyEntity, Long> {           
  }

  @Component
  public class MyEntityService {

       @Autowired
       private MyEntityRepository myEntityRepository;


       public void() {
            // if the findAll() method returns List
            Stream<MyEntity> streamFromList = myEntityRepository.findAll().stream();


            // if the findAll() method returns Iterable
            Stream<MyEntity> streamFromIterable = StreamSupport.stream(myEntityRepository.findAll().spliterator(), true);

       }
  } 

据我所知,findAll将急切地获取完整的集合。我需要延迟获取。 - Christoph Grimmer

0

好的,感谢所有的贡献。我将所说的内容结合起来,并实现了我需要的功能。也许实现过程会澄清我最初想要的。

我创建了两个类,RepositryCollectorRepositorySpliterator

public class RepositoryCollector<T> implements Collector<T, Tuple2<Integer,List<T>>, Integer>{

    private JpaRepository<T, ?> repository;
    private int threshold;

    public BinaryOperator<Tuple2<Integer, List<T>>> combiner() {
        return (listTuple, itemsTuple) -> {
            List<T> list = listTuple._2;
            List<T> items = itemsTuple._2;
            list.addAll(items);
            int sum = listTuple._1 + itemsTuple._1;
            if(list.size() >= this.threshold){
                this.repository.save(list);
                this.repository.flush();
                list = new LinkedList<>();
            }
            return new Tuple2<>(sum, list);
        };
    }
}

我省略了收集器所需的其他函数,因为组合器中已经包含了所有相关信息。对于Spliterator也是如此。

public class RepositorySpliterator<T> implements Spliterator<T> {

    private Slice<T> slice;
    private Function<Pageable, Slice<T>> getSlice;
    private Iterator<T> sliceIterator;

    public RepositorySpliterator(Pageable pageable, Function<Pageable, Slice<T>> getSlice) {
        this.getSlice = getSlice;
        this.slice = this.getSlice.apply(pageable);
        this.sliceIterator = slice.iterator();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(sliceIterator.hasNext()) {
            action.accept(sliceIterator.next());
            return true;
        } else if (slice.hasNext()) {
            this.slice = getSlice.apply(slice.nextPageable());
            this.sliceIterator = this.slice.iterator();
            if(sliceIterator.hasNext()){
                action.accept(sliceIterator.next());
                return true;
            }
        }
        return false;
    }

    public Stream<T> getStream(boolean parallel){
        return StreamSupport.stream(this, parallel);
    }
}

正如您所见,我添加了一个帮助函数来生成所需的流。也许这有点粗糙,但是...算了。

因此,现在我只需要在我的映射类中加入几行代码就可以开始了。

    public void start(Timestamp startTimestamp, Timestamp endTimestamp) {
        new RepositorySpliterator<>(
                new PageRequest(0, 10000), pageable -> sourceRepository.findAllBetween(startTimestamp, endTimestamp, pageable))
                .getStream(true)
                .map(entity -> mapToTarget(endTimestamp, entity))
                .collect(new RepositoryCollector<>(targetRepository, 1000));
    }

映射器将从源中获取10000个实体,将它们倒入流池中,以便进行映射和存储。每当一个流用完新的实体时,就会获取一批新的实体,并将其馈送到同一个流池中。

如果我的实现中存在明显的错误,请随时评论并改进!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接