大规模文件读取的多线程实现

16

我仍在努力理解Java并发编程的工作原理。我知道(如果你使用OO Java 5并发模型),你需要使用run()call()方法来实现一个TaskCallable,并且最好将该实现方法的尽可能多的部分并行化。

但是我还是不理解Java并发编程中固有的某些东西:

  • Taskrun()方法如何被分配正确数量的并发工作?

以具体的例子来说,如果我有一个I/O绑定的readMobyDick()方法,它会从本地系统的文件中将Herman Melville的全部内容读入内存。假设我希望这个readMobyDick()方法是并发的,并由3个线程处理,其中:

  • 线程#1将书的前1/3部分读入内存
  • 线程#2将书的中间1/3部分读入内存
  • 线程#3将书的最后1/3部分读入内存

我需要将Moby Dick分成三个文件,并将它们分别传递给它们自己的任务,还是直接在实现的run()方法中调用readMobyDick(),并且某种方式下Executor知道如何在线程之间分配工作。

我是一位视觉学习者,所以任何正确的处理方法的代码示例都将不胜感激!谢谢!


1
好的方法名!当我尝试阅读《白鲸记》时,我发现我也必须同时(与其他书籍交替)进行阅读。;-) - Martin Wilson
1
那么...伟大的白鲸就是.NET,对吗? - Martin James
4个回答

22
您可能不小心选择了最糟糕的并行活动示例!从单个机械硬盘并行读取实际上比使用单个线程读取更慢,因为您实际上是在将机械臂反弹到磁盘的不同部分,以便每个线程轮流运行。这最好作为单线程活动进行。
让我们看另一个例子,它类似于您的示例,但实际上可以提供一些好处:假设我想搜索一个巨大的单词列表中某个单词的出现次数(该列表甚至可以来自磁盘文件,但像我说的那样,由单个线程读取)。 假设我可以像您的示例一样使用3个线程,每个线程在巨大单词列表的1/3上进行搜索,并保持搜索到的单词数量的本地计数器。
在这种情况下,您需要将列表分成3部分,将每个部分传递给不同类型实现Runnable的对象,然后在run方法中实现搜索逻辑。
运行时本身不知道如何进行分区或任何类似的操作,您必须自己指定。有许多其他分区策略,每种策略都有其优点和缺点,但是现在我们可以坚持静态分区。
让我们看一些代码:
class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

这应该可以很好地工作。请注意,在实际情况下,您将构建更通用的分区方案。如果您希望返回结果,您可以使用 ExecutorService 并实现 Callable,而不是实现 Runnable

因此,以下是使用更高级构造的替代示例:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);

// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
    counter += f.get();
}

1
那么一个适合使用多线程的任务的好例子是什么呢?我真的不关心从磁盘读取文件,我关心的是看到一个活生生的、实际的示例,展示如何将工作分块并提供给任务。 (code 代码) - IAmYourFaja
@herpylderp:现在应该已经完成了。 - Tudor
非常好 - 谢谢 @Tudo - 但现在我不太明白线程池如何动态地分块工作(与静态分区相对)。我猜我很快会发布一个类似的(与池相关的)问题...再次感谢。 - IAmYourFaja
1
@herpylderp:线程池(如我上面使用的那个)在内部存储任务队列和一堆线程。这些线程不断循环查看队列中是否有可用任务,如果有,则取出一个并执行它,然后返回继续取更多任务。用户只需使用“submit”将任务放入队列中,线程会按照我解释的方式执行它们。 - Tudor
我正在执行一个测试,测试同一用例 - 从多个线程中读取单个文件。我发现,如果底层存储是SATA磁盘驱动器,则使用多个线程可以提高性能,而如果是SAS驱动器,则会提高性能。这是因为SAS中使用的点对点技术,还是我的测试有什么问题? - Andy Dufresne
显示剩余5条评论

2
你需要实现一个带有run()或call()方法的任务或可调用对象,尽可能地并行化已实现方法的大部分内容。
一个"Task"代表了一个离散的工作单元。例如将文件加载到内存中就是一个离散的工作单元,因此可以将这个任务委派给一个后台线程来运行。它是一个离散的工作单元,因为它没有其他依赖项,只需完成其工作(加载文件)并具有离散的边界。
你想要的是将其进一步分解成任务。即一个线程加载文件的1/3,另一个线程加载2/3等。
如果你能将任务进一步细分,那么从定义上来说它就不再是一个任务了。因此,加载文件本身就是一个单独的任务。
举个例子:假设你有一个GUI,你需要从5个不同的文件中呈现数据给用户。为了呈现这些数据,你还需要准备一些数据结构来处理实际数据。所有这些都是单独的任务。例如,加载文件是5个不同的任务,因此可以由5个不同的线程完成。数据结构的准备可以由另一个线程完成。GUI当然在另一个线程中运行。所有这些都可以同时进行。

2
你选择的例子不太恰当,正如 Tudor 所指出的那样。磁盘硬件受到移动碟片和磁头的物理限制,最有效的读取实现方式是按顺序读取每个块,这减少了移动磁头或等待磁盘对齐的需要。
话虽如此,一些操作系统并不总是将数据连续存储在磁盘上,对于那些还记得的人来说,如果你的操作系统/文件系统没有为你完成这项工作,碎片整理可以提高磁盘性能。
既然你提到想要一个可以受益的程序,让我建议一个简单的程序,矩阵加法。
假设你为每个核心创建了一个线程,你可以轻松地将任何两个要相加的矩阵分成 N 行(每个线程一个)。矩阵加法(如果你还记得)的工作方式如下:
A + B = C

或者

[ a11, a12, a13 ]   [ b11, b12, b13]  =  [ (a11+b11), (a12+b12), (a13+c13) ]
[ a21, a22, a23 ] + [ b21, b22, b23]  =  [ (a21+b21), (a22+b22), (a23+c23) ]
[ a31, a32, a33 ]   [ b31, b32, b33]  =  [ (a31+b31), (a32+b32), (a33+c33) ]

为了将此分配到N个线程中,我们只需要获取行数并模除线程数以获得要添加的“线程ID”即可。

matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6,  9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0

现在,每个线程“知道”它应该处理哪些行,因为结果不会跨越到其他线程的计算领域,所以每行的结果可以轻松计算。
现在只需要一个“结果”数据结构来跟踪值何时被计算,当最后一个值被设置时,计算就完成了。在这个“虚假”的示例中,使用两个线程计算矩阵加法结果大约需要一半的时间。
// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only.  Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)

多线程可以解决更复杂的问题,不同的问题需要不同的技术来解决。我故意选择了一个最简单的例子。


-1

如果您的系统支持高吞吐量 I/O,以下是如何实现的方法:

当可用高吞吐量(3GB/s)文件系统时,如何在Java中使用多个线程读取文件

这里是使用多个线程读取单个文件的解决方案。

将文件分成 N 份,每个线程读取一个块,然后按顺序合并它们。注意跨块边界的行。这是用户 slaks 建议的基本思路。

针对单个20 GB文件的多线程实现的基准测试:

1个线程:50秒:400 MB/s

2个线程:30秒:666 MB/s

4个线程:20秒:1GB/s

8个线程:60秒:333 MB/s

相当于 Java7 的 readAllLines() 方法:400秒:50 MB/s

注意:这可能仅适用于设计支持高吞吐量I/O的系统,而不适用于普通个人电脑

以下是代码的基本要点,如需完整详情,请点击链接

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM
        _channel.read(buff, _startLocation);

        //chunk to String
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

}

//args[0] is path to read file
//args[1] is the size of thread pool; Need to try different values to fing sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
    long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads


    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down

}

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接