目前,瓶颈在处理部分。因此,我想将更多的处理线程添加到其中。然而,这会创建一种情况,即我有多个线程访问同一个BlockingQueue,并且它们的结果因此无法保持输入文件的顺序。
我正在寻找的功能示例看起来像这样: 输入文件:1、2、3、4、5 输出文件:^相同。不是2、1、4、3、5或任何其他组合。
我编写了一个虚拟程序,其功能与实际程序相同,只是没有处理部分(由于处理类包含机密信息,无法提供实际程序)。我还应该提到,所有类(Input、Processing和Output)都是Main类中的内部类,该类包含initialize()方法和主线程代码中提到的类级变量。
static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;
private void initialise() throws IOException {
BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million.
String inputFileName = "test.txt";
String outputFileName = "outputTest.txt";
BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));
Thread T1 = new Thread(new Input(reader, inputQueue));
Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
Thread T3 = new Thread(new Output(writer, outputQueue));
T1.start();
T2.start();
T3.start();
while (!writerFinished) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
reader.close();
writer.close();
System.out.println("Exited.");
}
输入线程:(请原谅注释的调试代码,用于确保读取线程正常执行)。
class Input implements Runnable {
BufferedReader reader;
BlockingQueue<String> inputQueue;
Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
this.reader = reader;
this.inputQueue = inputQueue;
}
@Override
public void run() {
String poisonPill = "ChH92PU2KYkZUBR";
String line;
//int linesRead = 0;
try {
while ((line = reader.readLine()) != null) {
inputQueue.put(line);
//linesRead++;
/*
if (linesRead == 500_000) {
//batchesRead += 1;
//System.out.println("Batch read");
linesRead = 0;
}
*/
}
inputQueue.put(poisonPill);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
readerFinished = true;
}
}
处理线程:(通常情况下,它实际上会对行进行某些操作,但是为了模拟目的,我只是立即推送到输出线程)。如果必要,我们可以通过让线程每行休眠一小段时间来模拟其执行一些工作。
class Processing implements Runnable {
BlockingQueue<String> inputQueue;
BlockingQueue<String> outputQueue;
Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
while (true) {
try {
if (inputQueue.isEmpty() && readerFinished) {
break;
}
String line = inputQueue.take();
outputQueue.put(line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出线程:
class Output implements Runnable {
BufferedWriter writer;
BlockingQueue<String> outputQueue;
Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
this.writer = writer;
this.outputQueue = outputQueue;
}
@Override
public void run() {
String line;
ArrayList<String> outputList = new ArrayList<>();
while (true) {
try {
line = outputQueue.take();
if (line.equals("ChH92PU2KYkZUBR")) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer finished - executing termination");
writerFinished = true;
break;
}
line += "\n";
outputList.add(line);
if (outputList.size() == 500_000) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer wrote batch");
outputList = new ArrayList<>();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
目前,数据流处理一般是线性的,大概像这样:
输入 > 处理 > 输出。
但我的期望是像下面这张图一样:
不过有一个问题,当数据到达输出时,它要么需要被排序到正确的顺序,要么它已经处于正确的顺序。
如果您有关于如何实现这个的建议或示例,将不胜感激。
过去我曾使用Future和Callable接口来解决类似此类并行数据流的任务,但不幸的是那段代码没有从单个队列中读取数据,所以在这里帮助很小。
我还应该补充一点,在代码中,batchSize和poisonPill通常在主线程中定义,然后通过变量传递,它们不像在Input线程的代码中那样硬编码,输出检查写入器线程也是如此。我只是在 ~1am 写模型进行实验时有点懒惰。
编辑:我还应该提到,这需要使用Java 8。由于这些版本没有在将运行此程序的环境中安装,因此不能使用Java 9或更高版本的功能。