快速CSV解析

19

我有一个Java服务器应用程序,它下载CSV文件并解析它。解析可能需要5到45分钟,并且每小时都会发生一次。由于这种方法是应用程序的瓶颈,因此这不是过早的优化。到目前为止的代码:

        client.executeMethod(method);
        InputStream in = method.getResponseBodyAsStream(); // this is http stream

        String line;
        String[] record;

        reader = new BufferedReader(new InputStreamReader(in), 65536);

        try {
            // read the header line
            line = reader.readLine();
            // some code
            while ((line = reader.readLine()) != null) {
                 // more code

                 line = line.replaceAll("\"\"", "\"NULL\"");

                 // Now remove all of the quotes
                 line = line.replaceAll("\"", "");     


                 if (!line.startsWith("ERROR"){
                   //bla bla 
                    continue;
                 }

                 record = line.split(",");
                 //more error handling
                 // build the object and put it in HashMap
         }
         //exceptions handling, closing connection and reader

有没有现成的库可以帮助我加快速度?我能否改进现有的代码?


2
文件有多大?你尝试过对代码进行分析吗?这将为您提供瓶颈和明确的改进方向。如果网络是主要问题,我不会感到惊讶。此外,请查看http://commons.apache.org/sandbox/csv/,而不是自己构建解析器。 - joostschouten
我正在进行性能分析,意识到大部分时间花费在网络连接上。我想首先改进解析,因为在网络环境下需要更改架构。(我的估计是,更快的解析可以将加载时间提高10-15%。) - Lukasz Madon
听起来不错。使用csv解析器,因为它们已经优化过了,而且你肯定会遇到转义和国际化问题,这是你不想担心的。祝你好运。 - joostschouten
当前的方法似乎足够快,每个文件解析需要2秒钟,因此总共解析所有文件所需时间不到1%;/ - Lukasz Madon
请参考以下链接:https://dev59.com/Y1HTa4cB1Zd3GeqPORBa - Raedwald
不知道是否适用于您的领域,但我建议使用Shell脚本进行基本处理(例如在您的代码中使用replaceAll)。SED / AWK实际上是为此类任务而设计的,速度非常快。然后JVM只需要解析预处理的数据。但同样可能不适用于您的情况。 - Jan Groth
9个回答

20

Apache Commons CSV

你看过Apache Commons CSV吗?

使用split的注意事项

请记住,split只返回数据的视图,这意味着只要有对其任何视图的引用,原始的line对象就不会被垃圾回收。也许制作一个防御性的副本会有所帮助?(Java bug report

它也不能可靠地分组包含逗号的转义CSV列


2
String.split() 使用了 String.substring(),而这个方法很久以前就已经不再返回视图了(https://dev59.com/IFsX5IYBdhLWcg3wY-kh)。 - Malt

13

请参考类似的兄弟答案中的评论。 - Basil Bourque

6

5
除了上面提出的建议,我认为您可以尝试使用一些线程和并发来改进您的代码。
以下是简要分析和建议解决方案:
1. 从代码中看,你正在通过网络读取数据(最可能是apache-common-httpclient库)。
2. 您需要确保瓶颈不在网络传输数据中。
3. 一种方法是只将数据转储到某个文件中(不进行解析),然后查看花费了多少时间。这将使您了解实际上花了多少时间进行解析(与当前观察相比)。
4. 现在看一下java.util.concurrent包的使用情况。您可以使用一些链接(12)。
5. 您可以将在for循环中执行的任务放入一个线程中。
6. 使用线程池和并发可以极大地提高性能。

虽然这个解决方案需要一些努力,但最终肯定会对您有所帮助。


如果瓶颈在网络传输上,您应该考虑指定gzip头。 - Xavier Combelle

2

opencsv

你应该看看OpenCSV。我希望他们有性能优化。


1
我们对opencsv的使用体验非常糟糕。我们发现它既慢又有bug。最终浪费了半天时间,不得不全部替换掉。 - Guy
好的...你可能需要添加更多细节,使这些信息更相关。你遇到了什么问题?你使用的是哪个版本?你选择了哪个其他框架?我只是想知道,因为我在不止一个项目中看到它表现得很好。 - Kai
1
主要问题是对于某些行,它返回了错误的字段数量(即在10个字段行上得到了一个2个字段的字符串[])。我从未理解为什么会发生这种情况,但我猜测它与错误的UTF-8解析有关。 我已经用自己的逐行读取和String.split每行(我意识到这里存在内存考虑因素),将其替换掉了,结果运行速度提高了15%-30%。 我使用的是opencs v2.3(java)。 - Guy

1

有一些 CSV 解析器的基准测试项目,稍有些晚了。您的选择将取决于确切的用例(即原始数据 vs 数据绑定等)。


1

Quirk-CSV

{{链接1:Quirk-CSV}}


新出现的东西。它使用Java注释,并构建在Apache-csv上,这是一种更快的CSV解析库之一。

如果您想要重复使用CSVProcessor,此库也是线程安全的,而且应该这样做。

示例:

Pojo

@CSVReadComponent(type = CSVType.NAMED)
@CSVWriteComponent(type = CSVType.ORDER)
public class Pojo {
    @CSVWriteBinding(order = 0)
    private String name;

    @CSVWriteBinding(order = 1)
    @CSVReadBinding(header = "age")
    private Integer age;

    @CSVWriteBinding(order = 2)
    @CSVReadBinding(header = "money")
    private Double money;

    @CSVReadBinding(header = "name")
    public void setA(String name) {
        this.name = name;
    }

    @Override
    public String toString() {

    return "Name: " + name + System.lineSeparator() + "\tAge: " + age + System.lineSeparator() + "\tMoney: "
            + money;
}}

主函数

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.*;


public class SimpleMain {
public static void main(String[] args) {
    String csv = "name,age,money" + System.lineSeparator() + "Michael Williams,34,39332.15";

    CSVProcessor processor = new CSVProcessor(Pojo.class);
    List<Pojo> list = new ArrayList<>();
    try {
        list.addAll(processor.parse(new StringReader(csv)));
        list.forEach(System.out::println);

        System.out.println();

        StringWriter sw = new StringWriter();
        processor.write(list, sw);
        System.out.println(sw.toString());
    } catch (IOException e) {
    }


}}

由于这是建立在apache-csv之上的,因此您可以使用强大的工具CSVFormat。假设csv的分隔符是管道(|)而不是逗号(,),您可以执行以下操作:

CSVFormat csvFormat = CSVFormat.DEFAULT.withDelimiter('|');
List<Pojo> list = processor.parse(new StringReader(csv), csvFormat);

另一个好处是继承也被考虑在内。

关于处理读写非基本数据类型的其他示例,请参见。


0

Apache Commons CSV ➙ 百万行12秒

有没有现成的库可以帮助我加速处理?

是的,Apache Commons CSV 项目在我的经验中非常有效。

这里有一个使用 Apache Commons CSV 库写入和读取24列行的示例应用程序:一个整数顺序号,一个 Instant,其余都是随机的 UUID 对象。

对于10,000行,写入和读取各需要约半秒钟。读取包括重构 IntegerInstantUUID 对象。

我的示例代码允许您切换对象的重组。我使用 MacBook Pro(Retina,15 英寸,2013 年底)上的 Java 12,2.3 GHz Intel Core i7,16 GB 1600 MHz DDR3,Apple 内置 SSD 运行了一百万行。这将创建一个 850 兆字节的文件。

对于一百万行,读取需要十秒钟,解析需要两秒钟:

  • 写入:PT25.994816S
  • 仅读取:PT10.353912S
  • 读取和解析:PT12.219364S

源代码是单个 .java 文件。它有一个写入方法和一个 read 方法。这两种方法都从一个 main 方法中调用。

我通过调用Files.newBufferedReader打开了一个BufferedReader
package work.basil.example;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

public class CsvReadingWritingDemo
{
    public static void main ( String[] args )
    {
        CsvReadingWritingDemo app = new CsvReadingWritingDemo();
        app.write();
        app.read();
    }

    private void write ()
    {
        Instant start = Instant.now();
        int limit = 1_000_000; // 10_000  100_000  1_000_000
        Path path = Paths.get( "/Users/basilbourque/IdeaProjects/Demo/csv.txt" );
        try (
                Writer writer = Files.newBufferedWriter( path, StandardCharsets.UTF_8 );
                CSVPrinter printer = new CSVPrinter( writer , CSVFormat.RFC4180 );
        )
        {
            printer.printRecord( "id" , "instant" , "uuid_01" , "uuid_02" , "uuid_03" , "uuid_04" , "uuid_05" , "uuid_06" , "uuid_07" , "uuid_08" , "uuid_09" , "uuid_10" , "uuid_11" , "uuid_12" , "uuid_13" , "uuid_14" , "uuid_15" , "uuid_16" , "uuid_17" , "uuid_18" , "uuid_19" , "uuid_20" , "uuid_21" , "uuid_22" );
            for ( int i = 1 ; i <= limit ; i++ )
            {
                printer.printRecord( i , Instant.now() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() , UUID.randomUUID() );
            }
        } catch ( IOException ex )
        {
            ex.printStackTrace();
        }
        Instant stop = Instant.now();
        Duration d = Duration.between( start , stop );
        System.out.println( "Wrote CSV for limit: " + limit );
        System.out.println( "Elapsed: " + d );
    }

    private void read ()
    {
        Instant start = Instant.now();

        int count = 0;
        Path path = Paths.get( "/Users/basilbourque/IdeaProjects/Demo/csv.txt" );
        try (
                Reader reader = Files.newBufferedReader( path , StandardCharsets.UTF_8) ;
        )
        {
            CSVFormat format = CSVFormat.RFC4180.withFirstRecordAsHeader();
            CSVParser parser = CSVParser.parse( reader , format );
            for ( CSVRecord csvRecord : parser )
            {
                if ( true ) // Toggle parsing of the string data into objects. Turn off (`false`) to see strictly the time taken by Apache Commons CSV to read & parse the lines. Turn on (`true`) to get a feel for real-world load.
                {
                    Integer id = Integer.valueOf( csvRecord.get( 0 ) ); // Annoying zero-based index counting.
                    Instant instant = Instant.parse( csvRecord.get( 1 ) );
                    for ( int i = 3 - 1 ; i <= 22 - 1 ; i++ ) // Subtract one for annoying zero-based index counting.
                    {
                        UUID uuid = UUID.fromString( csvRecord.get( i ) );
                    }
                }
                count++;
                if ( count % 1_000 == 0 )  // Every so often, report progress.
                {
                    //System.out.println( "# " + count );
                }
            }
        } catch ( IOException e )
        {
            e.printStackTrace();
        }

        Instant stop = Instant.now();
        Duration d = Duration.between( start , stop );
        System.out.println( "Read CSV for count: " + count );
        System.out.println( "Elapsed: " + d );
    }
}

0

为了速度,您不想使用replaceAll,也不想使用正则表达式。在这种关键情况下,您基本上总是希望通过逐个字符解析器创建状态机。我已经将整个过程转换为可迭代函数。它还接受流并解析它,而不保存或缓存它。因此,如果您可以尽早中止,那么也很可能会顺利进行。它应该也足够短且编码良好,以便清楚地说明它的工作原理。

public static Iterable<String[]> parseCSV(final InputStream stream) throws IOException {
    return new Iterable<String[]>() {
        @Override
        public Iterator<String[]> iterator() {
            return new Iterator<String[]>() {
                static final int UNCALCULATED = 0;
                static final int READY = 1;
                static final int FINISHED = 2;
                int state = UNCALCULATED;
                ArrayList<String> value_list = new ArrayList<>();
                StringBuilder sb = new StringBuilder();
                String[] return_value;

                public void end() {
                    end_part();
                    return_value = new String[value_list.size()];
                    value_list.toArray(return_value);
                    value_list.clear();
                }

                public void end_part() {
                    value_list.add(sb.toString());
                    sb.setLength(0);
                }

                public void append(int ch) {
                    sb.append((char) ch);
                }

                public void calculate() throws IOException {
                    boolean inquote = false;
                    while (true) {
                        int ch = stream.read();
                        switch (ch) {
                            default: //regular character.
                                append(ch);
                                break;
                            case -1: //read has reached the end.
                                if ((sb.length() == 0) && (value_list.isEmpty())) {
                                    state = FINISHED;
                                } else {
                                    end();
                                    state = READY;
                                }
                                return;
                            case '\r':
                            case '\n': //end of line.
                                if (inquote) {
                                    append(ch);
                                } else {
                                    end();
                                    state = READY;
                                    return;
                                }
                                break;
                            case ',': //comma
                                if (inquote) {
                                    append(ch);
                                } else {
                                    end_part();
                                    break;
                                }
                                break;
                            case '"': //quote.
                                inquote = !inquote;
                                break;
                        }
                    }
                }

                @Override
                public boolean hasNext() {
                    if (state == UNCALCULATED) {
                        try {
                            calculate();
                        } catch (IOException ex) {
                        }
                    }
                    return state == READY;
                }

                @Override
                public String[] next() {
                    if (state == UNCALCULATED) {
                        try {
                            calculate();
                        } catch (IOException ex) {
                        }
                    }
                    state = UNCALCULATED;
                    return return_value;
                }
            };
        }
    };
}

你通常会像这样有帮助地处理它:

for (String[] csv : parseCSV(stream)) {
    //<deal with parsed csv data>
}

那个API的美在于它那看起来有点神秘的函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接