Java 8流和批处理

126

我有一个包含项目列表的大文件。

我想创建一批项目,将此批次作为参数在HTTP请求中发送(所有项目都需要作为HTTP请求中的参数)。我可以很容易地使用for循环来完成,但作为Java 8的爱好者,我想尝试使用Java 8的Stream框架来编写这个程序(并获得延迟处理的好处)。

示例:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

我想做类似于lazyFileStream.group(500).map(processBatch).collect(toList())的事情。

最佳方法是什么?


我还无法弄清如何执行分组,抱歉,但是Files#lines会惰性地读取文件的内容。 - user1038550
1
所以你基本上需要一个 flatMap 的反函数(+ 一个额外的 flatMap 来再次折叠流)?我不认为标准库中存在这样一个方便的方法。你要么得找到一个第三方库,要么就得根据 spliterator 和/或发出流的收集器编写自己的库。 - the8472
3
也许你可以将Stream.generatereader::readLinelimit结合起来,但问题是流与异常不太兼容。此外,这也很可能不容易并行化。我认为for循环仍然是最好的选择。 - tobias_k
我刚刚添加了一个示例代码。我认为flatMap不是正确的方法。我怀疑我可能需要编写自定义的Spliterator - Andy Dang
1
我正在创造“流滥用”这个术语来形容像这样的问题。 - kervin
1
为什么叫做“滥用”?它完美地适合流的概念,特别是对于惰性流。基本上,这需要一个“groupBy”,我不清楚在Java 8中如何清晰地编写它。 - Andy Dang
15个回答

159

为了完整起见,这里是Guava解决方案。

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);
在这个问题中,集合已经可用,因此不需要流(stream),可以将其写成:

在问题中集合已经可用,因此不需要使用流,可以这样写:

Iterables.partition(data, batchSize).forEach(this::process);

14
Lists.partition是我应该提到的另一种变体。 - Ben Manes
2
这是懒惰的,对吧?在处理相关批次之前,它不会将整个“Stream”调用到内存中。 - orirab
1
@orirab 是的。它在批次之间是懒惰的,也就是说每次迭代会消耗 batchSize 个元素。 - Ben Manes

76

也可以使用纯Java-8实现:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

请注意,与 JOOl 不同的是,它可以在并行处理时良好运作(前提是您的 data 是一个随机访问列表)。


4
如果你的数据实际上是一个流(例如文件中的行,甚至来自网络),会怎样呢? - Omry Yadan
7
@OmryYadan,这个问题是关于从“List”中获取输入(参见问题中的“data.size()”、“data.get()”)的。我在回答所问的问题。如果你有另一个问题,请提出来(虽然我认为流问题也已经被问过了)。 - Tagir Valeev
2
如何并行处理批次? - soup_boy

52

纯Java 8解决方案:

我们可以创建一个自定义的收集器来优雅地完成此操作,该收集器接受一个批处理大小和一个Consumer来处理每个批次:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

可以选择创建一个辅助工具类:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

示例用法:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

如果有人想要查看的话,我也把我的代码发布到了GitHub上:

Github链接


2
这是一个不错的解决方案,除非你无法将流中的所有元素都放入内存中。此外,它也无法处理无限流 - collect方法是终端操作,这意味着它会等待流完成,然后才会按批次处理结果,而不是生成一系列批次的流。 - Alex Ackerman
2
@AlexAckerman 无限流意味着finisher永远不会被调用,但累加器仍将被调用,因此项目仍将被处理。此外,它只需要在任何时候将批处理大小的项目保存在内存中。 - Solubris
@Solubris,你是对的!我的错,谢谢你指出来 - 我不会删除评论以供参考,如果有人对collect方法的工作方式有相同的想法。 - Alex Ackerman
将发送给用户的列表复制以确保修改安全,例如:batchProcessor.accept(copyOf(ts))。 - Solubris

21

我为这样的场景编写了一个自定义 Spliterator。它将从输入流中填充给定大小的列表。这种方法的优点是它将执行惰性处理,并且可以与其他流函数一起使用。

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}

非常有帮助。如果有人想按一些自定义条件进行批处理(例如按字节大小排序),那么您可以委派您的自定义谓词,并将其用作for循环的条件(在我看来,使用while循环会更易读)。 - pls
我不确定这个实现是否正确。例如,如果基础流是SUBSIZED,则从trySplit返回的拆分可能比拆分前有更多的项(如果拆分发生在批处理的中间)。 - Malt
@Malt 如果我对 Spliterators 的理解是正确的,那么 trySplit 应该总是将数据分成大致相等的两部分,因此结果永远不应该比原始数据更大? - Bruce Hamilton
很遗憾,根据文档,这些部分不能大致相等。它们必须相等:如果此Spliterator是SUBSIZED,则拆分之前此Spliterator的estimateSize()必须等于拆分后返回的Spliterator的estimateSize()之和。 - Malt
是的,这符合我对Spliterator分割的理解。然而,我很难理解“trySplit返回的拆分结果可能比拆分前有更多的项”,您能详细说明一下您的意思吗? - Bruce Hamilton

19

我们曾经有类似的问题需要解决。我们想要处理一些大于系统内存的流(遍历数据库中的所有对象),并尽可能地将其随机排序 - 我们认为缓冲10,000个项目并对它们进行随机排序应该是可以的。

目标是一个接受流作为输入的函数。

在这里提出的解决方案中,似乎有各种选项:

  • 使用各种非Java 8附加库
  • 从不是流的东西开始 - 比如随机访问列表
  • 拥有一个可以轻松分割的流

我们最初的直觉是使用自定义收集器,但这意味着退出流处理。上面的自定义收集器解决方案非常好,我们差点用它。

这里有一个解决方案是通过利用Stream可以给你一个Iterator的事实来欺骗。你可以使用逃生口来做一些额外的事情,而流不支持这样做。使用另一个Java 8 StreamSupport魔法将Iterator转换回流。

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

使用它的简单示例如下所示:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

上面的内容打印出来是:

[A, B, C]
[D, E, F]

对于我们的应用场景,我们想要将批次打乱然后保持其作为一个流 - 看起来像这样:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

这将输出类似于以下内容(每次随机生成,因此每次不同)

A
C
B
E
D
F

这里的秘密在于始终有一个流,因此您可以对批处理数据流进行操作,或对每个批次执行某些操作,然后将其flatMap回流中。更好的是,所有上述操作只在最终的forEachcollect或其他终止表达式 拉取 数据通过流。

事实证明,iterator 是流的一种特殊类型的终止操作,不会导致整个流运行并进入内存!感谢Java 8开发者的出色设计!


而且当收集到每个批次并持久化到List时,完全迭代每个批次非常好 - 你不能推迟对批次内元素的迭代,因为消费者可能想跳过整个批次,如果你没有消费这些元素,那么他们将无法跳过太远。(我已经在C#中实现了其中一个,尽管它要简单得多。) - ErikE
1
这是最好的解决方案之一。完全被低估了。 - jtahlborn

13
注意!这种解决方案在运行forEach之前会读取整个文件。 你可以使用jOOλ来实现,它是一个为单线程、顺序流使用场景扩展Java 8流的库。
Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

在幕后,zipWithIndex() 实际上是这样的:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

...而groupBy()是API方便之处:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(免责声明:我是jOOλ背后公司的员工)


哇,这正是我要找的东西。我们的系统通常按顺序处理数据流,所以将其移植到Java 8是一个很好的选择。 - Andy Dang
20
请注意,这个解决方案不必要地将整个输入流存储到中间的“Map”中(与Ben Manes的解决方案不同)。 - Tagir Valeev
确定第一批的结束,会初始化整个流并在内部缓冲它。 - Robin479

13

你也可以使用RxJava:

RxJava v3:

int batchSize = 50;
List<Table> tables = new ArrayList<>();
Observable.fromIterable(_someStream_)
        .buffer(batchSize)
        .map(batch -> process(batch))
        .blockingSubscribe(tables::addAll, t -> Log.warning("Error", t));

以前的版本:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

或者

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

或者

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();

8
您也可以查看cyclops-react,我是这个库的作者。它实现了jOOλ接口(以及JDK 8 Streams),但与JDK 8 Parallel Streams不同,它专注于异步操作(例如潜在阻塞的Async I/O调用)。相比之下,JDK Parallel Streams专注于CPU绑定操作的数据并行性。它通过在幕后管理基于Future的任务聚合来工作,但向最终用户提供了标准的扩展Stream API。以下示例代码可能会帮助您入门。
LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

这里有一个关于批处理的教程和一个更加通用的教程
如果要使用自己的线程池(这对于阻塞 I/O 可能更合适),你可以使用以下代码启动处理:
     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();

4

这是一个使用Java 8编写的示例,可以与并行流一起使用。

使用方法:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

方法声明和实现:
public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}

4

可以很容易地使用Reactor来完成:

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接