标准输出流的并发写入

6

我正在编写一个应用程序,涉及将大量数据写入OutputStream(属于Socket)。使这件事有些复杂的是通常有多个线程尝试写入同一个OutputStream。目前,我已经设计好了单独一个线程来管理被写入数据的OutputStream。该线程包含一个队列(LinkedList),它会轮询字节数组并尽快将它们写入。

private class OutputStreamWriter implements Runnable {

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

    public void run() {
        OutputStream outputStream = User.this.outputStream;
        while (true) {
            try {
                if (chunkQueue.isEmpty()) {
                    Thread.sleep(100);
                    continue;
                }
                outputStream.write(chunkQueue.poll());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

这种设计的问题在于,随着越来越多的写操作发生,越来越多的数据排队等待写入,但写入速度并没有变快。最初,当数据放入队列时,它几乎立即被写入。然后大约15秒左右之后,数据开始滞后;从数据排队到实际写入的时间会有一定的延迟。随着时间的推移,这种延迟变得越来越长。这是非常明显的。
修复这个问题的方法是使用某种ConcurrentOutputStream实现,使数据能够无需阻塞地发送,从而避免写操作被堵塞(甚至可以不用队列)。我不知道是否有这样的实现 - 我一直无法找到 - 而且我个人认为这是不可能实现的。
那么,有人有什么建议可以重新设计这个吗?

4
那不是很积极建设性的。有什么问题吗? - Martin Tuskevicius
顺便问一下,你是否在同步修改你的链表?因为它本身不是线程安全的设计。另外,你在套接字输出上面添加了什么类型的输出流,以及你通过它推送了多少数据? - Perception
3个回答

4

套接字的吞吐量有限,如果它比您的数据生成吞吐量慢,那么数据必须被缓冲,无法避免。并发写入不会起到任何帮助作用。

当排队的数据超过一定限制时,您可以考虑暂停数据生成以减少内存消耗。


我只是在这里试探性地提出一些想法,但是,SocketChannel 怎么样? - Martin Tuskevicius
我认为这不会有帮助。瓶颈在于网络带宽。 - irreputable

1

我需要一个过滤器来拦截慢速连接,在我需要尽快关闭数据库连接的情况下,我最初使用了Java管道,但当仔细查看它们的实现时,发现它都是同步的,所以最终我创建了自己的QueueInputStream,使用小缓冲区和阻塞队列将缓冲区放入队列中一旦满了,除了在LinkedBlockingQueue中使用的锁条件外,它是无锁的,借助小缓冲区,这应该是便宜的,此类仅用于每个实例的单个生产者和消费者,并且您应该传递一个ExecutorService来启动将排队的字节流式传输到最终OutputStream:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

0

我同意@irreputable的观点,同时写入不会有任何帮助。相反,你应该看看生产方面,也就是你已经拥有的东西。

  1. 使用BlockingQueue而不是LinkedList。

  2. 使用队列的阻塞轮询操作,而不仅仅是盲目地睡眠100msl,这在定义上将浪费平均50%的时间。长期来看,这可能真的会累积起来。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接