在Java中,如何以并行线程的方式最佳地写入文件?

28

我有一个程序需要频繁地执行计算并将结果写入文件。我知道频繁的写操作会极大地减慢程序的运行速度,因此为了避免这种情况,我想要创建一个专门用于写操作的第二个线程。

目前我是通过下面这个类来实现的(急于求解的可以跳到问题的结尾看代码):

public class ParallelWriter implements Runnable {

    private File file;
    private BlockingQueue<Item> q;
    private int indentation;

    public ParallelWriter( File f ){
        file = f;
        q = new LinkedBlockingQueue<Item>();
        indentation = 0;
    }

    public ParallelWriter append( CharSequence str ){
        try {
            CharSeqItem item = new CharSeqItem();
            item.content = str;
            item.type = ItemType.CHARSEQ;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public ParallelWriter newLine(){
        try {
            Item item = new Item();
            item.type = ItemType.NEWLINE;
            q.put(item);
            return this;
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void setIndent(int indentation) {
        try{
            IndentCommand item = new IndentCommand();
            item.type = ItemType.INDENT;
            item.indent = indentation;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void end(){
        try {
            Item item = new Item();
            item.type = ItemType.POISON;
            q.put(item);
        } catch (InterruptedException ex) {
            throw new RuntimeException( ex );
        }
    }

    public void run() {

        BufferedWriter out = null;
        Item item = null;

        try{
            out = new BufferedWriter( new FileWriter( file ) );
            while( (item = q.take()).type != ItemType.POISON ){
                switch( item.type ){
                    case NEWLINE:
                        out.newLine();
                        for( int i = 0; i < indentation; i++ )
                            out.append("   ");
                        break;
                    case INDENT:
                        indentation = ((IndentCommand)item).indent;
                        break;
                    case CHARSEQ:
                        out.append( ((CharSeqItem)item).content );
                }
            }
        } catch (InterruptedException ex){
            throw new RuntimeException( ex );
        } catch  (IOException ex) {
            throw new RuntimeException( ex );
        } finally {
            if( out != null ) try {
                out.close();
            } catch (IOException ex) {
                throw new RuntimeException( ex );
            }
        }
    }

    private enum ItemType {
        CHARSEQ, NEWLINE, INDENT, POISON;
    }
    private static class Item {
        ItemType type;
    }
    private static class CharSeqItem extends Item {
        CharSequence content;
    }
    private static class IndentCommand extends Item {
        int indent;
    }
}

然后我通过以下方式使用它:

ParallelWriter w = new ParallelWriter( myFile );
new Thread(w).start();

/// Lots of
w.append(" things ").newLine();
w.setIndent(2);
w.newLine().append(" more things ");

/// and finally
w.end();

虽然这样做完全有效,但我想知道:是否有更好的方法来完成这个任务?


1
类似问题:https://dev59.com/M2oy5IYBdhLWcg3wYM2B - Vadzim
4个回答

15

你的基本方法看起来很不错。我会按以下方式构建代码:

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public interface FileWriter {
        FileWriter append(CharSequence seq);
    
        FileWriter indent(int indent);
    
        void close();
    }
    
    class AsyncFileWriter implements FileWriter, Runnable {
        private final File file;
        private final Writer out;
        private final BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
        private volatile boolean started = false;
        private volatile boolean stopped = false;
    
        public AsyncFileWriter(File file) throws IOException {
            this.file = file;
            this.out = new BufferedWriter(new java.io.FileWriter(file));
        }
    
        public FileWriter append(CharSequence seq) {
            if (!started) {
                throw new IllegalStateException("open() call expected before append()");
            }
            try {
                queue.put(new CharSeqItem(seq));
            } catch (InterruptedException ignored) {
            }
            return this;
        }
    
        public FileWriter indent(int indent) {
            if (!started) {
                throw new IllegalStateException("open() call expected before append()");
            }
            try {
                queue.put(new IndentItem(indent));
            } catch (InterruptedException ignored) {
            }
            return this;
        }
    
        public void open() {
            this.started = true;
            new Thread(this).start();
        }
    
        public void run() {
            while (!stopped) {
                try {
                    Item item = queue.poll(100, TimeUnit.MICROSECONDS);
                    if (item != null) {
                        try {
                            item.write(out);
                        } catch (IOException logme) {
                        }
                    }
                } catch (InterruptedException e) {
                }
            }
            try {
                out.close();
            } catch (IOException ignore) {
            }
        }
    
        public void close() {
            this.stopped = true;
        }
    
        private static interface Item {
            void write(Writer out) throws IOException;
        }
    
        private static class CharSeqItem implements Item {
            private final CharSequence sequence;
    
            public CharSeqItem(CharSequence sequence) {
                this.sequence = sequence;
            }
    
            public void write(Writer out) throws IOException {
                out.append(sequence);
            }
        }
    
        private static class IndentItem implements Item {
            private final int indent;
    
            public IndentItem(int indent) {
                this.indent = indent;
            }
    
            public void write(Writer out) throws IOException {
                for (int i = 0; i < indent; i++) {
                    out.append(" ");
                }
            }
        }
    }

如果您不想在单独的线程中编写(可能是为了测试),则可以实现一个FileWriter,该FileWriter在调用方线程中调用Writer上的append方法。


谢谢,将特定于项目的任务委托给项目本身比我之前的做法更符合面向对象编程。此外,使用this.stopped来结束读取与使用毒素元素相比,是否有特殊优势? - trutheality
另外,您的缩进操作有些不同:我的缩进设置所有未来行的缩进,而您的仅在当前位置缩进。 - trutheality
@trutheality 我使用了 stopped 变量,因为它是一种协作停止线程的标准习语。此外,您可以使用它来防止在调用 end 后调用 append。我误解了原始代码中缩进操作的功能。 - Binil Thomas
@trutheality我也对所有不可变的成员使用final。唯一会发生变化的是队列和标志。队列是一个被认为是线程安全的标准类。标志被编写为其新值不依赖于旧值 - 因此将它们标记为volatile是安全的。我认为这使得整个线程安全性更容易理解。 - Binil Thomas
关于这个问题,我认为这个编写者不会写入所有提供的输入;但是一旦调用stop()方法,它就会停止,因为在while()循环条件中进行了检查。所以可能我们需要检查队列是否为空并且已经停止。 - lkamal

6

使用Exchanger是与单个消费者线程交换数据的好方法。

您可以使用StringBuilder或ByteBuffer作为要与后台线程交换的缓冲区。产生的延迟大约为1微秒,不涉及创建任何对象,并且比使用BlockingQueue更低。

以下是我认为值得在此重复的示例。

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }

谢谢你的想法,它确实教会了我一些新东西。在我的情况下,我不知道那是否是正确的方法:我真的不希望生产者等待消费者,但这似乎是必要的。 - trutheality
@truheality,如果消费者跟不上,生产者才会等待。在这种情况下,您可能会遇到一个问题,队列可能只是隐藏了它。一旦队列变得过长,性能可能会以不可预测的方式受到影响。 - Peter Lawrey

6
使用LinkedBlockingQueue是一个很好的想法。虽然我不太喜欢代码风格,但原则似乎是正确的。
我可能会为LinkedBlockingQueue添加容量,等于您总内存的一定百分比,比如说10,000个项目。这样,如果您的写入速度太慢,您的工作线程就不会继续添加更多的工作,直到堆被炸掉。

1
我知道频繁的写操作会严重拖慢程序的速度。
只要使用缓冲,可能没有你想象的那么严重。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接