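I have a method that takes an InputStream and reads data from it. I would like to be able to use this method with a ByteBuffer as well. Is there a way to wrap a ByteBuffer so that it can be accessed as a stream?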
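The implementation Thilo referred to has some bugs, and the same buggy code has been copied verbatim onto other sites:
ByteBufferBackedInputStream.read() returns a sign-extended int representation of the byte it reads, which is wrong (the value should be in the range [-1..255]).
ByteBufferBackedInputStream.read(byte[], int, int) does not return -1 when no bytes remain in the buffer, as the API spec requires.
ByteBufferBackedOutputStream seems relatively sound.
Below I present a "fixed" version. If I find more bugs (or someone points them out), I will update it here.
Update: removed the synchronized keyword from the read/write methods.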
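import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ByteBufferBackedInputStream extends InputStream {

    ByteBuffer buf;

    public ByteBufferBackedInputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    // Returns the next byte as an unsigned value (0..255), or -1 at the end of the buffer.
    @Override
    public int read() throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    // Copies at most len bytes into the array; returns -1 once the buffer is exhausted.
    @Override
    public int read(byte[] bytes, int off, int len)
            throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }
        len = Math.min(len, buf.remaining());
        buf.get(bytes, off, len);
        return len;
    }
}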
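import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class ByteBufferBackedOutputStream extends OutputStream {

    ByteBuffer buf;

    public ByteBufferBackedOutputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    // Only the low-order 8 bits of b are written, as OutputStream specifies.
    @Override
    public void write(int b) throws IOException {
        buf.put((byte) b);
    }

    // Bulk write: hands the array range to the buffer in a single call.
    @Override
    public void write(byte[] bytes, int off, int len)
            throws IOException {
        buf.put(bytes, off, len);
    }
}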
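To illustrate the first bug listed above, here is a small self-contained sketch. The class `SignExtensionDemo` and its two helper methods are made up for this illustration and are not part of the answer. It reads the byte 0xFF from a ByteBuffer once without and once with the `& 0xFF` mask.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class, not part of the original answer.
class SignExtensionDemo {

    // Buggy variant: the byte is widened to int with sign extension,
    // so 0xFF comes back as -1 and is indistinguishable from end-of-stream.
    static int readSignExtended(ByteBuffer buf) {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get();
    }

    // Fixed variant: masking keeps the result in the range 0..255.
    static int readMasked(ByteBuffer buf) {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    public static void main(String[] args) {
        byte[] data = {(byte) 0xFF, 0x01};
        System.out.println(readSignExtended(ByteBuffer.wrap(data))); // prints -1
        System.out.println(readMasked(ByteBuffer.wrap(data)));       // prints 255
    }
}
```

With the unmasked variant, a caller looping until `read()` returns -1 would stop at the first 0xFF byte even though more data follows.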
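Should flush have that effect? It seems a flip would confuse things, since it would overwrite earlier data, which is not what flush() normally does. I'm guessing you are trying to use a single buffer, wrapped in both an input and an output stream, as a buffer? - Kothar
for (int i = 0 ; i < len ; i++) { write(b[off + i]); } For efficiency, we can pass the byte array straight through to the buffer and avoid converting to/from int values and making one function call per byte. - Kothar
Depending on how the ByteBuffer was obtained, you may want to override InputStream#close() to free or release the ByteBuffer. - Luke Hutchison
int available(). - Andreas detests censorship

Nothing in the JDK does this, but there are many implementations out there; search for ByteBufferInputStream. Basically, they wrap one or more ByteBuffers and keep track of an index that records how much has already been read. Something like this comes up a lot but is apparently buggy; see @Mike Houston's answer for an improved version.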
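The idea of wrapping several ByteBuffers and tracking an index into them could be sketched roughly as follows. This is only an illustration under my own assumptions: the class name `MultiByteBufferInputStream` is hypothetical and does not refer to any particular implementation mentioned in this thread, and `Objects.checkFromIndexSize` requires Java 9 or later.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical sketch: exposes several ByteBuffers, read back to back, as one stream.
class MultiByteBufferInputStream extends InputStream {

    private final ByteBuffer[] buffers;
    private int index; // index of the buffer currently being drained

    MultiByteBufferInputStream(ByteBuffer... buffers) {
        this.buffers = buffers;
    }

    // Skips over exhausted buffers; returns null once every buffer is drained.
    private ByteBuffer current() {
        while (index < buffers.length && !buffers[index].hasRemaining()) {
            index++;
        }
        return index < buffers.length ? buffers[index] : null;
    }

    @Override
    public int read() {
        ByteBuffer buf = current();
        return buf == null ? -1 : (buf.get() & 0xFF);
    }

    @Override
    public int read(byte[] dst, int off, int len) {
        Objects.checkFromIndexSize(off, len, dst.length);
        if (len == 0) {
            return 0;
        }
        ByteBuffer buf = current();
        if (buf == null) {
            return -1;
        }
        // A single call never crosses a buffer boundary; callers loop as usual.
        int n = Math.min(len, buf.remaining());
        buf.get(dst, off, n);
        return n;
    }
}
```

A bulk read may return fewer bytes than requested when it reaches the end of one buffer, which the InputStream contract permits.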
Use ByteArrayInputStream, obtaining the byte array via ByteBuffer.array(). This throws an exception if you try it on a native ByteBuffer.

InputStream and OutputStream versions:
ByteBufferBackedInputStream:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import javax.annotation.Nonnull; // assumed to be the JSR-305 annotation (external dependency)

public class ByteBufferBackedInputStream extends InputStream
{
    private ByteBuffer backendBuffer;

    public ByteBufferBackedInputStream(ByteBuffer backendBuffer) {
        Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
        this.backendBuffer = backendBuffer;
    }

    @Override
    public void close() throws IOException {
        this.backendBuffer = null;
    }

    private void ensureStreamAvailable() throws IOException {
        if (this.backendBuffer == null) {
            throw new IOException("read on a closed InputStream!");
        }
    }

    @Override
    public int read() throws IOException {
        this.ensureStreamAvailable();
        return this.backendBuffer.hasRemaining() ? this.backendBuffer.get() & 0xFF : -1;
    }

    @Override
    public int read(@Nonnull byte[] buffer) throws IOException {
        return this.read(buffer, 0, buffer.length);
    }

    @Override
    public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
        this.ensureStreamAvailable();
        Objects.requireNonNull(buffer, "Given buffer can not be null!");
        if (offset >= 0 && length >= 0 && length <= buffer.length - offset) {
            if (length == 0) {
                return 0;
            }
            else {
                int remainingSize = Math.min(this.backendBuffer.remaining(), length);
                if (remainingSize == 0) {
                    return -1;
                }
                else {
                    this.backendBuffer.get(buffer, offset, remainingSize);
                    return remainingSize;
                }
            }
        }
        else {
            throw new IndexOutOfBoundsException();
        }
    }

    @Override
    public long skip(long n) throws IOException {
        this.ensureStreamAvailable();
        if (n <= 0L) {
            return 0L;
        }
        // Clamp to what is left in the buffer and report the number of bytes actually skipped.
        int skipped = (int) Math.min((long) this.backendBuffer.remaining(), n);
        this.backendBuffer.position(this.backendBuffer.position() + skipped);
        return skipped;
    }

    @Override
    public int available() throws IOException {
        this.ensureStreamAvailable();
        return this.backendBuffer.remaining();
    }

    @Override
    public synchronized void mark(int var1) {
    }

    @Override
    public synchronized void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }

    @Override
    public boolean markSupported() {
        return false;
    }
}
ByteBufferBackedOutputStream:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import javax.annotation.Nonnull; // assumed to be the JSR-305 annotation (external dependency)

public class ByteBufferBackedOutputStream extends OutputStream
{
    private ByteBuffer backendBuffer;

    public ByteBufferBackedOutputStream(ByteBuffer backendBuffer) {
        Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
        this.backendBuffer = backendBuffer;
    }

    @Override
    public void close() throws IOException {
        this.backendBuffer = null;
    }

    private void ensureStreamAvailable() throws IOException {
        if (this.backendBuffer == null) {
            throw new IOException("write on a closed OutputStream");
        }
    }

    @Override
    public void write(int b) throws IOException {
        this.ensureStreamAvailable();
        backendBuffer.put((byte) b);
    }

    @Override
    public void write(@Nonnull byte[] bytes) throws IOException {
        this.write(bytes, 0, bytes.length);
    }

    @Override
    public void write(@Nonnull byte[] bytes, int off, int len) throws IOException {
        this.ensureStreamAvailable();
        Objects.requireNonNull(bytes, "Given buffer can not be null!");
        if ((off < 0) || (off > bytes.length) || (len < 0) ||
            ((off + len) > bytes.length) || ((off + len) < 0))
        {
            throw new IndexOutOfBoundsException();
        }
        else if (len == 0) {
            return;
        }
        backendBuffer.put(bytes, off, len);
    }
}
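Neither class flips the buffer for you, so when the same ByteBuffer is first written through an OutputStream and then read through an InputStream, the caller has to call flip() in between. The following round trip is a minimal sketch of that: `FlipRoundTripDemo` and its factory methods are hypothetical, compact stand-ins for the two classes above, and `readAllBytes()` assumes Java 9 or later.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical demo; the factories are compact stand-ins for the stream classes above.
class FlipRoundTripDemo {

    static OutputStream asOutputStream(ByteBuffer buf) {
        return new OutputStream() {
            @Override public void write(int b) { buf.put((byte) b); }
            @Override public void write(byte[] src, int off, int len) { buf.put(src, off, len); }
        };
    }

    static InputStream asInputStream(ByteBuffer buf) {
        return new InputStream() {
            @Override public int read() {
                return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
            }
            @Override public int read(byte[] dst, int off, int len) {
                if (len == 0) return 0;
                if (!buf.hasRemaining()) return -1;
                int n = Math.min(len, buf.remaining());
                buf.get(dst, off, n);
                return n;
            }
        };
    }

    // Writes the payload through an OutputStream, flips, and reads it back.
    static byte[] roundTrip(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length);
        try {
            asOutputStream(buf).write(payload); // position advances to payload.length
            buf.flip();                         // limit = old position, position = 0
            return asInputStream(buf).readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Without the flip() call the buffer's position would already sit at its limit after writing, so the first read would immediately report end of stream.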
public static InputStream asInputStream(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        // use heap buffer; no array is created; only the reference is used
        return new ByteArrayInputStream(buffer.array());
    }
    return new ByteBufferInputStream(buffer);
}
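Note that `buffer.array()` is the whole backing array, regardless of the buffer's position, limit, or array offset. A variant that exposes only the readable window might look like the sketch below. The class name `BufferStreams` is hypothetical, and the fallback for buffers without an accessible array simply copies the remaining bytes, which is my own simplification rather than the `ByteBufferInputStream` used above.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical helper class, shown only as an illustration.
class BufferStreams {

    static InputStream asInputStream(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            // Expose only the readable window [position, limit) of the backing array.
            return new ByteArrayInputStream(
                    buffer.array(),
                    buffer.arrayOffset() + buffer.position(),
                    buffer.remaining());
        }
        // Direct and read-only buffers expose no array: copy the remaining bytes.
        // duplicate() leaves the caller's position and limit untouched.
        ByteBuffer view = buffer.duplicate();
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        return new ByteArrayInputStream(copy);
    }
}
```

In both branches, reading from the returned stream does not advance the position of the original ByteBuffer, which differs from the wrapper classes elsewhere in this thread.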
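.array() is an optional operation. It may be unimplemented (e.g. MappedByteBuffer), and even where it is implemented it throws an exception for read-only buffers. - Sheepy
That is the reason for buffer.hasArray() :) - rmuller
...wrapping a ByteBuffer for access as a stream. ByteBuffer uses offsets to control which part of the underlying array is normally accessible to callers. That is one of the reasons for using ByteBuffer rather than just byte[] in the first place. - Krease

This is derived from the ByteArrayInputStream code; the supplied ByteBuffer must have its position and limit set correctly in advance.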
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ByteBufferInputStream extends InputStream
{
    /**
     * The input ByteBuffer that was provided.
     * The ByteBuffer should be supplied with position and limit correctly set as appropriate
     */
    protected ByteBuffer buf;

    public ByteBufferInputStream(ByteBuffer buf)
    {
        this.buf = buf;
        buf.mark(); // to prevent java.nio.InvalidMarkException on InputStream.reset() if mark had not been set
    }

    /**
     * Reads the next byte of data from this ByteBuffer. The value byte is returned as an int in the range 0-255.
     * If no byte is available because the end of the buffer has been reached, the value -1 is returned.
     * @return the next byte of data, or -1 if the limit/end of the buffer has been reached.
     */
    @Override
    public int read()
    {
        return buf.hasRemaining()
            ? (buf.get() & 0xff)
            : -1;
    }

    /**
     * Reads up to len bytes of data into an array of bytes from this ByteBuffer.
     * If the buffer has no remaining bytes, then -1 is returned to indicate end of file.
     * Otherwise, the number k of bytes read is equal to the smaller of len and buffer remaining.
     * @param b the buffer into which the data is read.
     * @param off the start offset in the destination array b
     * @param len the maximum number of bytes read.
     * @return the total number of bytes read into the buffer, or -1 if there is no more data because the limit/end of
     * the ByteBuffer has been reached.
     * @exception NullPointerException If b is null.
     * @exception IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off
     */
    @Override
    public int read(byte[] b, int off, int len)
    {
        if (b == null)
        {
            throw new NullPointerException();
        }
        else if (off < 0 || len < 0 || len > b.length - off)
        {
            throw new IndexOutOfBoundsException();
        }
        if (!buf.hasRemaining())
        {
            return -1;
        }
        int remaining = buf.remaining();
        if (len > remaining)
        {
            len = remaining;
        }
        if (len <= 0)
        {
            return 0;
        }
        buf.get(b, off, len);
        return len;
    }

    /**
     * Skips n bytes of input from this ByteBuffer. Fewer bytes might be skipped if the limit is reached.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     */
    @Override
    public long skip(long n)
    {
        int skipAmount = (n < 0)
            ? 0
            : ((n > Integer.MAX_VALUE)
                ? Integer.MAX_VALUE
                : (int) n);
        if (skipAmount > buf.remaining())
        {
            skipAmount = buf.remaining();
        }
        int newPos = buf.position() + skipAmount;
        buf.position(newPos);
        return skipAmount;
    }

    /**
     * Returns remaining bytes available in this ByteBuffer
     * @return the number of remaining bytes that can be read (or skipped over) from this ByteBuffer.
     */
    @Override
    public int available()
    {
        return buf.remaining();
    }

    @Override
    public boolean markSupported()
    {
        return true;
    }

    /**
     * Set the current marked position in the ByteBuffer.
     * <p> Note: The readAheadLimit for this class has no meaning.
     */
    @Override
    public void mark(int readAheadLimit)
    {
        buf.mark();
    }

    /**
     * Resets the ByteBuffer to the marked position.
     */
    @Override
    public void reset()
    {
        buf.reset();
    }

    /**
     * Closing a ByteBuffer has no effect.
     * The methods in this class can be called after the stream has been closed without generating an IOException.
     */
    @Override
    public void close() throws IOException
    {
    }
}
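The constructor above calls buf.mark() because ByteBuffer.reset() throws InvalidMarkException when no mark has been set. The short sketch below shows both behaviours directly on a ByteBuffer; the class `MarkResetDemo` and its method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;

// Hypothetical demo of the ByteBuffer mark/reset behaviour the stream relies on.
class MarkResetDemo {

    // Returns true if reset() on a freshly wrapped buffer (no mark set) throws.
    static boolean resetWithoutMarkThrows() {
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3});
        try {
            buf.reset();
            return false;
        } catch (InvalidMarkException e) {
            return true;
        }
    }

    // Marks the start, consumes two bytes, resets, and returns the next byte read.
    static int rereadAfterReset() {
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3});
        buf.mark();
        buf.get();
        buf.get();
        buf.reset(); // position goes back to the marked position (0)
        return buf.get() & 0xFF;
    }

    public static void main(String[] args) {
        System.out.println(resetWithoutMarkThrows()); // prints true
        System.out.println(rereadAfterReset());       // prints 1
    }
}
```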
package com.fasterxml.jackson.databind.util;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/**
 * Simple {@link InputStream} implementation that exposes currently
 * available content of a {@link ByteBuffer}.
 */
public class ByteBufferBackedInputStream extends InputStream {
    protected final ByteBuffer _b;

    public ByteBufferBackedInputStream(ByteBuffer buf) { _b = buf; }

    @Override public int available() { return _b.remaining(); }

    @Override
    public int read() throws IOException { return _b.hasRemaining() ? (_b.get() & 0xFF) : -1; }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        if (!_b.hasRemaining()) return -1;
        len = Math.min(len, _b.remaining());
        _b.get(bytes, off, len);
        return len;
    }
}