用一个InputStream包装一个ByteBuffer

60

我有一个方法,它接受一个InputStream并从中读取数据。我想也能够将这个方法用于ByteBuffer。是否有一种方式可以包装一个ByteBuffer,以便可以将其作为流访问?


它是一个本地的ByteBuffer,还是由一个字节数组支持的? - EboMike
在这种情况下,由一个字节数组支持。 - Erik
4
我发现Jackson有这个: Jackson ByteBufferBackedInputStream com.fasterxml.jackson.databind.util - Geoffrey Hendrey
7个回答

86

Thilo提到的实现存在一些错误,其他网站上也直接复制了这些错误的代码:

  1. ByteBufferBackedInputStream.read() 返回一个符号扩展的int表示它读取的字节,这是错误的(值应该在[-1..255]范围内)
  2. ByteBufferBackedInputStream.read(byte[], int, int) 不会像API规范所述,在缓冲区中没有剩余字节时返回-1

ByteBufferBackedOutputStream看起来比较可靠。

下面我呈现了一个“修正”的版本。如果我发现更多漏洞(或有人指出),我将在这里更新。

更新: 从read/write方法中删除了synchronized关键字

输入流

public class ByteBufferBackedInputStream extends InputStream {

    ByteBuffer buf;

    public ByteBufferBackedInputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    public int read() throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    public int read(byte[] bytes, int off, int len)
            throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }

        len = Math.min(len, buf.remaining());
        buf.get(bytes, off, len);
        return len;
    }
}

输出流

public class ByteBufferBackedOutputStream extends OutputStream {
    ByteBuffer buf;

    public ByteBufferBackedOutputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    public void write(int b) throws IOException {
        buf.put((byte) b);
    }

    public void write(byte[] bytes, int off, int len)
            throws IOException {
        buf.put(bytes, off, len);
    }

}

1
为什么要将其同步化?您是否期望多个线程读取相同的输入流? - Nim
1
@denys,抱歉,我刚注意到你的评论 - 为什么你想让flush有那种效果?这似乎会让flip变得混乱,因为它会覆盖先前的数据,这不是flush()通常的作用。我猜你正在尝试使用一个既是输入流又是输出流的单个缓冲区作为缓冲区? - Kothar
1
@jaco0646 虽然你只需要实现单个抽象方法,但是另一个方法的默认实现是基于read(int)和write(int)实现的,因此它包含一个循环:for (int i = 0 ; i < len ; i++) { write(b[off + i]); }为了提高效率,我们可以直接将字节数组传递给缓冲区,避免转换为/从int值并为每个字节进行一次函数调用。 - Kothar
1
根据您分配ByteBuffer的方式,您可能需要覆盖InputStream#close()以释放或释放ByteBuffer - Luke Hutchison
2
应该也实现 int available() - Andreas detests censorship
显示剩余5条评论

16

在JDK中没有相关实现,但是有许多其他的实现方式可供使用,可以搜索ByteBufferInputStream。基本上,它们将一个或多个ByteBuffers包装起来,并跟踪记录已经读取了多少数据的索引。类似这样的内容经常出现,但显然存在缺陷,请参见@Mike Houston的答案以获取改进版本


我真惊讶,这对像我这样的外行来说并不是那么简单明了。 - Sridhar Sarnobat

7
如果它由一个字节数组支持,您可以使用ByteArrayInputStream,并通过ByteBuffer.array()获取字节数组。如果您尝试在本机ByteBuffer上使用它,这将引发异常。

“native ByteBuffer” 指的是通过 ByteBuffer.allocateDirect() 方法创建的 ByteBuffer 对象吗? - BD at Rivenhill
8
只有当您确定要读取支撑字节数组的全部内容时,该方法才有效。如果您有部分填充的缓冲区,则会读取超出限制的内容。 - stevevls
3
这种方法是错误的,因为缓冲区内容可能只是数组的一部分,而且数组在开头和结尾会包含其他数据。请参见get()方法的实现。 - Dmitry Risenberg

3
这是我实现的InputStreamOutputStream版本: ByteBufferBackedInputStream
public class ByteBufferBackedInputStream extends InputStream
{
  private ByteBuffer backendBuffer;

  public ByteBufferBackedInputStream(ByteBuffer backendBuffer) {
      Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
      this.backendBuffer = backendBuffer;
  }

  public void close() throws IOException {
      this.backendBuffer = null;
  }

  private void ensureStreamAvailable() throws IOException {
      if (this.backendBuffer == null) {
          throw new IOException("read on a closed InputStream!");
      }
  }

  @Override
  public int read() throws IOException {
      this.ensureStreamAvailable();
      return this.backendBuffer.hasRemaining() ? this.backendBuffer.get() & 0xFF : -1;
  }

  @Override
  public int read(@Nonnull byte[] buffer) throws IOException {
      return this.read(buffer, 0, buffer.length);
  }

  @Override
  public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
      this.ensureStreamAvailable();
      Objects.requireNonNull(buffer, "Given buffer can not be null!");
      if (offset >= 0 && length >= 0 && length <= buffer.length - offset) {
          if (length == 0) {
              return 0;
          }
          else {
              int remainingSize = Math.min(this.backendBuffer.remaining(), length);
              if (remainingSize == 0) {
                  return -1;
              }
              else {
                  this.backendBuffer.get(buffer, offset, remainingSize);
                  return remainingSize;
              }
          }
      }
      else {
          throw new IndexOutOfBoundsException();
      }
  }

  public long skip(long n) throws IOException {
      this.ensureStreamAvailable();
      if (n <= 0L) {
          return 0L;
      }
      int length = (int) n;
      int remainingSize = Math.min(this.backendBuffer.remaining(), length);
      this.backendBuffer.position(this.backendBuffer.position() + remainingSize);
      return (long) length;
  }

  public int available() throws IOException {
      this.ensureStreamAvailable();
      return this.backendBuffer.remaining();
  }

  public synchronized void mark(int var1) {
  }

  public synchronized void reset() throws IOException {
      throw new IOException("mark/reset not supported");
  }

  public boolean markSupported() {
      return false;
  }
}

ByteBufferBackedOutputStream:

public class ByteBufferBackedOutputStream extends OutputStream
{
    private ByteBuffer backendBuffer;

    public ByteBufferBackedOutputStream(ByteBuffer backendBuffer) {
        Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
        this.backendBuffer = backendBuffer;
    }

    public void close() throws IOException {
        this.backendBuffer = null;
    }

    private void ensureStreamAvailable() throws IOException {
        if (this.backendBuffer == null) {
            throw new IOException("write on a closed OutputStream");
        }
    }

    @Override
    public void write(int b) throws IOException {
        this.ensureStreamAvailable();
        backendBuffer.put((byte) b);
    }

    @Override
    public void write(@Nonnull byte[] bytes) throws IOException {
        this.write(bytes, 0, bytes.length);
    }

    @Override
    public void write(@Nonnull byte[] bytes, int off, int len) throws IOException {
        this.ensureStreamAvailable();
        Objects.requireNonNull(bytes, "Given buffer can not be null!");
        if ((off < 0) || (off > bytes.length) || (len < 0) ||
            ((off + len) > bytes.length) || ((off + len) < 0))
        {
            throw new IndexOutOfBoundsException();
        }
        else if (len == 0) {
            return;
        }

        backendBuffer.put(bytes, off, len);
    }
}

3
如果可用的话,请直接使用堆缓冲区(字节数组),否则请使用包装的bytebuffer(参见Mike Houston的答案)。
public static InputStream asInputStream(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        // use heap buffer; no array is created; only the reference is used
        return new ByteArrayInputStream(buffer.array());
    }
    return new ByteBufferInputStream(buffer);
}

请注意,包装缓冲区可以有效地支持标记/重置和跳过操作。

1
请注意,.array() 是一个可选操作。它可能未实现(例如 MappedByteBuffer),并且即使已实现,对于只读缓冲区也会抛出异常。 - Sheepy
1
确实,这就是为什么有 buffer.hasArray() 的原因 :) - rmuller
2
如果您始终希望InputStream基于整个数组,那么这将是可以的,但对于具有偏移量的流,它将无法提供所需的结果。与此答案相同的问题在您之前4年就已经被提出了... - Krease
@Chris 首先,OP并没有要求支持带有偏移量的流。其次,我的回答是作为Mike Houston的答案的补充(这在文本中明确说明了)。 - rmuller
OP要求将ByteBuffer包装为流进行访问。 ByteBuffer使用偏移量来控制通常对调用者可访问的底层数组的哪个部分。这是使用ByteBuffer而不仅仅是在第一次使用byte[]的原因之一。 - Krease

2

基于ByteArrayInputStream代码的派生,需要提前正确设置适当的位置和限制,以便使用提供的ByteBuffer。

    public class ByteBufferInputStream extends InputStream
    {
        /**
         * The input ByteBuffer that was provided.
         * The ByteBuffer should be supplied with position and limit correctly set as appropriate
         */
        protected ByteBuffer buf;

        public ByteBufferInputStream(ByteBuffer buf)
        {
            this.buf = buf;
            buf.mark(); // to prevent java.nio.InvalidMarkException on InputStream.reset() if mark had not been set
        }

        /**
         * Reads the next byte of data from this ByteBuffer. The value byte is returned as an int in the range 0-255.
         * If no byte is available because the end of the buffer has been reached, the value -1 is returned.
         * @return  the next byte of data, or -1 if the limit/end of the buffer has been reached.
         */
        public int read()
        {
            return buf.hasRemaining()
                ? (buf.get() & 0xff)
                : -1;
        }

        /**
         * Reads up to len bytes of data into an array of bytes from this ByteBuffer.
         * If the buffer has no remaining bytes, then -1 is returned to indicate end of file.
         * Otherwise, the number k of bytes read is equal to the smaller of len and buffer remaining.
         * @param   b     the buffer into which the data is read.
         * @param   off   the start offset in the destination array b
         * @param   len   the maximum number of bytes read.
         * @return  the total number of bytes read into the buffer, or -1 if there is no more data because the limit/end of
         *          the ByteBuffer has been reached.
         * @exception  NullPointerException If b is null.
         * @exception  IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off
         */
        public int read(byte b[], int off, int len)
        {
            if (b == null)
            {
                throw new NullPointerException();
            }
            else if (off < 0 || len < 0 || len > b.length - off)
            {
                throw new IndexOutOfBoundsException();
            }

            if (!buf.hasRemaining())
            {
                return -1;
            }

            int remaining = buf.remaining();
            if (len > remaining)
            {
                len = remaining;
            }

            if (len <= 0)
            {
                return 0;
            }

            buf.get(b, off, len);

            return len;
        }

        /**
         * Skips n bytes of input from this ByteBuffer. Fewer bytes might be skipped if the limit is reached.
         *
         * @param   n   the number of bytes to be skipped.
         * @return  the actual number of bytes skipped.
         */
        public long skip(long n)
        {
            int skipAmount = (n < 0)
                ? 0
                : ((n > Integer.MAX_VALUE)
                ? Integer.MAX_VALUE
                : (int) n);

            if (skipAmount > buf.remaining())
            {
                skipAmount = buf.remaining();
            }

            int newPos = buf.position() + skipAmount;

            buf.position(newPos);

            return skipAmount;
        }

        /**
         * Returns remaining bytes available in this ByteBuffer
         * @return the number of remaining bytes that can be read (or skipped over) from this ByteBuffer.
         */
        public int available()
        {
            return buf.remaining();
        }

        public boolean markSupported()
        {
            return true;
        }

        /**
         * Set the current marked position in the ByteBuffer.
         * <p> Note: The readAheadLimit for this class has no meaning.
         */
        public void mark(int readAheadLimit)
        {
            buf.mark();
        }

        /**
         * Resets the ByteBuffer to the marked position.
         */
        public void reset()
        {
            buf.reset();
        }

        /**
         * Closing a ByteBuffer has no effect.
         * The methods in this class can be called after the stream has been closed without generating an IOException.
         */
        public void close() throws IOException
        {
        }
    }

1
在复制并粘贴ByteBufferBackedInputStream并想要使用它后,IDE提示我Jackson已经有一个。因此,我在这里将其粘贴作为参考:
package com.fasterxml.jackson.databind.util;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/**
 * Simple {@link InputStream} implementation that exposes currently
 * available content of a {@link ByteBuffer}.
 */
public class ByteBufferBackedInputStream extends InputStream {
    protected final ByteBuffer _b;

    public ByteBufferBackedInputStream(ByteBuffer buf) { _b = buf; }

    @Override public int available() { return _b.remaining(); }
    
    @Override
    public int read() throws IOException { return _b.hasRemaining() ? (_b.get() & 0xFF) : -1; }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        if (!_b.hasRemaining()) return -1;
        len = Math.min(len, _b.remaining());
        _b.get(bytes, off, len);
        return len;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接