如何从HttpsURLConnection创建一个Java非阻塞的InputStream?

7

基本上,我有一个URL,当聊天室发布新消息时,它会流式传输xml更新。 我想将该URL转换为InputStream,并在连接保持的同时继续从中读取,只要我没有发送Thread.interrupt()。 我遇到的问题是,当流中有内容可读取时,BufferedReader.ready()似乎不会变为true。

我正在使用以下代码:

BufferedReader buf = new BufferedReader(new InputStreamReader(ins));


String str = "";
while(Thread.interrupted() != true)
{
    connected = true;
    debug("Listening...");

    if(buf.ready())
    {
        debug("Something to be read.");
        if ((str = buf.readLine()) != null) {
            // str is one line of text; readLine() strips the newline character(s)
            urlContents += String.format("%s%n", str);
            urlContents = filter(urlContents);
        }
    }

    // Give the system a chance to buffer or interrupt.
    try{Thread.sleep(1000);} catch(Exception ee) {debug("Caught thread exception.");}
}

当我运行代码并向聊天室发送消息时,buf.ready()从未变为true,导致这些行永远不会被读取。但是,如果跳过"buf.ready()"部分,直接读取行,它会阻止进一步的操作,直到行被读取。
我该怎么做才能让buf.ready()返回true,或以一种防止阻塞的方式完成此操作?
提前感谢, 詹姆斯

1
每个连接都应该拆分成一个单独的线程。 - Nick
5个回答

10
如何创建Java非阻塞InputStream
你不能这样做。你的问题涉及术语上的矛盾。在Java中,流是阻塞的。因此,不存在“非阻塞 InputStream ”这样的东西。
当可以无阻塞地读取数据时,Reader.ready()返回true。 InputStreamsReaders是阻塞的。这里的一切都按设计工作。如果您想要更多的并发性,您将需要使用多个线程。或者使用Socket.setSoTimeout()及其在HttpURLConnection中的相关方法。

我知道我可以将事物拆分成线程...... 这段代码已经在它自己的线程(可运行)对象中。我想知道的是如何通过发送某种中断来停止线程。当输入流在等待更多数据发布到流中时,似乎会阻塞一切,包括线程中断。 - Warkior
1
你能描述一种情况,使得Reader.ready()方法返回true吗?因为根据你之前所说,Reader通常是阻塞的。基于你之前的说法,似乎Reader.ready()方法是无用的。 - Warkior
如果数据已经可用,ready() 函数将返回 true,Reader 将不会阻塞。如果没有可用的数据,ready() 函数将返回 false,Reader 将会阻塞。 - user207421
那么,如果我知道流中有可供读取的数据,为什么buf.ready()仍然返回false?这就是让我感到困惑的部分。我知道流中有准备好被读取的数据。 - Warkior
你不能“知道”那个。只有ready()才知道(以及InputStream.available(),在这两种情况下都被支持)。没有其他测试。对于一些流,例如SSL,两者都不受支持,因此ready()返回false并且available()返回零。还有一个区别,即数据是否可用以及完整的行是否可读取readLine(),包括行终止符。readLine()将阻塞直到所有内容都到达。 - user207421

7
对于非阻塞IO不要使用InputStream和Reader(或OutputStream/Writer),而是使用java.nio.*类,这种情况下使用SocketChannel(以及额外的CharsetDecoder)。

编辑:作为对您评论的回答:

具体来说,我想知道如何创建一个到https url的socket channel。

套接字(包括SocketChannels)工作在传输层(TCP),比HTTP等应用层协议低一级(或两级)。因此您无法创建到https url的套接字通道

相反,您需要打开到正确服务器和正确端口(如果URI中没有给出其他内容,则为443)的套接字通道,在客户端模式下创建SSLEngine(在javax.net.ssl中),然后从通道中读取数据,将其馈送到SSL引擎中并反之亦然,并向您的SSLEngine发送/接收正确的HTTP协议行,始终检查返回值以了解实际处理的字节数以及下一步要执行的操作。

这个非常复杂(我曾经做过),如果你不是在实现一个有很多客户端同时连接的服务器(你不能为每个连接都有一个单独的线程),你真的不想这样做。相反,请继续使用阻塞的InputStream,从你的URLConnection中读取,并将其放在一个不妨碍应用程序其他部分的空闲线程中。


具体来说,我正在寻找如何创建一个套接字通道到一个https URL。 - Warkior
1
@Warkior:看看我的最后一次编辑-你真的不想这样做。 - Paŭlo Ebermann
嗨Paŭlo,感谢您的建议。这很有道理,也回答了我对该特定方法的主要担忧。我真的无法控制服务器...只能控制客户端从流中读取。在这种情况下,有没有正确的方法来终止被阻塞的连接?如果用户更改到新的聊天室,这将需要发生。(意味着系统需要开始监听不同的流,终止旧的监听器) - Warkior

1

这看起来就是我寻找的方法,可以使线程超时并中断阻塞连接。我会尝试一下。谢谢。 - Warkior
嗯,我还没有找到一个合适的方法来正确地中断被阻塞的线程。也许是因为我的方法不对。 - Warkior

0

使用Channels没有HTTP/HTTPS的实现。无法以非阻塞的方式读取httpurlconnaction的输入流。您必须使用第三方库或自己实现SocketChannel上的HTTP。


-3
import java.io.InputStream;
import java.util.Arrays;

/**
 * This code demonstrates non blocking read from standard input using separate
 * thread for reading.
 */
public class NonBlockingRead {

    // Holder for temporary store of read(InputStream is) value
    private static String threadValue = "";

    public static void main(String[] args) throws InterruptedException {

        NonBlockingRead test = new NonBlockingRead();

        while (true) {
            String tmp = test.read(System.in, 100);
            if (tmp.length() > 0)
                System.out.println(tmp);
            Thread.sleep(1000);
        }
    }

    /**
     * Non blocking read from input stream using controlled thread
     * 
     * @param is
     *            — InputStream to read
     * @param timeout
     *            — timeout, should not be less that 10
     * @return
     */
    String read(final InputStream is, int timeout) {

        // Start reading bytes from stream in separate thread
        Thread thread = new Thread() {

            public void run() {
                byte[] buffer = new byte[1024]; // read buffer
                byte[] readBytes = new byte[0]; // holder of actually read bytes
                try {
                    Thread.sleep(5);
                    // Read available bytes from stream
                    int size = is.read(buffer);
                    if (size > 0)
                        readBytes = Arrays.copyOf(buffer, size);
                    // and save read value in static variable
                    setValue(new String(readBytes, "UTF-8"));
                } catch (Exception e) {
                    System.err.println("Error reading input stream\nStack trace:\n" + e.getStackTrace());
                }
            }
        };
        thread.start(); // Start thread
        try {
            thread.join(timeout); // and join it with specified timeout
        } catch (InterruptedException e) {
            System.err.println("Data were note read in " + timeout + " ms");
        }
        return getValue();

    }

    private synchronized void setValue(String value) {
        threadValue = value;
    }

    private synchronized String getValue() {
        String tmp = new String(threadValue);
        setValue("");
        return tmp;
    }

}

这不是非阻塞I/O。这只是一个无用且多余的阻塞I/O示例,可以通过读取超时和SocketTimeoutException实现。代码显然甚至没有经过测试。除非缓冲区长度为零(这里不是),否则InputStream.read(byte[])不可能返回零。您没有测试流结束,也没有将其传回给原始调用者。String不是潜在二进制数据的容器。答案在每个方面都是错误的。 - user207421
感谢指出逻辑错误。我已经修复了它们。总的来说,这段代码达到了在文本导向的服务器套接字中运行单元测试的目的。如果您能提供更优雅的解决方案,我将非常乐意使用它。 - valdisvi
没有人能够为一个包含自相矛盾的问题提供代码。 - user207421

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接