使用超时机制从InputStream读取数据是否可行?

173

具体而言,问题是编写一个像这样的方法:

int maybeRead(InputStream in, long timeout)

如果在'timeout'毫秒内有数据可用,则返回值与in.read()相同,否则返回-2。方法返回之前,任何生成的线程都必须退出。

为避免争论,这里涉及到Java.io.InputStream主题,该主题由Sun(任何Java版本)记录。请注意,这并不像看起来那么简单。以下是一些根据Sun文档直接支持的事实。

  1. in.read()方法可能是不可中断的。

  2. 包装InputStream在Reader或InterruptibleChannel中没有帮助,因为所有这些类能做的就是调用InputStream的方法。如果可以使用这些类,那么就可以编写一个解决方案,直接在InputStream上执行相同的逻辑。

  3. in.available()始终可返回0。

  4. in.close()方法可能会阻塞或不执行任何操作。

  5. 没有通用的方法可以杀死另一个线程。

9个回答

94

使用inputStream.available()

System.in.available() 返回 0 始终是可以接受的。

我发现相反的情况 - 它总是返回可用字节数的最佳值。根据 InputStream.available() 的JavaDoc:

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

由于时间/陈旧性,估计是不可避免的。该数字可能会因为新数据的不断到来而被低估一次。但是它总是在下一个调用中“追上”——它应该考虑到所有到达的数据,除了刚好在新调用时刻到达的数据。如果有数据却永久返回0,则无法满足以上条件。

第一个注意点:具体的InputStream子类负责available()

InputStream是一个抽象类。它没有数据源。它拥有可用数据是毫无意义的。因此,available()的javadoc也说明:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

实际上,具体的输入流类确实覆盖了available()方法,提供有意义的值而不是常数0。

第二个警告:在Windows中键入输入时,请确保使用回车符。

如果使用System.in,则当您的命令行程序将其传递时,您的程序才会收到输入。如果您正在使用文件重定向/管道(例如somefile > java myJavaApp或somecommand | java myJavaApp),则输入数据通常会立即传递。但是,如果您手动输入数据,则数据传递可能会延迟。例如,在Windows cmd.exe shell中,数据会缓冲在cmd.exe shell中。只有在按下回车键(控制-M或<enter>)后,数据才会传递给执行的Java程序。这是执行环境的限制。当然,InputStream.available()在shell缓冲数据时将返回0 - 这是正确的行为;此时没有可用的数据。一旦数据从shell可用,该方法将返回一个大于0的值。注意:Cygwin也使用cmd.exe。

最简单的解决方案(无阻塞,因此不需要超时)

只需使用以下内容:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

或等价地说,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

更丰富的解决方案(在超时期限内最大程度地填充缓冲区)

声明如下:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

然后使用这个:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.

4
如果 is.available() > 1024,这个建议将会失败。当然有一些流(streams)确实会返回0,例如直到最近的SSLSockets也是如此。你不能依赖这一点。 - user207421
通过readLength专门处理'is.available() > 1024'这种情况。 - Glen Best
关于SSLSockets的注释不正确 - 当缓冲区中没有数据时,它会返回0表示可用。正如我的回答所述。Javadoc:“如果套接字上没有缓冲字节,并且套接字未使用close关闭,则available将返回0。” - Glen Best
1
@GlenBest 我对于SSLSocket的评论并不是错误的。在最近之前,它一直返回零。你谈论的是现在,而我谈论的是整个JSSE的历史,我从Java 1.4(2002年)首次包含它之前就开始使用了。 - user207421
1
这实际上不是一个好答案。1)如先前所述,available()可能返回0,这取决于JVM、版本、操作系统和实现。2)如果您试图访问错误的文件,则任何read()调用都可能永远不会返回(或者至少不会在合理的超时时间内返回,有些需要10分钟)。因此,使用此解决方案是一个坏主意。Ian Jones的答案更好,如果正确编写,可读性更好。 - JayC667
显示剩余8条评论

73

假设您的流不是由套接字支持的(所以无法使用 Socket.setSoTimeout()),我认为解决这类问题的标准方法是使用 Future。

假设我有以下执行器和流:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

我有一个写入器,它会写入一些数据,然后等待5秒钟,然后再写入最后一块数据并关闭流:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

正常的阅读方式如下。读取操作会一直阻塞直到有数据可用,因此这个过程需要5秒钟完成:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

输出结果为:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

如果存在更根本的问题,例如写程序没有响应,那么读程序将会一直阻塞。 如果我将读操作包装在一个future中,我可以按以下方式控制超时:

    int readByte = 1;
    // Read data with timeout
    Callable<Integer> readTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future<Integer> future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

输出结果为:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

我可以捕获TimeoutException并进行任何所需的清理操作。


17
但是阻塞的线程怎么办?它会一直留在内存中直到应用程序终止吗?如果我没错的话,当应用程序负载很重时,这可能会产生无尽的线程,甚至会阻塞更多的线程使用已经被占用和阻塞的池。如果我有错误,请纠正我。谢谢。 - Muhammad Gelbana
4
Muhammad Gelbana,你是对的:阻塞的 read() 线程会一直运行,这是不好的。但我已经找到了一种方法来避免这种情况:当超时时间到达时,在调用线程中关闭输入流(在我的情况下,我关闭了 Android 蓝牙套接字的输入流)。当你这样做时,read() 调用将立即返回。在我的情况下,我使用的是 int read(byte[]) 的重载版本,那个方法会立即返回。也许 int read() 的重载版本会抛出 IOException,因为我不知道它会返回什么。在我看来,这是正确的解决方案。 - Emmanuel Touzery
5
在应用程序终止之前,线程读取保持阻塞状态。 - Ortwin Angermeier
11
@ortang 这就是我所说的“捕获TimeoutException并进行任何清理操作”的意思...例如,我可能想要终止读取线程:...catch (TimeoutException e) { executor.shutdownNow(); } - Ian Jones
13
"executer.shutdownNow"不会杀死线程,而是尝试中断它,但无效果。没有可能进行清理,这是一个严重的问题。 - Marko Topolnik
显示剩余4条评论

27

如果您的InputStream由Socket支持,您可以使用setSoTimeout设置Socket超时时间(以毫秒为单位)。 如果在指定的超时时间内read()调用不解除阻塞,则会抛出SocketTimeoutException。

只需确保在进行read()调用之前在Socket上调用setSoTimeout。


18

我会质疑问题陈述而不是盲目接受它。你只需要从控制台或网络中获得超时时间。如果是后者,你可以使用Socket.setSoTimeout()HttpURLConnection.setReadTimeout(),它们都能够完全满足要求,只要在构建/获取它们时正确设置即可。在应用程序的任意后一点留给InputStream处理超时,这是很差的设计,并导致非常尴尬的实现。


10
还有其他情况可能会导致读取操作长时间阻塞,例如从磁带驱动器、远程挂载的网络驱动器或带有磁带机器人后端的 HFS 中读取时。 (但您回答的主要观点是正确的。) - Stephen C
1
@StephenC 感谢您的评论和示例。为了更好地举例说明,一个简单的情况可能是套接字连接已正确建立,但读取尝试被阻塞,因为数据应该从数据库中获取,但某种原因未能成功(比如说数据库没有响应,查询进入锁定状态)。在这种情况下,您需要有一种明确超时套接字读取操作的方法。 - sactiw
1
InputStream 抽象的整个意义在于不考虑底层实现。对于发布的答案的利弊进行争论是公平的,但质疑问题陈述并不能帮助讨论。 - pellucide
3
InputStream在流上工作并阻塞,但它不提供超时机制。因此,InputStream抽象不是一个恰当设计的抽象。因此,在流上请求超时并不要求太多。因此,这个问题是在寻求一个非常实际的问题的解决方案。 大多数底层实现都会阻塞。这就是流的本质。如果流的另一侧没有准备好新数据,套接字、文件和管道将会阻塞。 - pellucide
2
@EJP。我不知道你是怎么得出那个结论的。我不同意你的看法。问题陈述“如何在InputStream上设置超时”是合理的。由于框架没有提供超时的方法,因此询问这样的问题是恰当的。 - pellucide
显示剩余7条评论

6

2
+1:我不相信仅使用InputStream就有可靠的方法来完成OP所要求的操作。然而,nio是为此目的之一而创建的。 - Eddie
4
OP已经基本排除了这种可能。InputStreams天生就是阻塞的,而且可能是不可中断的。 - user207421

5

以下是一种从System.in获取NIO FileChannel并使用超时检查数据可用性的方法,这是问题描述中的特殊情况。在控制台上运行它,不要输入任何内容,等待结果。在Windows和Linux上成功地测试了Java 6。

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;

public class Main {

    static final ByteBuffer buf = ByteBuffer.allocate(4096);

    public static void main(String[] args) {

        long timeout = 1000 * 5;

        try {
            InputStream in = extract(System.in);
            if (! (in instanceof FileInputStream))
                throw new RuntimeException(
                        "Could not extract a FileInputStream from STDIN.");

            try {
                int ret = maybeAvailable((FileInputStream)in, timeout);
                System.out.println(
                        Integer.toString(ret) + " bytes were read.");

            } finally {
                in.close();
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    /* unravels all layers of FilterInputStream wrappers to get to the
     * core InputStream
     */
    public static InputStream extract(InputStream in)
            throws NoSuchFieldException, IllegalAccessException {

        Field f = FilterInputStream.class.getDeclaredField("in");
        f.setAccessible(true);

        while( in instanceof FilterInputStream )
            in = (InputStream)f.get((FilterInputStream)in);

        return in;
    }

    /* Returns the number of bytes which could be read from the stream,
     * timing out after the specified number of milliseconds.
     * Returns 0 on timeout (because no bytes could be read)
     * and -1 for end of stream.
     */
    public static int maybeAvailable(final FileInputStream in, long timeout)
            throws IOException, InterruptedException {

        final int[] dataReady = {0};
        final IOException[] maybeException = {null};
        final Thread reader = new Thread() {
            public void run() {                
                try {
                    dataReady[0] = in.getChannel().read(buf);
                } catch (ClosedByInterruptException e) {
                    System.err.println("Reader interrupted.");
                } catch (IOException e) {
                    maybeException[0] = e;
                }
            }
        };

        Thread interruptor = new Thread() {
            public void run() {
                reader.interrupt();
            }
        };

        reader.start();
        for(;;) {

            reader.join(timeout);
            if (!reader.isAlive())
                break;

            interruptor.start();
            interruptor.join(1000);
            reader.join(1000);
            if (!reader.isAlive())
                break;

            System.err.println("We're hung");
            System.exit(1);
        }

        if ( maybeException[0] != null )
            throw maybeException[0];

        return dataReady[0];
    }
}

有趣的是,在NetBeans 6.5内运行程序而不是在控制台中运行时,超时根本不起作用,并且调用System.exit()实际上是必要的来终止僵尸线程。发生的情况是中断线程在调用reader.interrupt()时阻塞(!)。另一个测试程序(此处未显示)还尝试关闭通道,但也无法正常工作。


无论是使用JDK 1.6还是JDK 1.7,在Mac OS上都无法正常工作。只有在读取期间按下回车键后,才能识别中断。 - user502187

4
正如jt所说,NIO是最好(也是正确的)的解决方案。但如果您真的被InputStream卡住了,您可以选择以下两种方法之一:
1.创建一个专门负责从InputStream读取数据并将结果放入缓冲区的线程。这样做只有在您只有一个流实例的情况下才有效。否则,您可能需要使用Thread类中的弃用方法来杀死线程,但这可能会导致资源泄漏。
2.依靠isAvailable指示可以无阻塞地读取的数据。但在某些情况下(例如Socket),可能需要进行潜在的阻塞读取才能报告非零值。

6
Socket.setSoTimeout() 是同样正确且更简单的解决方案。或者使用 HttpURLConnection.setReadTimeout() - user207421
4
@EJP - 只有在特定情况下才会“同样正确”;例如,如果输入流是套接字流/HTTP连接流。 - Stephen C
2
@Stephen C NIO只有在相同的情况下才是非阻塞和可选择的。例如,没有非阻塞文件I/O。 - user207421
2
@EJP 但是有非阻塞管道IO(System.in),对于本地磁盘上的文件来说,非阻塞I/O是无意义的。 - woky
1
@EJP 争论InputStream不是什么纯属胡说,因为它是抽象超类。InputStream实际上可能是一个管道(因此是非阻塞的),99%的时间System.in是一个管道。我可以编写JNI代码,从System.in获取fd并进行非阻塞IO而没有问题。但是我无法通过Java API执行此操作(inheritedChanel()不返回SelectableChannel)。怪罪NIO制造商。顺便说一下,不存在C NIO ;)。 - woky
显示剩余5条评论

1
没有办法正确地阻塞一个InputStream,除非它是由Socket创建的。
1. 永远不要忙等待
"忙等待"指的是使用CPU密集型的while循环来轮询IO资源。永远不要忙等待。正确的方法是将当前线程标记为"阻塞",并允许操作系统选择其他线程运行。当IO资源可用或超时到期时,操作系统负责将您的线程标记为"挂起"并再次选择它。Java内置的read()实现就是这样做的。
忙等待的后果:
- 如果你有一个忙等待者,当它应该空闲时,CPU将一直运行。 - 如果你有多个忙等待者,它们会相互饥饿,并在应用程序中引入延迟,延迟时间长达操作系统的量子/时钟周期时间(通常为10毫秒-100毫秒)。CPU的有效利用率也下降,根据情况可能接近0%。 - 如果你的忙等待者的线程优先级高于其他实时线程,它们甚至会饥饿外部应用程序并引入延迟。

InputStream.read() 是一个IO调用。在等待字节/超时时,它不应该占用任何CPU资源。据我所知,Java提供的read()支持适当的阻塞,但没有任何超时功能。大多数其他回答这个StackOverflow问题的答案都提供了超时功能,但使用了忙等待。

在没有忙等待的情况下,无法为InputStream实现超时。

2. Java通过允许Socket.setSoTimeout违反了面向对象编程原则

Socket.getInputStream()返回的InputStream扩展了class InputStream。根据其APIInputStream.read()是一个抽象方法,只允许抛出IOException以外的任何异常。然而,引用Socket.setSoTimeout()的API

设置此选项为非零超时时,与此套接字关联的InputStream上的read()调用将仅阻塞此时间量。如果超时到期,则会引发java.net.SocketTimeoutException异常,但套接字仍然有效。
字面上抛出一个在接口中未声明的异常。
这个问题进一步强调了InputStream接口不支持超时的事实。Java通过违反面向对象编程原则来使套接字返回支持超时的InputStream。总之,如果您有一个非基于TCP的InputStream,并且希望进行适当的超时阻塞,那么很遗憾 - 这是不可能的。

1
  1. Selector.select(timeout) 实现了非忙等待。
  2. SocketTimeoutException 继承自 InterruptedIOException,后者又继承自 IOException
- undefined
@Vlad 谢谢你非常有帮助的评论!我正在编辑答案。 - undefined

0
此答案启发,我想到了一个更面向对象的解决方案。 只有在你打算读取字符时才有效 您可以覆盖 BufferedReader 并实现类似以下内容的东西:
public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    ( . . . )

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                break; // Should restore flag
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("Read timed out");
    }
}

这里有一个几乎完整的例子。

在一些方法中,我返回0,但你应该将其更改为-2以满足你的需求,不过我认为0更适合BufferedReader协议。没有发生错误,只是读取了0个字符。readLine方法会严重影响性能。如果你真的想使用readLine,你应该创建一个全新的BufferedReader。目前它不是线程安全的。如果在readLine等待行时有人调用操作,它将产生意想不到的结果。

我不喜欢在这里返回-2。我会抛出异常,因为有些人可能只是检查int < 0来考虑EOS。无论如何,这些方法声称“不能阻塞”,你应该检查该语句是否真实,并且不要覆盖它们。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 
 * readLine
 * 
 * @author Dario
 *
 */
public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    private long millisInterval = 100;

    private int lookAheadLine;

    public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
        super(in, sz);
        this.millisTimeout = millisTimeout;
    }

    public SafeBufferedReader(Reader in, long millisTimeout) {
        super(in);
        this.millisTimeout = millisTimeout;
    }



    /**
     * This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
     * 
     * It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
     * 
     */
    @Override
    public String readLine() throws IOException {
        try {
            waitReadyLine();
        } catch(IllegalThreadStateException e) {
            //return null; //Null usually means EOS here, so we can't.
            throw e;
        }
        return super.readLine();
    }

    @Override
    public int read() throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read();
    }

    @Override
    public int read(char[] cbuf) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2;  // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read(cbuf);
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    @Override
    public int read(CharBuffer target) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(target);
    }

    @Override
    public void mark(int readAheadLimit) throws IOException {
        super.mark(readAheadLimit);
    }

    @Override
    public Stream<String> lines() {
        return super.lines();
    }

    @Override
    public void reset() throws IOException {
        super.reset();
    }

    @Override
    public long skip(long n) throws IOException {
        return super.skip(n);
    }

    public long getMillisTimeout() {
        return millisTimeout;
    }

    public void setMillisTimeout(long millisTimeout) {
        this.millisTimeout = millisTimeout;
    }

    public void setTimeout(long timeout, TimeUnit unit) {
        this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    public long getMillisInterval() {
        return millisInterval;
    }

    public void setMillisInterval(long millisInterval) {
        this.millisInterval = millisInterval;
    }

    public void setInterval(long time, TimeUnit unit) {
        this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
    }

    /**
     * This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
     * 
     * @throws IllegalThreadStateException
     * @throws IOException
     */
    protected void waitReadyLine() throws IllegalThreadStateException, IOException {
        long timeout = System.currentTimeMillis() + millisTimeout;
        waitReady();

        super.mark(lookAheadLine);
        try {
            while(System.currentTimeMillis() < timeout) {
                while(ready()) {
                    int charInt = super.read();
                    if(charInt==-1) return; // EOS reached
                    char character = (char) charInt;
                    if(character == '\n' || character == '\r' ) return;
                }
                try {
                    Thread.sleep(millisInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore flag
                    break;
                }
            }
        } finally {
            super.reset();
        }
        throw new IllegalThreadStateException("readLine timed out");

    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(millisInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore flag
                break;
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("read timed out");
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接