管道输入流被锁定了。

4
我正在尝试使用管道输入流编写数据。但从线程转储中可以看出,管道输入流上存在锁定。
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
FileInputStream fis = null;
GZIPOutputStream gos = null;
byte[] buffer = new byte[1024];
try {
    fis = new FileInputStream(file);
    gos = new GZIPOutputStream(pos);
    int length;
    while ((length = fis.read(buffer, 0, 1024)) != -1)
        gos.write(buffer, 0, length);
    } catch(Exception e){
        print("Could not read the file");
    }
    finally {
        try {
            fis.close();
            gos.close();
        }catch (Exception ie){ 
            printException(ie);
        }
    }
writeObject(pis);
pos.close();

writeobj方法只会从流中读取内容,但是read方法会被锁定。线程转储显示在管道输入流上等待。

main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257)
    at java.io.PipedInputStream.receive(PipedInputStream.java:215)
    - locked <0xa5c28be8> (a java.io.PipedInputStream)
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
    at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95)
    at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146)

   Locked ownable synchronizers:
    - None

我不确定是谁锁住了它。阅读文档以找出锁定调用。但无法弄清楚出了什么问题,也不知道如何克服它。

3个回答

9

使用PipedInputStream和PipedOutputStream必须在不同的线程中进行。

仔细阅读Javadoc: http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html

通常,一个线程从PipedInputStream对象中读取数据,另一个线程通过相应的PipedOutputStream写入数据。不建议尝试从单个线程中同时使用这两个对象,因为它可能会导致线程死锁。


7
PipedInputStream有一个小的非扩展缓冲区。一旦缓冲区满了,PipedOutputStream的写操作将被阻塞,直到被不同线程读取的缓冲输入。您不能在同一线程中同时使用两个操作,因为写入操作将等待无法发生的读取操作。
在您的情况下,您在写完所有数据之前没有读取任何数据,因此解决方案是改用ByteArrayOutputStreamByteArrayInputStream
  1. 将所有数据写入ByteArrayOutputStream。
  2. 完成后,在流上调用toByteArray()以检索字节数据。
  3. (可选)使用字节数据创建ByteArrayInputStream以作为InputStream读取它。

2
如果数据达到了几个G的级别,会怎么样呢?(应用程序肯定会崩溃)流是为了避免分配大内存块而创建的。 - Hamidreza Vakilian
在这种情况下,程序确实会崩溃,但是在这种情况下,OP试图将完整的数据存储在RAM中,因此我假设他们知道它能够适合。 - Boann

0
我需要一个过滤器来拦截慢速连接,因为我需要尽快关闭数据库连接,所以我最初使用了Java管道,但是当我仔细查看它们的实现时,发现它们都是同步的,所以最终我创建了自己的QueueInputStream,使用小缓冲区和阻塞队列将缓冲区放入队列中一旦满了,除了在LinkedBlockingQueue中使用的锁条件外,它是无锁的,借助小缓冲区,它应该很便宜,这个类只用于每个实例的单个生产者和消费者。
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接