Java - 如何从输入流(套接字/套接字服务器)读取未知数量的字节?

20

想要通过inputStream从套接字中读取字节。服务器发送的字节数量可能不固定,客户端无法预先知道字节数组的长度。如何完成这个任务?


byte b[] = new byte[1024]; 
int bytesRead = sock.getInputStream().read(b); //检查是否读取到数据

这样做可以避免Net BzEAnSZ报告“可能未初始化”的错误。帮助解决问题。


你的问题与输入流无关,它只是由于未能初始化数组引用“b”而导致编译错误。答案是将其初始化。标题不清晰,扣1分。 - user207421
11个回答

28

你需要根据需要扩展缓冲区,通过每次读取1024字节的块来实现,就像我之前写的这个示例代码一样。

    byte[] resultBuff = new byte[0];
    byte[] buff = new byte[1024];
    int k = -1;
    while((k = sock.getInputStream().read(buff, 0, buff.length)) > -1) {
        byte[] tbuff = new byte[resultBuff.length + k]; // temp buffer size = bytes already read + bytes last read
        System.arraycopy(resultBuff, 0, tbuff, 0, resultBuff.length); // copy previous bytes
        System.arraycopy(buff, 0, tbuff, resultBuff.length, k);  // copy current lot
        resultBuff = tbuff; // call the temp buffer as your result buff
    }
    System.out.println(resultBuff.length + " bytes read.");
    return resultBuff;

2
如果网络已经饱和(即k很小),重新分配可能是无效的。实际上,即使k=1024,在每个周期重新分配也是昂贵的。我会预先分配一个更大的块(通常建议是当前大小的两倍),并保持其当前位置的偏移量。 - Vladimir Dyuzhev
2
我解决的问题是缓冲区分配和扩展策略。它绝不意味着要解决任何性能问题,因为这些问题非常具体。我的代码旨在读取小文件,大多数文件大小都在1kb以下,因此这个数字对我来说很有意义。通常,在服务器端会话特定代码中,选择“常规/平均”字节传输量是一个有效的选择,如果这样的缓冲区过大,可能会导致并发会话堵塞可用内存。在其他情况下,双倍于此的想法也可能很好-所以这完全取决于情况。 - d-live
2
嘿,放松点,不要把它当成个人问题! :) 这确实是一段很棒的代码。 - Vladimir Dyuzhev
我喜欢你的版本,但需要做一些更改: int available = resIn.available(); byte[] buff = new byte[available]; ByteArrayOutputStream bao = new ByteArrayOutputStream(available); int bytesRead = -1; while ((bytesRead = resIn.read(buff, 0, buff.length)) > -1) { bao.write(buff, 0, bytesRead); } - Marcus Becker

14
假设发送方在数据结束时关闭流:
ByteArrayOutputStream baos = new ByteArrayOutputStream();

byte[] buf = new byte[4096];
while(true) {
  int n = is.read(buf);
  if( n < 0 ) break;
  baos.write(buf,0,n);
}

byte data[] = baos.toByteArray();

2
假设发送方没有关闭流,如果没有更多的可用字节,该方法将会阻塞。但是这个答案让我朝着正确的方向前进了。 - Andreas Dolk
如果发送方发送的数据超过了buf的容量(即>4096字节),这也会导致数组越界异常。 - mmo

11

读取一个整数,它表示接收到的下一段数据的大小。创建一个与该大小相同的缓冲区,或使用现有的空间充足的缓冲区。将数据读入缓冲区中,确保它的大小不超过之前读取的大小。反复执行该操作 :)

如果像你所说的那样真的不知道大小,请像其他答案中提到的那样,将数据读入扩展的 ByteArrayOutputStream 中。但是,使用 size 方法确实是最可靠的。


3
请注意,从远端传递的大小需要进行验证以确保合理。恶意用户或远程软件中的错误可能会导致你为缓冲区分配1G内存,从而立即造成OOM(内存不足)。 - Vladimir Dyuzhev
我想,如果你验证后发现无效,唯一的选择就是关闭流并让另一端重新初始化一个新的流。 - Chris Dennett

8
简单的答案是:
byte b[] = new byte[BIG_ENOUGH];
int nosRead = sock.getInputStream().read(b);

其中BIG_ENOUGH足够大。


但是总的来说,这存在一个大问题。单个read调用不能保证返回另一端写入的所有内容。

  • 如果nosRead值为BIG_ENOUGH,则您的应用程序无法确定是否还有更多字节要到达;另一端可能恰好发送了BIG_ENOUGH字节...或多于BIG_ENOUGH字节。在前一种情况下,如果尝试读取,则您的应用程序将永远阻塞。在后一种情况下,您的应用程序必须进行(至少)另一个read以获取其余数据。

  • 如果nosRead值小于BIG_ENOUGH,则您的应用程序仍然不知道。它可能已经接收到所有内容,部分数据可能已经延迟(由于网络数据包分段,网络数据包丢失,网络分区等),或者另一端可能在发送数据时被阻塞或崩溃。

最佳答案是:要么您的应用程序需要事先知道要期望多少字节,要么应用程序协议需要以某种方式告诉应用程序要期望多少字节或何时发送所有字节。

可能的方法包括:

  • 应用程序协议使用固定的消息大小(不适用于您的示例)
  • 应用程序协议消息大小在消息头中指定
  • 应用程序协议使用消息结束标记
  • 应用程序协议不是基于消息的,并且另一端关闭连接以表示这就是结尾

如果没有这些策略之一,您的应用程序将被迫猜测,并有可能偶尔出错。

然后您可以使用多个read调用和(可能)多个缓冲区。


1
在这个配备至少2G内存的廉价机器时代,为什么还在使用1K缓冲区?大家需要走出80年代,迈向未来。手动内存管理已经是过去时了。 - Joe Zitzelberger
@Joe - 这不是根本的存储管理问题。我可以将缓冲区任意扩大,仍然会遇到问题。 - Stephen C
同意。大多数JVM无论机器上可用的RAM有多少,都会在1.3g或1.4g处崩溃,因为它们依赖于地址空间中可用的RAM。然而,现代操作系统的getmain例程是什么,即使请求一个小块,也很可能得到一个大块。所以我总是默认“一开始就请求一个非常大的块”并保存细节。 - Joe Zitzelberger
@Joe - 你没理解我的意思。即使JVM能够请求并获得一个无限大的缓冲区,问题仍然存在。请阅读我的完整答案。 - Stephen C

8
不必重复造轮子,可以使用Apache Commons:
IOUtils.toByteArray(inputStream);

例如,带有错误处理的完整代码如下:
    public static byte[] readInputStreamToByteArray(InputStream inputStream) {
    if (inputStream == null) {
        // normally, the caller should check for null after getting the InputStream object from a resource
        throw new FileProcessingException("Cannot read from InputStream that is NULL. The resource requested by the caller may not exist or was not looked up correctly.");
    }
    try {
        return IOUtils.toByteArray(inputStream);
    } catch (IOException e) {
        throw new FileProcessingException("Error reading input stream.", e);
    } finally {
        closeStream(inputStream);
    }
}

private static void closeStream(Closeable closeable) {
    try {
        if (closeable != null) {
            closeable.close();
        }
    } catch (Exception e) {
        throw new FileProcessingException("IO Error closing a stream.", e);
    }
}

在这里,FileProcessingException是您应用程序特定的有意义的运行时异常,它将不受干扰地传递到您正确的处理程序,而不会污染中间代码。


1

将所有输入数据流式传输到输出流中。以下是一个可行的示例:

    InputStream inputStream = null;
    byte[] tempStorage = new byte[1024];//try to read 1Kb at time
    int bLength;
    try{

        ByteArrayOutputStream outputByteArrayStream =  new ByteArrayOutputStream();     
        if (fileName.startsWith("http"))
            inputStream = new URL(fileName).openStream();
        else
            inputStream = new FileInputStream(fileName);            

        while ((bLength = inputStream.read(tempStorage)) != -1) {
                outputByteArrayStream.write(tempStorage, 0, bLength);
        }
        outputByteArrayStream.flush();
        //Here is the byte array at the end
        byte[] finalByteArray = outputByteArrayStream.toByteArray();
        outputByteArrayStream.close();
        inputStream.close();
    }catch(Exception e){
        e.printStackTrace();
        if (inputStream != null) inputStream.close();
    }

0
使用BufferedInputStream,并使用available()方法返回可供读取的字节数大小,然后构造一个相应大小的byte[]。问题解决了。 :)
BufferedInputStream buf = new BufferedInputStream(is);  
int size = buf.available();

1
.available()只提供估计值-我不会使用此估计值来构造byte []。 - chzbrgla

0

这个问题已经有7年了,但我遇到了一个类似的问题,当我做一个可与 NIO 和 OIO 兼容的系统时(客户端和服务器可能是任何他们想要的,OIO 或 NIO)。

由于阻塞 InputStreams 的存在,这是一个相当大的挑战。

我找到了一种方法,使之成为可能,并且我想张贴出来,以帮助那些有类似问题的人。

在这里,使用DataInputStream读取动态大小的字节数组,可以简单地将其包装在socketInputStream周围。此外,我不想引入特定的通信协议(例如首先发送要发送的字节数的大小),因为我想使这尽可能原始。首先,我有一个简单的实用程序缓冲类,它看起来像这样:

import java.util.ArrayList;
import java.util.List;

public class Buffer {

    private byte[] core;
    private int capacity;

    public Buffer(int size){
        this.capacity = size;
        clear();
    }

    public List<Byte> list() {
        final List<Byte> result = new ArrayList<>();
        for(byte b : core) {
            result.add(b);
        }

        return result;
    }

    public void reallocate(int capacity) {
        this.capacity = capacity;
    }

    public void teardown() {
        this.core = null;
    }

    public void clear() {
        core = new byte[capacity];
    }

    public byte[] array() {
        return core;
    }
}

这个类只是因为 Java 中字节和 Byte 自动装箱的愚蠢方式与此列表一起使用而存在。在这个例子中,这实际上并不是必需的,但我不想在这个解释中遗漏任何东西。

接下来是两个简单的核心方法。在这些方法中,一个 StringBuilder 被用作“回调”。它将被填充已读取的结果,并返回已读取的字节数量。当然,这可以以不同的方式完成。

private int readNext(StringBuilder stringBuilder, Buffer buffer) throws IOException {
    // Attempt to read up to the buffers size
    int read = in.read(buffer.array());
    // If EOF is reached (-1 read)
    // we disconnect, because the
    // other end disconnected.
    if(read == -1) {
        disconnect();
        return -1;
    }
    // Add the read byte[] as
    // a String to the stringBuilder.
    stringBuilder.append(new String(buffer.array()).trim());
    buffer.clear();

    return read;
}

private Optional<String> readBlocking() throws IOException {
    final Buffer buffer = new Buffer(256);
    final StringBuilder stringBuilder = new StringBuilder();
    // This call blocks. Therefor
    // if we continue past this point
    // we WILL have some sort of
    // result. This might be -1, which
    // means, EOF (disconnect.)
    if(readNext(stringBuilder, buffer) == -1) {
        return Optional.empty();
    }
    while(in.available() > 0) {
        buffer.reallocate(in.available());
        if(readNext(stringBuilder, buffer) == -1) {
            return Optional.empty();
        }
    }

    buffer.teardown();

    return Optional.of(stringBuilder.toString());
}

第一个方法readNext将使用来自DataInputStream的byte[]填充缓冲区,并返回以此方式读取的字节数。

在第二个方法中,readBlocking,我利用了阻塞的特性,不必担心消费者生产者问题。简单地说,readBlocking将阻塞,直到收到一个新的字节数组。在调用此阻塞方法之前,我们分配了一个缓冲区大小。注意,我在第一次读取后(在while循环内部)调用了reallocate。这不是必需的。您可以安全地删除此行,代码仍将正常工作。我之所以这样做,是因为我的问题的独特性。

我没有更详细地解释的两件事是: 1. in(DataInputStream和此处唯一的short变量,对此表示抱歉) 2. 断开连接(您的断开连接例程)

总而言之,现在您可以这样使用它:

// The in has to be an attribute, or an parameter to the readBlocking method
DataInputStream in = new DataInputStream(socket.getInputStream());
final Optional<String> rawDataOptional = readBlocking();
rawDataOptional.ifPresent(string -> threadPool.execute(() -> handle(string)));

这将为您提供一种通过套接字(或任何InputStream)读取任何形状或形式的字节数组的方法。希望这有所帮助!


0

这里是一个更简单的示例,使用ByteArrayOutputStream...

        socketInputStream = socket.getInputStream();
        int expectedDataLength = 128; //todo - set accordingly/experiment. Does not have to be precise value.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(expectedDataLength);
        byte[] chunk = new byte[expectedDataLength];
        int numBytesJustRead;
        while((numBytesJustRead = socketInputStream.read(chunk)) != -1) {
            baos.write(chunk, 0, numBytesJustRead);
        }
        return baos.toString("UTF-8");

然而,如果服务器没有返回-1,则需要以其他方式检测数据的结束 - 例如,返回的内容是否总是以某个标记(例如“”)结尾,或者您可能可以使用socket.setSoTimeout()来解决。(提到这一点是因为它似乎是一个常见的问题。)

expectedDataLength = 128。他说他不知道长度。 - Stealth Rabbi

0

有两种解决方案:

  1. 让发送方在传输字节后关闭套接字。然后在接收方,一直读取直到结束标志。

  2. 按照Chris的建议,让发送方添加长度字段,然后读取相应数量的字节。

  3. 使用自描述协议,例如XML、序列化等。


从发布的上下文中可以看出,@farmostrich已经假定流将被关闭。他的问题是缓冲区分配。 - Vladimir Dyuzhev
@road to yamburg:他没有这么说,而且大多数情况下根本没有必要使用完全正确大小的字节数组:只需像另一个答案所示那样流式传输数据,或者按照您的要求使用ByteArrayOutputStream即可。这不是一个主要问题。 - user207421

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接