将InputStream转换为固定长度字符串的Stream<String>

3

我想把一个InputStream is转换为一个Stream<String> stream,就像在将InputStream转换为Stream<String>给定一个字符集中一样。但这次,我不想按照换行符分割InputStream,而是想将其分割成相等长度的部分。因此,流中所有字符串的长度都相同(最后一个元素可能会短一些)。


字节长度和字符长度相同吗? - biziclop
2
阅读BufferedReader.lines()的源代码,并将其用作您自己方法的模型。不要读取下一行,而是必须读取下一个N个字符。 - JB Nizet
2个回答

3
我认为仅使用类库方法不可能实现这一点,所以您需要编写自己的逻辑,按照与 BufferedReader.lines 相同的模式执行操作:
  1. InputStreamReader - 首先创建一个 InputStreamReader。
  2. Iterator<String> - 实现一个自定义 Iterator 子类,将流分成任意大小的块。您可能需要实现 hasNext()next() 方法,调用 readPart() 方法来读取最多 N 个字符。
  3. Spliterators.spliteratorUnknownSize - 将迭代器传递给此方法,以创建 Spliterator。
  4. StreamSupport.stream - 将 Spliterator 传递给此方法,以创建一个流。
最终,类库没有内置的方法可以从输入流中读取并转换成固定大小的字符串,因此您需要为 #1/#2 编写这些方法。之后,在 #3/#4 转换为流并不太难,因为有类库方法可用。

直接实现 “Iterator” 似乎是一个常见的错误,这很可能是因为每个人都熟悉这种类型。但是,将像“readPart()”这样的逻辑拆分成“hasNext()” / “next()”这两种方法并不是一件简单的事情。相反,“Spliterator”的“tryAdvance”方法更加合适且实际上更容易实现。如果您直接实现“Spliterator”,则无需将“Iterator”包装到“Spliterator”中... - Holger
我猜这是很常见的,Java类库的开发人员已经为BufferedReader.lines()做了这件事:-),但你的建议也很好。 - Brett Kail
我不认为特定 bug 相关(基于 I/O 流的通常无法并行处理,因为你不知道有多少数据存在),但也许某个地方存在另一个 bug 建议用 AbstractSpliterator 子类替换类库中对 Spliterators.spliteratorUnknownSize 的使用(为了节省一个对象的分配,我猜)。 - Brett Kail
当然,首先解决具有更高并行执行潜力的流问题是一个很有动力的因素。但是,即使基于I/O的流也可以更加并行友好,如果它们能够适应分割策略以与底层缓冲策略协调一致的话。有趣的是,提问者还有另外一个问题(https://dev59.com/TIrda4cB1Zd3GeqPQsWM),在那里我讨论了为什么基于项(即行)的缓冲不是处理此类流的并行处理的正确策略。 - Holger

3

这方面没有直接的支持。您可以创建一个简单的工厂方法:

static Stream<String> strings(InputStream is, Charset cs, int size) {
    Reader r=new InputStreamReader(is, cs);
    CharBuffer cb=CharBuffer.allocate(size);
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
        Long.MAX_VALUE, Spliterator.ORDERED|Spliterator.NONNULL) {
            public boolean tryAdvance(Consumer<? super String> action) {
                try { while(cb.hasRemaining() && r.read(cb)>0); }
                catch(IOException ex) { throw new UncheckedIOException(ex); }
                cb.flip();
                if(!cb.hasRemaining()) return false;
                action.accept(cb.toString());
                cb.clear();
                return true;
            }
        }, false).onClose(()->{
            try { r.close(); }catch(IOException ex) { throw new UncheckedIOException(ex); }
        });
}

它可以像这样使用:

try(Stream<String> chunks=strings(is, StandardCharsets.UTF_8, 100)) {
    // perform operation with chunks
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接