管道中的csv-parse错误处理

3
作为我正在构建的应用程序的一部分,我使用 csv-parse 读取和操作大型 (约5.5GB,800万行) 的 csv 文件。我已经让这个过程相对顺利地运行了起来,但是遇到了一个问题 - 如何捕获由不一致的列数引发的错误。
我使用管道函数,因为它与应用程序的其余部分很好地配合使用,但是我的问题是,如何将解析器抛出的错误重定向到日志,并允许进程继续进行?
我知道可以使用 "relax_column_count" 选项跳过具有不一致列数的记录,该选项几乎足够满足需求。但是由于数据质量评估的目的,我需要记录这些记录,以便在以后可以查看导致错误列数的原因(该过程具有许多潜在故障点)。
此外,我知道解决这个问题最简单的方法是在此过程上游清理数据,但不幸的是,我无法控制数据源。
在这个示例集中,例如,我会遇到以下错误:
事件.js:141 抛出 er; // 未处理的“错误”事件 错误:第(行号)行的列数与标题不匹配

样例数据(实际上并不是我的数据,但演示了同样的问题):

year, month, value1, value2
2012, 10, A, B
2012, 11, B, C,
2012, 11, C, D,
2013, 11, D, E,
2013, 11, E, F,
2013, 11, F, 
2013, 11, G, G,
2013, 1, H, H,
2013, 11, I, I,
2013, 12, J, J,
2014, 11, K, K,
2014, 4, L, L,
2014, 11, M, M,
2014, 5, N, 
2014, 11, O, N,
2014, 6, P, O,
2015, 11, Q, P,
2015, 11, R, Q,
2015, 11, S, R,
2015, 11, T, S, 

代码:
const fs = require('fs');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const transform = require('stream-transform');

const paths = {
    input: './sample.csv',
    output: './output.csv',
    error: './errors.csv',
}

var input  = fs.createReadStream(paths.input);
var output = fs.createWriteStream(paths.output);
var error  = fs.createWriteStream(paths.error);

var stringifier = stringify({
    header: true,
    quotedString: true,
});
var parser = parse({
    relax: true,
    delimiter: ',', 
    columns: true, 
    //relax_column_count: true,
})
var transformer = transform((record, callback) => {
    callback(null, record);
}, {parallel: 10});

input.pipe(parser).pipe(transformer).pipe(stringifier).pipe(output);

你有什么想法?


这似乎很有前途,但仍在努力确定如何 a) 在捕获错误之前清除部分写入的记录,并 b) 重新安排流: .on('error', (error) => { console.log(error)}) - Adam Bethke
1个回答

1
我解决了这个问题。它没有使用管道 API,而是使用 CSV 包的回调 API。这种解决方案不太优雅,但它是功能性的,并且具有显式错误处理的好处,这不会因为列数不一致而使进程停滞不前。
该过程逐行读取文件,将行与 settings 对象中预期字段列表 (settings.mapping) 进行解析,然后转换、字符串化并写入新的 csv 中的结果行。
我设置它记录由于列数与标题不一致而导致的错误到文件中,并附带一些额外数据(执行日期时间、行号和完整行作为诊断信息的文本)。我没有设置记录其他类型的错误,因为它们都是 csv 结构错误的下游,但你可以修改代码以记录这些错误。 (你也可以将它们写入 JSON 或 MySQL 数据库,但一步一步来)。
好消息是,使用这种方法与直接方法相比没有太大的性能损失。我没有进行正式的性能测试,但在一个60MB的文件中,两种方法的性能大致相同(假设该文件没有不一致的行)。下一步明确的目标是研究如何将写入磁盘的操作捆绑在一起以减少I/O。

我仍然非常想知道是否有更好的方法来解决这个问题,所以如果你有任何想法,请务必发表答案!同时,我认为我应该发布这个可用的答案,以便帮助其他遇到类似格式不一致的数据源的人。


应该给予荣誉,尤其是两个问题/答案:

  • 在Node.js中逐行读取解析大型日志文件
    • 这个答案改编了一些核心代码,从将文件分割成逐行读取的答案中提取出来,这样可以防止csv-parse组件在失败的行处关闭(以分割文件的代码开销为代价)。我实际上非常推荐像那篇文章中所做的那样使用iconv-lite,但它与最小可重现示例无关,因此我在此贴子中将其删除。
  • 使用node.js流进行错误处理
    • 这在更好地理解管道的潜力和限制方面非常有帮助。看起来在解析器的出站管道上理论上有一种方法可以放置基本上相当于管道分裂器的东西,但考虑到我目前的时间限制和与异步进程相关的挑战,后者在流终止方面会非常不可预测,我改用回调API。

示例代码:

'use strict'
// Dependencies
const es     = require('event-stream');
const fs     = require('fs');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const transform = require('stream-transform');

// Reference objects
const paths = {
    input: 'path to input.csv',
    output: 'path to output.csv',
    error: 'path to error output.csv',
}
const settings = {
    mapping: {
        // Each field is an object with the field name as the key
        // and can have additional properties for use in the transform 
        // component of this process
        // Example
        'year' : {
            import: true,
        }
    }
}

const metadata = {
    records: 0,
    error: 0
}

// Set up streams
var input  = fs.createReadStream(paths.input);
var errors  = fs.createWriteStream(paths.error,  {flags: 'ax'});
var output = fs.createWriteStream(paths.output, {flags: 'ax'});

// Begin process (can be refactored into function, but simplified here)
input
  .pipe(es.split()) // split based on row, assumes \n row endings
  .pipe(es.mapSync(line => { // synchronously process each line

    // Remove headers, specified through settings
    if (metadata.records === 0) return metadata.records++;
    var id = metadata.records;

    // Parse csv by row 
    parse(line, {
        relax: true,
        delimiter: ',', 
        columns: Object.keys(settings.mapping),
    }, (error, record) => {

        // Write inconsistent column error 
        if (error) {
            metadata.error++;
            errors.write(
                new Date() + ', Inconsistent Columns, ' + 
                 id + ', `' +  
                 line + '`\n'
            );
        }

    // Apply transform / reduce
    transform(record, (record) => {
        // Do stuff to record
        return record;
    }, (error, record) => {

        // Throw tranform errors
        if (error) {
            throw error;
        }

    // Stringify results and write to new csv
    stringify(record, {
           header: false,
           quotedString: true,
    }, (error, record) => {

        // Throw stringify errors
        if (error) {
            console.log(error);
        }

        // Write record to new csv file
        output.write(record);
    });
    });
    })

    // Increment record count
    metadata.records++;

  }))  
  .on('end', () => {
    metadata.records--;
    console.log(metadata)
  })    

回顾一下,我要注意的是这是一个O(N)解决方案,这使得它在大规模情况下非常糟糕。回想起来,我希望早些时候考虑其他技术而不是尝试实现纯JS解决方案。 - Adam Bethke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接