Java中的线程安全InputStream

13

如何创建线程安全的InputStream。在多线程操作期间,InputStream数据会被破坏,那么我该如何使我的InputStream线程安全?以下代码是否有效?

public class SynchronizedInputStream  extends InputStream{

    private InputStream in;

    private SynchronizedInputStream( InputStream in ) {
        this.in = in;
    }

    /* ... method for every InputStream type to use */
    public  static InputStream createInputStream( InputStream in) {
        return new SynchronizedInputStream( in);
    }

    public static InputStream createPushBackInputStream(InputStream in,int BUFSIZE){
        return new SynchronizedInputStream(new PushbackInputStream(in,BUFSIZE));
    }

    /* Wrap all InputStream methods Used */

    public int read(){
        synchronized (this) {
            try {
                return in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return 0;
    }

    @Override
    public int available() {
        synchronized( this ) {
            try {
                return in.available();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return 0;
    }


}
在NANOHTTPD文件中。
 public HTTPSession(TempFileManager tempFileManager, InputStream inputStream, OutputStream outputStream, InetAddress inetAddress) {
            this.inputStream=(PushbackInputStream) SynchronizedInputStream.createPushBackInputStream(inputStream);
        /*lines of code..........*/
        }

然后我这样调用它

String Data = readStream(session.getInputStream());//session is HTTPSession
/*.....code....*/
private String readStream(InputStream in) {
        synchronized (in) {
            PushbackInputStream inputStream = (PushbackInputStream) in;
            StringBuffer outputBuffer = null;
            try {
                                 //Reading the InputStream Here
                }
            } catch (IOException ioe) {
                //error msg
            }
                return outputBuffer.toString();

        } 
    }

11
为什么要对输入流进行多线程处理? - Scary Wombat
2
你想要实现什么,遇到了什么问题?在输入流周围尝试同步似乎对我来说并没有太多意义... - Matt Coubrough
@MattCoubrough 如上所述,我的方法以inputStream作为参数,并且不同的线程访问该方法。如果它是单个线程,则可以正常工作,但对于多线程数据会损坏。这就是为什么我考虑同步输入流的原因。 - Manas Pratim Chamuah
3
保持单线程,但切换到生产者/消费者模式可能更合理。要么这样做,要么创建一个线程安全的适配器,将InputStream转换为BlockingQueue。无论哪种方式,都要意识到能够并发调用read()并不能神奇地使您的代码线程安全;线程仍然需要协调彼此的读取,因此需要选择不同的方法。 - David Ehrmann
7
如果你想在多个线程中读取流,则存在设计问题。我建议只使用一个线程读取该流,并让该线程将工作传递给线程池,以实现对读取数据的多线程处理。你不应该需要超过一个线程来读取一个流,因为这样可以尽可能快地读取它,添加线程只会增加复杂性和混乱。 - Peter Lawrey
显示剩余11条评论
2个回答

10
你需要考虑它的意义。想象一下,多个人正在阅读一本神奇的书,第一次有人看到字符就会被抹去,所以每个字符只能被一个人读取。这有点像流。
这使得有用地阅读此书变得非常困难。最简单的方法,每个人只读取一些随机的字符,这样不是很有用。一个直截了当的解决方案是让一个人先读它然后复制到一个不会在阅读时抹去字符的书上。这样每个人都可以读书了。在某些情况下,不需要每个人都理解这本书,只要给他们一句话就可以工作了。在这种情况下,一个读者可以将每个句子发布到队列中,每个人按顺序读取其中的一句话。
其他方法包括使用缓冲区,每个线程存储它们读取的字符,然后每次都检查是否可以组成一个单词,如果可以则发送该单词以供下游处理。有关示例,请参见Netty的编解码器包(例如此处)。
但是,这些方法通常是在流的顶部实现而不是在其内部实现的。你可以有一个在内部实现这些方法的流,但这可能会让人们感到困惑。

7
简而言之,您展示的类是线程安全的,但使用该类的代码可能不是线程安全的!您实现的操作是原子地读取一个字符并原子地测试是否有东西可读。只有当所有线程使用相同的SynchronizedInputStream对象来访问给定的InputStream,并且除了您的包装器之外没有其他内容直接访问InputStream时,这些操作的实现才是线程安全的。但是,这很可能不足以使应用程序对流的使用在更大意义上是线程安全的。我预计你观察到的"损坏"实际上是在更高层次上发生的,例如同时进行一系列读取调用以读取(比如)消息的两个线程正在交错,以至于某些消息的字节被发送到错误的线程。假设这是您的问题,那么这并不能解决它。您的read方法仅在线程读取单个字节时锁定流。解锁后,没有任何东西可以阻止另一个线程读取下一个字节。 有几种方法可以解决这个问题。例如: - 一种简单的方法是重新构造代码,只有一个线程从给定的InputStream中读取。该线程读取消息,并将其转换为可以通过队列传递给其他人的对象。 - 另一种方法是使用一个原子地读取整个消息的包装器类替换您的包装器类。不要扩展InputStream。相反,根据更大规模的操作设计API,并在该粒度级别进行同步。 关于您添加的额外代码。看起来只有一个线程(当前请求线程)应该从输入流中读取。如果您只使用一个线程,则多线程或线程安全性就没有问题。 (而且此外,nanoHTTPD代码的工作方式就是这样设计的。) 假设有多个线程,则readStream中的synchronized(in){块通常足以使代码线程安全,前提是所有线程都使用相同的in对象。问题是,您的黑客HttpSession类为每个"会话"创建了单独的SynchronizedInputStream,而这就是您的代码同步的内容。因此,如果(某种方式)两个线程使用相同的套接字输入流创建HttpSessions对象,它们将在不同的对象上同步,并且不会互斥。
但这只是推测。到目前为止,您还没有证明有多个线程试图使用相同的输入流。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接