同时处理大型文本文件

4
所以我有一个大文本文件,大小约为4.5GB,我需要尽可能快地处理整个文件。现在,我使用了3个线程(不包括主线程)进行多线程处理。一个用于读取输入文件的输入线程,一个用于处理数据的处理线程,以及一个用于将处理后的数据输出到文件的输出线程。
目前,瓶颈在处理部分。因此,我想将更多的处理线程添加到其中。然而,这会创建一种情况,即我有多个线程访问同一个BlockingQueue,并且它们的结果因此无法保持输入文件的顺序。
我正在寻找的功能示例看起来像这样: 输入文件:1、2、3、4、5 输出文件:^相同。不是2、1、4、3、5或任何其他组合。
我编写了一个虚拟程序,其功能与实际程序相同,只是没有处理部分(由于处理类包含机密信息,无法提供实际程序)。我还应该提到,所有类(Input、Processing和Output)都是Main类中的内部类,该类包含initialize()方法和主线程代码中提到的类级变量。
static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;

private void initialise() throws IOException {
    BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
    BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million. 

    String inputFileName = "test.txt";
    String outputFileName = "outputTest.txt";

    BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
    BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));


    Thread T1 = new Thread(new Input(reader, inputQueue));
    Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
    Thread T3 = new Thread(new Output(writer, outputQueue));

    T1.start();
    T2.start();
    T3.start();

    while (!writerFinished) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    reader.close();
    writer.close();

    System.out.println("Exited.");
}

输入线程:(请原谅注释的调试代码,用于确保读取线程正常执行)。

class Input implements Runnable {
    BufferedReader reader;
    BlockingQueue<String> inputQueue;

    Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
        this.reader = reader;
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        String poisonPill = "ChH92PU2KYkZUBR";
        String line;
        //int linesRead = 0;

        try {
            while ((line = reader.readLine()) != null) {
                inputQueue.put(line);
                //linesRead++;

                /*
                if (linesRead == 500_000) {
                    //batchesRead += 1;
                    //System.out.println("Batch read");
                    linesRead = 0;
                }
                */
            }

            inputQueue.put(poisonPill);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

        readerFinished = true;

    }
}

处理线程:(通常情况下,它实际上会对行进行某些操作,但是为了模拟目的,我只是立即推送到输出线程)。如果必要,我们可以通过让线程每行休眠一小段时间来模拟其执行一些工作。

class Processing implements Runnable {
    BlockingQueue<String> inputQueue;
    BlockingQueue<String> outputQueue;

    Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (inputQueue.isEmpty() && readerFinished) {
                    break;
                }

                String line = inputQueue.take();
                outputQueue.put(line);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出线程:

class Output implements Runnable {
    BufferedWriter writer;
    BlockingQueue<String> outputQueue;

    Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
        this.writer = writer;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        String line;
        ArrayList<String> outputList = new ArrayList<>();

        while (true) {
            try {
                line = outputQueue.take();

                if (line.equals("ChH92PU2KYkZUBR")) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer finished - executing termination");

                    writerFinished = true;
                    break;
                }

                line += "\n";
                outputList.add(line);

                if (outputList.size() == 500_000) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer wrote batch");
                    outputList = new ArrayList<>();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

目前,数据流处理一般是线性的,大概像这样:

输入 > 处理 > 输出。

但我的期望是像下面这张图一样:

Data flow diagram

不过有一个问题,当数据到达输出时,它要么需要被排序到正确的顺序,要么它已经处于正确的顺序。

如果您有关于如何实现这个的建议或示例,将不胜感激。

过去我曾使用Future和Callable接口来解决类似此类并行数据流的任务,但不幸的是那段代码没有从单个队列中读取数据,所以在这里帮助很小。

我还应该补充一点,在代码中,batchSize和poisonPill通常在主线程中定义,然后通过变量传递,它们不像在Input线程的代码中那样硬编码,输出检查写入器线程也是如此。我只是在 ~1am 写模型进行实验时有点懒惰。

编辑:我还应该提到,这需要使用Java 8。由于这些版本没有在将运行此程序的环境中安装,因此不能使用Java 9或更高版本的功能。


  1. 您的结论基于哪些数据来判断处理是限制因素?
  2. 如果处理是限制因素,它是否使用了几乎所有可用的CPU?
  3. 您是否可以让读者将顺序号传递给每个线程,使该线程输出其自己的文件并在之后将文件合并?
- arcy
我基于调试器和CPU使用率的数据进行分析。一个核心几乎总是被占满,而其他核心的使用率会保持在约40%左右。那个被占用的核心正在进行处理。此外,调试IO线程会导致读取线程由于读取速度比单个处理器处理速度快而被阻塞。写入线程受限于单个处理线程的输出速度,这并不快。调试显示,输出队列在峰值时从未超过20个项目。 - root
此外,关于在最后重新组装较小的文件,从理论上讲这可能可行,但我真的不想在最后有5700多万个小文件需要重新组装。为每个文件打开一个新阅读器的开销会很荒谬。 - root
如果您认为处理阶段是瓶颈,那么为什么不在处理数据批次 n 时,单个线程加载数据批次 n+1 呢? - DiveIntoML
本质上,我在这里尝试做的是找到一个平衡点,使得处理线程数量足以完全饱和IO线程的工作负载。这意味着IO线程永远不会等待,而且总是在做某些事情。增加比这个平衡点更多的线程将使其成为IO瓶颈,减少则会成为处理瓶颈,因为没有利用所有可用的CPU资源。 - root
显示剩余5条评论
3个回答

2

您可以这样做:

  • 利用可用的处理器核心数量,使用X个线程进行处理,其中X为可用的核心数。
  • 为每个线程分配一个独立的输入队列。
  • 阅读线程按照可预测的循环方式将记录分发到每个线程的输入队列中。
  • 由于输出文件过大无法全部加载至内存中,您需要写入X个输出文件,每个线程对应一个文件,并在每个文件名中包含线程的索引,以便可以通过文件名恢复原始顺序。
  • 处理完成后,您需要合并X个输出文件。按照轮询方式从线程1的文件,线程2的文件等中取出一行。这样可以恢复原始顺序。

此外,由于每个线程都有一个独立的输入队列,因此在读取之间不会发生锁争用(只有在读取和写入之间)。您甚至可以通过批量大于1的方式将内容放入输入队列来优化此过程。

"Original Answer" 的意思是“最初的回答”。


我会考虑创建一个模型来测试它的性能。如果它的表现良好,我将把它标记为被接受的答案。 - root

1
正如Alexei所提出的,您可以创建OrderedTask:
class OrderedTask implements Comparable<OrderedTask> {

    private final Integer index;
    private final String line;

    public OrderedTask(Integer index, String line) {
        this.index = index;
        this.line = line;
    }


    @Override
    public int compareTo(OrderedTask o) {
        return index < o.getIndex() ? -1 : index == o.getIndex() ? 0 : 1;
    }

    public Integer getIndex() {
        return index;
    }

    public String getLine() {
        return line;
    }    
}

作为输出队列,您可以使用自己的基于优先级队列的队列:
class OrderedTaskQueue {

    private final ReentrantLock lock;
    private final Condition waitForOrderedItem;
    private final int maxQueuesize;
    private final PriorityQueue<OrderedTask> backedQueue;

    private int expectedIndex;

    public OrderedTaskQueue(int maxQueueSize, int startIndex) {
        this.maxQueuesize = maxQueueSize;
        this.expectedIndex = startIndex;
        this.backedQueue = new PriorityQueue<>(2 * this.maxQueuesize);

        this.lock = new ReentrantLock();
        this.waitForOrderedItem = this.lock.newCondition();
    }


    public boolean put(OrderedTask item) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.size() >= maxQueuesize && item.getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }

            boolean result = this.backedQueue.add(item);
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }


    public OrderedTask take() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.peek() == null || this.backedQueue.peek().getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }
            OrderedTask result = this.backedQueue.poll();
            expectedIndex++;
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }
}

StartIndex是第一个有序任务的索引,maxQueueSize用于停止处理其他任务(以防填满内存),当我们等待某个较早的任务完成时。它应该是处理线程数的两倍/三倍,以允许可扩展性,并且不会立即停止处理。

然后您应该创建您的任务:

int indexOrder =0;
            while ((line = reader.readLine()) != null) {
                inputQueue.put(new OrderedTask(indexOrder++,line);                    

            }

逐行处理仅因为你的例子而使用。你应该改变OrderedTask以支持批量处理。


我期待着您的性能结果。如果处理批次大小相同且所有批次的计算相似,我认为这个解决方案应该更有效,因为它不会增加编写/合并多个文件的复杂性。锁争用不应该是一个问题,因为您的任务是计算敏感的。 - HPCS
模型可能需要一些时间,因为我不再每天都在工作中进行此项工作,但是我已经在家里花费了相当多的时间。我们会看情况如何,但是一旦完成,我会回复你们所有人。 - root
在测试过程中,我遇到了线程在队列为空时未能正确终止的问题,尽管我已经使用了毒丸逻辑。我正在调查这个问题。 - root
问题已解决。与您的队列无关,毒丸中存在一个轻微的错误,导致相等性检查失败。使用此方法处理一个5 GB的文本文件在第七代i7(没有超线程)上只需3分30秒。不用说,我非常印象深刻。感谢您在此事上的帮助。 - root
没问题,很高兴能帮忙。 - HPCS
显示剩余6条评论

0

为什么不反转流程呢?

  1. 输出调用X批次;
  2. 生成X个Promise/Task(Promise模式),随机调用其中一个处理核心(保留批次号,传递到输入核心);将调用处理程序分批处理成有序列表;
  3. 每个处理核心在输入核心中调用一批次;
  4. 享受吧!

因为要知道批次的数量,我必须将整个文件读入内存。由于内存限制,这不是一个选项,这就是我在输入线程中逐行处理文件的原因,而不是直接将整个文件倒入内存。话虽如此,如果我在队列提供的有限空间的情况下进行操作,它可能会起作用。输入队列通常在任何给定时间最多包含2个批次。但我可以尝试一次处理一个批次。 - root
你不需要知道批次的数量或整个文件来应用该流程。 你已经有了死亡药丸的概念;反过来 :) - Don Quichot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接