Java 8流和批处理

126

我有一个包含项目列表的大文件。

我想创建一批项目,将此批次作为参数在HTTP请求中发送(所有项目都需要作为HTTP请求中的参数)。我可以很容易地使用for循环来完成,但作为Java 8的爱好者,我想尝试使用Java 8的Stream框架来编写这个程序(并获得延迟处理的好处)。

示例:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

我想做类似于lazyFileStream.group(500).map(processBatch).collect(toList())的事情。

最佳方法是什么?


我还无法弄清如何执行分组,抱歉,但是Files#lines会惰性地读取文件的内容。 - user1038550
1
所以你基本上需要一个 flatMap 的反函数(+ 一个额外的 flatMap 来再次折叠流)?我不认为标准库中存在这样一个方便的方法。你要么得找到一个第三方库,要么就得根据 spliterator 和/或发出流的收集器编写自己的库。 - the8472
3
也许你可以将Stream.generatereader::readLinelimit结合起来,但问题是流与异常不太兼容。此外,这也很可能不容易并行化。我认为for循环仍然是最好的选择。 - tobias_k
我刚刚添加了一个示例代码。我认为flatMap不是正确的方法。我怀疑我可能需要编写自定义的Spliterator - Andy Dang
1
我正在创造“流滥用”这个术语来形容像这样的问题。 - kervin
1
为什么叫做“滥用”?它完美地适合流的概念,特别是对于惰性流。基本上,这需要一个“groupBy”,我不清楚在Java 8中如何清晰地编写它。 - Andy Dang
15个回答

4

公正地说,看看优雅的Vavr解决方案:

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);

3
你可以使用apache.commons库:
ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

分区部分是不懒惰地完成的,但在列表被分区之后,您可以获得使用流的好处(例如使用并行流、添加过滤器等等)。其他答案提供了更为复杂的解决方案,但有时可读性和可维护性更重要(有时则不是:-))。

不确定是谁给我点了踩,但能知道原因会更好。我提供了一个答案,为那些无法使用Guava的人提供了补充。 - Tal Joffe
@Drakemor 我正在处理一个子列表的流。请注意stream()函数调用。 - Tal Joffe
不,OP说“享受惰性处理的好处”,但把整个流放入内存中创建列表,再转换回流,这不是惰性处理。这是懒惰编程。 - Drakemor
@Drakemor 好的,你说得有道理,但仍然有些处理是懒惰的。只是不需要将其分成块的部分。没有必要追求速度...编写更少的代码并不是“懒惰编程”,而是更易于阅读和维护。 - Tal Joffe
1
说实话,我不太明白你的论点,但我想我们可以同意不同意。我已经编辑了我的答案以反映我们在这里的对话。感谢讨论。 - Tal Joffe
显示剩余4条评论

1
这是一个纯Java解决方案,采用惰性评估。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}

1
简单示例使用Spliterator
    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Bruce的回答更加全面,但我正在寻找一些快速而简单的方法来处理一堆文件。


1
使用Java 8和com.google.common.collect.Lists,您可以执行以下操作:
public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

这里的T是输入列表中项目的类型,U是输出列表中项目的类型。

您可以像这样使用它:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接