How to avoid blocking a thread when writing too much data to an output?

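In a task run by a thread pool, I want to write a bunch of strings to a remote endpoint, and there is a flag that indicates whether the task has been cancelled.
I use the following code to make sure it can stop as soon as possible: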
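public void sendToRemote(Iterator<String> data, OutputStream remote) {
    try {
        System.out.println("##### starting sending");
        // this.cancelled is set by another thread, so it should be declared volatile
        while (!this.cancelled && data.hasNext()) {
            // OutputStream.write takes bytes, not a String
            remote.write(data.next().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        System.out.println("##### finished sending");
        System.out.println("");
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        try {
            remote.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}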

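Sometimes I find that if a lot of data (or an infinite iterator) is passed to this method, it does not finish in time, even though this.cancelled is later set to true. The code seems to be blocked, and after a long time (about 1 minute) the following error appears: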

java.net.SocketTimeoutException: write blocked too long

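My guess is that remote.write itself blocks when there is too much data to send and the remote host does not consume it in time. Although I set this.cancelled to true, the method has already been blocked at the line remote.write(data.next()) for a long time, so it never gets a chance to check the value of this.cancelled and leave the loop. Eventually, after a long time, it throws a SocketTimeoutException.
Is my understanding correct? If so, how can I avoid blocking when there is too much data to send?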
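To check that understanding, here is a minimal sketch that reproduces the behaviour over loopback, assuming a server side that accepts the connection but never reads. The class BlockingWriteDemo, the 64 KB chunk size and the one-second waits are illustrative choices, not part of the question. Once the client's send buffer and the peer's receive buffer are full, write() blocks, and setting the cancel flag changes nothing until write() returns.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingWriteDemo {
    // Stand-in for the question's this.cancelled; volatile so the writer sees the update.
    static volatile boolean cancelled;

    /** Returns true if the writer is still stuck in write() after cancellation was requested. */
    public static boolean writerBlocksAfterCancel() throws Exception {
        cancelled = false;
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket peer = server.accept()) { // accepted, but never read from
            OutputStream remote = client.getOutputStream();
            Thread writer = new Thread(() -> {
                byte[] chunk = new byte[64 * 1024];
                try {
                    while (!cancelled) {
                        remote.write(chunk); // blocks once both socket buffers are full
                    }
                } catch (IOException e) {
                    // reached when the sockets are closed at the end of the demo
                }
            });
            writer.setDaemon(true);
            writer.start();
            Thread.sleep(1000);   // enough time to fill the buffers on loopback
            cancelled = true;     // the flag is set...
            writer.join(1000);    // ...but the loop never gets to re-check it
            return writer.isAlive();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("still blocked after cancel: " + writerBlocksAfterCancel());
    }
}
```

How many bytes get written before the thread blocks depends on the OS socket buffer sizes; the point is only that the flag is never re-checked while write() is blocked.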

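You could use a Channel (from java.nio.channels) instead of an OutputStream. That way the write can easily be interrupted. - Tesseract
Thanks, but in my case it has to be an OutputStream: the remote parameter comes from a third-party method and we cannot change it. - Freewind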
The channel will still block. - Dexter
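I have never seen java.net.SocketTimeoutException: write blocked too long. What platform are you on? When the socket send buffer is full, a write should block indefinitely. The real problem is at the reading end. Why can't it keep up? - user207421
@EJP: I have never seen this exception either; Google tells me it may come from here: JDK6-source, in WriteStream#block() - Ortwin Angermeier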
2 Answers


Try simply closing the remote OutputStream. Your thread will end with an exception.

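  1. Thread#1 is busy executing sendToRemote();
  2. Thread#2 decides enough is enough and closes the remote connection. (This assumes the OutputStream object is not thread-local but a globally reachable reference.)
  3. Thread#1 dies with an exception :)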
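The steps above can be sketched as follows, again over a loopback connection whose peer never reads (AsyncCloseDemo is a hypothetical name). For socket streams, the Javadoc of java.net.Socket#close() states that any thread currently blocked in an I/O operation on the socket will throw a SocketException, and closing the stream returned by getOutputStream() closes the socket. For an arbitrary third-party OutputStream there is no such guarantee, so treat this as a sketch, not a general fix.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class AsyncCloseDemo {

    /** Returns true if closing the stream from another thread ended the blocked write. */
    public static boolean closeUnblocksWriter() throws Exception {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket peer = server.accept()) { // the peer never reads
            OutputStream remote = client.getOutputStream();
            // "Thread#1": busy sending until it blocks on a full send buffer.
            Thread writer = new Thread(() -> {
                byte[] chunk = new byte[64 * 1024];
                try {
                    while (true) {
                        remote.write(chunk);
                    }
                } catch (IOException expected) {
                    // "dies with an exception" once the stream is closed
                }
            });
            writer.setDaemon(true);
            writer.start();
            Thread.sleep(1000);  // let the writer fill the buffers and block
            remote.close();      // "Thread#2": closing the stream closes the socket
            writer.join(5000);
            return !writer.isAlive();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("writer ended after close: " + closeUnblocksWriter());
    }
}
```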

EDIT: I found this on the internet:

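Enabling linger and setting the timeout to a number of seconds will cause subsequent calls to Socket.Close to block until all data in the send buffer has been sent or the timeout has elapsed.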


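Setting a linger timeout does have that effect, but how does that help? - user207421
We can close the socket without waiting for the send buffer to drain, so the operator can stop the thread successfully. - Dexter
Oops, I missed the third-party restriction he mentioned. My bad. - Dexter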
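If the code does own the Socket (which, as Dexter concedes, is not the asker's situation), the linger option can be used for an abortive close. A minimal sketch, assuming a hypothetical helper named AbortiveClose: with SO_LINGER enabled and a timeout of 0, close() on most platforms discards unsent data and resets the connection instead of waiting for the send buffer to drain.

```java
import java.io.IOException;
import java.net.Socket;

public final class AbortiveClose {
    private AbortiveClose() {}

    /**
     * Closes the socket without waiting for unsent data to drain.
     * With SO_LINGER on and a timeout of 0, close() typically discards the send
     * buffer and resets the connection (RST) rather than doing a graceful shutdown.
     */
    public static void abort(Socket socket) throws IOException {
        try {
            socket.setSoLinger(true, 0);
        } finally {
            socket.close(); // close even if the option could not be set
        }
    }
}
```

The peer then typically sees a connection reset rather than a normal end of stream, so this only fits cases where the unsent data can be thrown away.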


The proper solution is probably to use NIO in some way. I have commented here on how Hadoop uses NIO underneath.
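For the NIO route, here is a sketch that assumes you can obtain a SocketChannel (again not possible when a third-party API only hands out an OutputStream). It switches the channel to non-blocking mode and waits on a Selector with a timeout, so the cancel flag is re-checked at least every pollMs milliseconds instead of being stuck inside a blocking write. CancellableChannelWriter and its parameters are illustrative names; this is not Hadoop's code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.function.BooleanSupplier;

public final class CancellableChannelWriter {
    private CancellableChannelWriter() {}

    /**
     * Writes the remaining bytes of buf, re-checking the cancel flag at least every
     * pollMs milliseconds. Returns true if everything was written, false if cancelled.
     * Leaves the channel in non-blocking mode.
     */
    public static boolean write(SocketChannel ch, ByteBuffer buf,
                                BooleanSupplier cancelled, long pollMs) throws IOException {
        if (pollMs <= 0) {
            throw new IllegalArgumentException("pollMs must be > 0"); // select(0) would block forever
        }
        ch.configureBlocking(false);
        try (Selector selector = Selector.open()) {
            ch.register(selector, SelectionKey.OP_WRITE);
            while (buf.hasRemaining()) {
                if (cancelled.getAsBoolean()) {
                    return false;
                }
                if (ch.write(buf) == 0) {
                    // Send buffer is full: wait until writable or until pollMs elapses.
                    selector.select(pollMs);
                    selector.selectedKeys().clear();
                }
            }
            return true;
        }
    }
}
```

A cancelled call returns with part of the buffer unsent (buf.position() shows how far it got); whether to close the channel afterwards is left to the caller.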

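But a simpler solution is in Dexter's answer. I also found an answer by EJP that suggests using a BufferedOutputStream to control when data is written out. So I combined the two into the TimedOutputStream below. It does not give full control over the remote output buffering (most of that is done by the OS), but combined with an appropriate buffer size and write timeout it provides at least some control (see the second program for a test of TimedOutputStream).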
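The core idea of combining the two (a scheduled watchdog that closes the underlying stream if a single write takes too long) can be sketched in a much smaller form. This is a simplified variant, not the TimedOutputStream class: WatchdogOutputStream is a hypothetical name, it does no buffering of its own (wrap it in a BufferedOutputStream so each timed write covers a whole buffer), and whether closing actually unblocks the stuck write depends on the underlying stream.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: closes the wrapped stream if one write takes longer than timeoutMs. */
public class WatchdogOutputStream extends FilterOutputStream {
    private final ScheduledExecutorService scheduler;
    private final long timeoutMs;
    private volatile boolean timedOut;

    public WatchdogOutputStream(OutputStream out, ScheduledExecutorService scheduler, long timeoutMs) {
        super(out);
        this.scheduler = scheduler;
        this.timeoutMs = timeoutMs;
    }

    /** True once the watchdog has fired and closed the underlying stream. */
    public boolean isTimedOut() {
        return timedOut;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Arm the watchdog before the potentially blocking call.
        ScheduledFuture<?> watchdog = scheduler.schedule(() -> {
            timedOut = true;
            try {
                out.close(); // for socket streams this should make the blocked write fail
            } catch (IOException ignored) {
                // nothing more we can do from the watchdog thread
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            // Forward the whole range at once (FilterOutputStream would go byte by byte).
            out.write(b, off, len);
        } finally {
            // If the watchdog fires just as write() returns, the stream is still closed;
            // callers should check isTimedOut().
            watchdog.cancel(false);
        }
    }
}
```

Only write calls are timed here; flush() and close() are inherited from FilterOutputStream unchanged.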

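I have not fully tested TimedOutputStream, so please do your own due diligence.
Edit: updated the write method to better coordinate the buffer size and the write timeout, and fine-tuned the test program. Added comments about the unsafe asynchronous close of the socket output stream.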

import java.io.*;
import java.util.concurrent.*;

/**
 * A buffered {@link FilterOutputStream} (modelled on {@link BufferedOutputStream}) that sets time-out tasks on write operations 
 * (typically when the buffer is flushed). If a write timeout occurs, the underlying outputstream is closed
 * (which may not be appropriate when sockets are used, see also comments on {@link TimedOutputStream#interruptWriteOut}).
 * A {@link ScheduledThreadPoolExecutor} is required to schedule the time-out tasks.
 * This {@code ScheduledThreadPoolExecutor} should have {@link ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy(boolean)}
 * set to {@code true} to prevent a huge task queue.
 * If no {@code ScheduledThreadPoolExecutor} is provided in the constructor, 
 * the executor is created and shutdown with the {@link #close()} method. 
 * @author vanOekel
 *
 */
public class TimedOutputStream extends FilterOutputStream {

    protected int timeoutMs = 50_000;
    protected final boolean closeExecutor;
    protected final ScheduledExecutorService executor;
    protected ScheduledFuture<?> timeoutTask;
    protected volatile boolean writeTimedout;
    protected volatile IOException writeTimeoutCloseException;

    /* *** new methods not in BufferedOutputStream *** */

    /**
     * Default timeout is 50 seconds.
     */
    public void setTimeoutMs(int timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    public int getTimeoutMs() {
        return timeoutMs;
    }

    public boolean isWriteTimeout() {
        return writeTimedout;
    }

    /**
     * If a write timeout occurs and closing the underlying output-stream caused an exception,
     * then this method will return a non-null IOException.
     */
    public IOException getWriteTimeoutCloseException() {
        return writeTimeoutCloseException;
    }

    public ScheduledExecutorService getScheduledExecutor() {
        return executor;
    }

    /**
     * See {@link BufferedOutputStream#close()}.
     */
    @Override
    public void close() throws IOException {

        try {
            super.close(); // calls flush via FilterOutputStream.
        } finally {
            if (closeExecutor) {
                executor.shutdownNow();
            }
        }
    }

    /* ** Mostly a copy of java.io.BufferedOutputStream and updated with time-out options. *** */

    protected byte buf[];
    protected int count;

    public TimedOutputStream(OutputStream out) {
        this(out, null);
    }

    public TimedOutputStream(OutputStream out, ScheduledExecutorService executor) {
        this(out, 8192, executor);
    }

    public TimedOutputStream(OutputStream out, int size) {
        this(out, size, null);
    }

    public TimedOutputStream(OutputStream out, int size, ScheduledExecutorService executor) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        if (executor == null) {
            this.executor = Executors.newScheduledThreadPool(1);
            ScheduledThreadPoolExecutor stp = (ScheduledThreadPoolExecutor) this.executor;
            stp.setRemoveOnCancelPolicy(true);
            closeExecutor = true;
        } else {
            this.executor = executor;
            closeExecutor = false;
        }
        buf = new byte[size];
    }

    /**
     * Flushbuffer is called by all the write-methods and "flush()".
     */
    protected void flushBuffer(boolean flushOut) throws IOException {

        if (count > 0 || flushOut) {
            timeoutTask = executor.schedule(new TimeoutTask(this), getTimeoutMs(), TimeUnit.MILLISECONDS);
            try {
                // long start = System.currentTimeMillis(); int len = count;
                if (count > 0) {
                    out.write(buf, 0, count);
                    count = 0;
                }
                if (flushOut) {
                    out.flush(); // in case out is also buffered, this will do the actual write.
                }
                // System.out.println(Thread.currentThread().getName() + " Write [" + len + "] " + (flushOut ? "and flush " : "") + "time: " + (System.currentTimeMillis() - start));
            } finally {
                timeoutTask.cancel(false);
            }
        }
    }

    protected class TimeoutTask implements Runnable {

        protected final TimedOutputStream tout;
        public TimeoutTask(TimedOutputStream tout) {
            this.tout = tout;
        }

        @Override public void run() {
            tout.interruptWriteOut();
        }
    }

    /**
     * Closes the outputstream after a write timeout. 
     * If sockets are used, calling {@link java.net.Socket#shutdownOutput()} is probably safer
     * since the behavior of an async close of the outputstream is undefined. 
     */
    protected void interruptWriteOut() {

        try {
            writeTimedout = true;
            out.close();
        } catch (IOException e) {
            writeTimeoutCloseException = e;
        }
    }

    /**
     * See {@link BufferedOutputStream#write(int b)}
     */
    @Override
    public void write(int b) throws IOException {

        if (count >= buf.length) {
            flushBuffer(false);
        }
        buf[count++] = (byte)b;
    }

    /**
     * Like {@link BufferedOutputStream#write(byte[], int, int)}
     * but with one big difference: the full buffer is always written
     * to the underlying outputstream. Large byte-arrays are chopped
     * into buffer-size pieces and written out piece by piece.
     * <br>This provides a closer relation to the write timeout
     * and the maximum (buffer) size of the write-operation to wait on. 
     */
    @Override
    public void write(byte b[], int off, int len) throws IOException {

        if (count >= buf.length) {
            flushBuffer(false);
        }
        if (len <= buf.length - count) {
            System.arraycopy(b, off, buf, count, len);
            count += len;
        } else {
            final int fill = buf.length - count;
            System.arraycopy(b, off, buf, count, fill);
            count += fill;
            flushBuffer(false);
            final int remaining = len - fill;
            int start = off + fill;
            for (int i = 0; i < remaining / buf.length; i++) {
                System.arraycopy(b, start, buf, count, buf.length);
                count = buf.length;
                flushBuffer(false);
                start += buf.length;
            }
            count = remaining % buf.length;
            System.arraycopy(b, start, buf, 0, count);
        }
    }

    /**
     * See {@link BufferedOutputStream#flush()}
     * <br>If a write timeout occurred (i.e. {@link #isWriteTimeout()} returns {@code true}),
     * then this method does nothing. 
     */
    @Override
    public void flush() throws IOException {

        // Protect against flushing before closing after a write-timeout.
        // If that happens, then "out" is already closed in interruptWriteOut.
        if (!isWriteTimeout()) {
            flushBuffer(true);
        }
    }

}

And the test program:

import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class TestTimedSocketOut implements Runnable, Closeable {

    public static void main(String[] args) {

        TestTimedSocketOut m = new TestTimedSocketOut();
        try {
            m.run();
        } finally {
            m.close();
        }
    }

    final int clients = 3; // 2 is minimum, client 1 is expected to fail.
    final int timeOut = 1000;
    final int bufSize = 4096;
    final long maxWait = 5000L;
    // need a large array to write, else the OS just buffers everything and makes it work
    byte[] largeMsg = new byte[28_602];
    final ThreadPoolExecutor tp = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    final ScheduledThreadPoolExecutor stp = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
    final ConcurrentLinkedQueue<Closeable> closeables = new ConcurrentLinkedQueue<Closeable>();
    final CountDownLatch[] serversReady = new CountDownLatch[clients];
    final CountDownLatch clientsDone = new CountDownLatch(clients);
    final CountDownLatch serversDone = new CountDownLatch(clients);

    ServerSocket ss;
    int port;

    @Override public void run()  {

        stp.setRemoveOnCancelPolicy(true);
        try {
            ss = new ServerSocket();
            ss.bind(null);
            port = ss.getLocalPort();
            tp.execute(new SocketAccept());
            for (int i = 0; i < clients; i++) {
                serversReady[i] = new CountDownLatch(1);
                ClientSideSocket css = new ClientSideSocket(i);
                closeables.add(css);
                tp.execute(css);
                // need sleep to ensure client 0 connects first.
                Thread.sleep(50L);
            }
            if (!clientsDone.await(maxWait, TimeUnit.MILLISECONDS)) {
                println("CLIENTS DID NOT FINISH");
            } else {
                if (!serversDone.await(maxWait, TimeUnit.MILLISECONDS)) {
                    println("SERVERS DID NOT FINISH");
                } else {
                    println("Finished");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 

    @Override public void close() {

        try { if (ss != null) ss.close(); } catch (Exception ignored) {}
        Closeable c = null;
        while ((c = closeables.poll()) != null) {
            try { c.close(); } catch (Exception ignored) {}
        }
        tp.shutdownNow();
        println("Scheduled tasks executed: " + stp.getTaskCount() + ", max. threads: " + stp.getLargestPoolSize());
        stp.shutdownNow();
    }

    class SocketAccept implements Runnable {

        @Override public void run() {
            try {
                for (int i = 0; i < clients; i++) {
                    SeverSideSocket sss = new SeverSideSocket(ss.accept(), i);
                    closeables.add(sss);
                    tp.execute(sss);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    class SeverSideSocket implements Runnable, Closeable {

        Socket s;
        int number, cnumber;
        boolean completed;

        public SeverSideSocket(Socket s, int number) {
            this.s = s;
            this.number = number;
            cnumber = -1;
        }

        @Override public void run() {

            String t = "nothing";
            try {
                DataInputStream in = new DataInputStream(s.getInputStream());
                DataOutputStream out = new DataOutputStream(s.getOutputStream());
                serversReady[number].countDown();
                Thread.sleep(timeOut);
                t = in.readUTF();
                in.readFully(new byte[largeMsg.length], 0, largeMsg.length);
                t += in.readUTF();
                out.writeByte(1);
                out.flush();
                cnumber = in.readInt();
                completed = true;
            } catch (Exception e) {
                println("server side " + number + " stopped after " + e);
                // e.printStackTrace();
            } finally {
                println("server side " + number + " received: " + t);
                if (completed && cnumber != number) {
                    println("server side " + number + " expected client number " + number + " but got " + cnumber);
                }
                close();
                serversDone.countDown();
            }
        }

        @Override public void close() {
            TestTimedSocketOut.close(s);
            s = null;
        }
    }

    class ClientSideSocket implements Runnable, Closeable {

        Socket s;
        int number;

        public ClientSideSocket(int number) {
            this.number = number;
        }

        @SuppressWarnings("resource")
        @Override public void run() {

            Byte b = -1;
            TimedOutputStream tout = null;
            try {
                s = new Socket();
                s.connect(new InetSocketAddress(port));
                DataInputStream in = new DataInputStream(s.getInputStream());
                tout = new TimedOutputStream(s.getOutputStream(), bufSize, stp);
                if (number == 1) {
                    // expect fail
                    tout.setTimeoutMs(timeOut / 2);
                } else {
                    // expect all OK
                    tout.setTimeoutMs(timeOut * 2);
                }
                DataOutputStream out = new DataOutputStream(tout);
                if (!serversReady[number].await(maxWait, TimeUnit.MILLISECONDS)) {
                    throw new RuntimeException("Server side for client side " + number + " not ready.");
                }
                out.writeUTF("client side " + number + " starting transfer");
                out.write(largeMsg);
                out.writeUTF(" - client side " + number + " completed transfer");
                out.flush();
                b = in.readByte();
                out.writeInt(number);
                out.flush();
            } catch (Exception e) {
                println("client side " + number + " stopped after " + e);
                // e.printStackTrace();
            } finally {
                println("client side " + number + " result: " + b);
                if (tout != null) {
                    if (tout.isWriteTimeout()) {
                        println("client side " + number + " had write timeout, close exception: " + tout.getWriteTimeoutCloseException());
                    } else {
                        println("client side " + number + " had no write timeout");
                    }
                }
                close();
                clientsDone.countDown();
            }
        }

        @Override public void close() {
            TestTimedSocketOut.close(s);
            s = null;
        }
    }

    private static void close(Socket s) {
        try { if (s != null) s.close(); } catch (Exception ignored) {}
    }

    private static final long START_TIME = System.currentTimeMillis(); 

    private static void println(String msg) {
        System.out.println((System.currentTimeMillis() - START_TIME) + "\t " + msg);
    }

}

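Good grief, a thread per write. Ghastly. And there is nothing in java.net that says the streams can be closed asynchronously. Plus reimplementing BufferedOutputStream instead of just using it. The caller can call flush() directly when needed; there is no need to break the API with an extra flush parameter. - user207421
I forgot about the async close, that is indeed bad. There is a future task per write operation, but I do not expect a thread to be started immediately for every scheduled task. I tried using BufferedOutputStream directly, but I could not find a way to apply the timeout task only to the writes to the underlying output stream. The extra flush parameter only exists in an internal (protected) method; perhaps making that method private would be better. - vanOekel
@EJP I was able to asynchronously close a blocked .accept() call. I think the same should work for .write(). - Dexter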
