如何在Java代码中将CSV文件分割成多个块并并行读取这些块

11

我有一个非常大的CSV文件(1GB+),它有100,000行。

我需要编写一个Java程序来解析CSV文件中的每一行,以创建HTTP请求的主体并发送出去。

换句话说,我需要发送100,000个HTTP请求,这些请求对应于CSV文件中的行。如果我在单个线程中执行这些操作,将会非常耗时。

我想创建1,000个线程来完成以下任务:i)从CSV文件中读取一行,ii)创建一个HTTP请求,其主体包含读取行的内容,以及iii)发送HTTP请求并接收响应。

这样,我需要将CSV文件分成1,000个块,并且这些块中的行不应相互重叠。

如何最好地进行此类拆分过程?


1
我有一个非常大的CSV文件(1GB+),它有100,000行。对于现今的计算机来说,这并不算大。如果你可以饱和所有的CPU,那么拥有比CPU更多的线程是一个错误。最终,它将受到IO部门的限制,同时向服务器发送大量并发请求也不是很明智,除非你有意尝试DoS攻击。 - bestsss
6个回答

14

同时在多个位置读取单个文件不会让你更快(反而可能会使速度显著降低)。

与其从多个线程读取文件,不如从单个线程读取文件,并将这些行的处理并行化。一个单独的线程应该逐行读取您的CSV文件,并将每一行放入队列中。然后,多个工作线程应从队列中取下一行,解析它,转换为请求,并根据需要并行处理请求。分割工作将由单个线程完成,确保没有遗漏的行或重叠。


能否在读取文件之前进行分割操作,将其分成多个相同大小的块?如果可以,那么在文件被分割后,启动多个线程并行读取块会比单个线程读取整个文件更快,是吗? - JuliaLi
1
@JuliaLi 不是很准确:大文件通常占据磁盘上靠近彼此的多个块。由于磁盘在访问连续块时速度更快,因为无需重新定位磁头,所以当连续读取大文件时,从磁盘读取速度会更快。 - Sergey Kalinichenko

6
你可以创建一个线程,读取CSV文件的每一行并构建一个行列表。当达到某个限制时,例如100行,请将其传递给一个固定大小的线程池以作为请求发送。
我怀疑除非你的服务器有1000个核心,否则使用10-100并发请求可能会更快。

1
这就是我的意思。由于您的应用程序更可能受到I/O限制,基于核心数量的固定公式不会起作用,您必须尝试找出最佳方案。(或编写自适应系统,这可能会过于复杂。) - biziclop
服务器与客户端在同一局域网内,响应速度很快。 - JuliaLi
可以做到,但你有两个问题。你需要在一行上分割数据或将某些行分成两半。找到换行符有点麻烦,但你可以做到。第二个也是主要的问题是,顺序读取文件几乎总是比随机读取文件快。这意味着除非你有大约相同数量的磁盘/轴承作为同时读取的线程(假设你有一个可以支持并发读取的磁盘子系统),否则在许多位置读取文件可能会慢得多。 - Peter Lawrey
是的,从磁盘并行读取的问题也很重要。例如,如果您在普通的Windows桌面上尝试它,很可能会导致巨大的性能下降。其他系统可能会更加包容。 - biziclop
@biziclop 很多情况取决于您所拥有的磁盘子系统类型。在任何台式电脑上,最便宜的磁盘都不是为高工作负载而设计的,尤其是笔记本硬盘。 - Peter Lawrey
显示剩余6条评论

2

当您一次性获取行后,使用单个线程读取CSV文件,然后将该行委托给池中可用的一个Thread,通过构造您的Runnable Task对象并将其传递给Executors's submit(),该任务将异步执行。

 public static void main(String[] args) throws IOException {

      String fName = "C:\\Amit\\abc.csv";
      String thisLine;
      FileInputStream fis = new FileInputStream(fName);
      DataInputStream myInput = new DataInputStream(fis);
      ExecutorService pool=Executors.newFixedThreadPool(1000);
      int count = 0;  // Concurrent request to Server barrier

      while ((thisLine = myInput.readLine()) != null) {
          if (count > 150) {
              try {
                  Thread.sleep(100);
                  count = 0;
              } catch (InterruptedException e) {
                  // TODO Auto-generated catch block
                  e.printStackTrace();
              }
          }

          pool.submit(new MyTask(thisLine));
          count++;
      }

    }
}

以下是您的任务:

class MyTask implements Runnable {
      private String lLine;
      public MyTask(String line) {
           this.lLine=line;

      }

      public void run() {
          // 1) Create Request  lLine
          // 2) send the HTTP request out and receive response
      }
}

2

1

有一个线程逐行读取文件,对于每一行读取的内容,将任务发布到ExecutorService中以执行每个HTTP请求。

从多个线程读取文件是行不通的,因为要读取第n行,必须先读取所有其他行。(如果您的文件包含固定宽度记录,则理论上可以工作,但CSV不是固定宽度格式。)


当你知道列时,可以推断出行的结尾,这是可行的,但几乎不值得努力。因此,如果有多个磁盘阵列和映射文件,则多个线程将起作用(对于读取部分)。 - bestsss
在读取文件之前,是否可以进行分割操作将其分成相同大小的多个块?如果可以,那么在文件被分割后,启动多个线程并行读取这些块。 - JuliaLi

0

Java 8计划于本月发布,将通过并行流和lambda表达式提供更好的支持。Oracle关于并行流的tutorial可能是一个很好的起点。

需要注意的是过度并行化会有陷阱。对于检索URL的示例,建议并发调用数量较低。过多的并行化不仅会影响带宽和您连接的网站,还会冒着耗尽文件描述符的风险,而在大多数java运行环境中,文件描述符是严格限制的资源。

一些可能对您有所帮助的框架是Netflix的RxJavaAkka。请注意,这些框架并不简单,需要一些学习的努力。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接