CsvHelper - 异步读取流

6
我有一个服务,它接收包含CSV数据的输入流,需要批量插入到数据库中,我的应用程序尽可能使用async/await。
处理过程是:使用CsvHelper的CsvParser解析流,将每行添加到DataTable中,使用SqlBulkCopy将DataTable复制到数据库中。
数据可能是任意大小,因此我希望避免一次性将所有数据读入内存 - 很明显,最终我会在DataTable中拥有所有数据,因此在内存中实际上会有2个副本。
我想尽可能异步地完成所有这些操作,但CsvHelper没有任何异步方法,因此我想到了以下解决方法:
using (var inputStreamReader = new StreamReader(inputStream))
{
    while (!inputStreamReader.EndOfStream)
    {
        // Read line from the input stream
        string line = await inputStreamReader.ReadLineAsync();

        using (var memoryStream = new MemoryStream())
        using (var streamWriter = new StreamWriter(memoryStream))
        using (var memoryStreamReader = new StreamReader(memoryStream))
        using (var csvParser = new CsvParser(memoryStreamReader))
        {
            await streamWriter.WriteLineAsync(line);
            await streamWriter.FlushAsync();

            memoryStream.Position = 0;

            // Loop through all the rows (should only be one as we only read a single line...)
            while (true)
            {
                var row = csvParser.Read();

                // No more rows to process
                if (row == null)
                {
                    break;
                }

                // Add row to DataTable
            }
        }
    }
}

这个解决方案有什么问题吗?它是否必要?我看到 CsvHelper 的开发人员明确没有添加异步功能(https://github.com/JoshClose/CsvHelper/issues/202),但我不太明白他们没有这样做的原因。
编辑:我刚意识到,这个解决方案在列包含换行符的情况下也不起作用 :( 我想我只能将整个输入流复制到 MemoryStream 或其他地方。
编辑2:更多信息。
这是一个异步方法,在库中我尝试实现全程异步。它可能会被 MVC 控制器使用(如果我只是想从 UI 线程卸载它,我会使用 Task.Run())。大部分时间,该方法将等待外部来源,如数据库 / DFS,并且我希望在等待期间释放线程。
CsvParser.Read()会阻塞,即使阻塞的是读取流(例如,如果我试图读取的数据位于世界另一端的服务器上),而如果CsvHelper实现了一个使用TextReader.ReadAsync()的异步方法,那么我就不会被阻塞等待来自迪拜的数据。据我所知,我并没有要求在同步方法周围添加异步包装器。
编辑3:来自未来的更新!实际上,异步功能已经在2017年添加到CsvHelper中。我希望我当时工作的公司有升级到更新版本!

讨论串中提到的文章总体上非常好,关于何时应该期望异步方法,请参考:http://blogs.msdn.com/b/pfxteam/archive/2012/03/24/10287244.aspx。 - Erik Forbes
我明白你的想法,但考虑到它从任何地方读取流,所以我会认为 Read() 方法更可能是 I/O 绑定的而不是 CPU 绑定的。 - Lykaios
这与它如何绑定关系较小,更多的是关于您对异步操作的期望。您只是想从UI线程中卸载它吗? - Erik Forbes
@ErikForbes 我已经在问题中添加了更多信息 :) - Lykaios
2个回答

4

Eric Lippert使用在餐厅做饭的比喻解释了async-await的有用性。根据他的解释,如果您的线程没有其他事情要做,异步执行某些操作是没有用的。

此外,请注意,当您的线程正在执行某些操作时,它不能执行其他操作。只有当您的线程在等待某些东西时,它才能做其他事情。您的进程中等待的其中一件事就是读取文件。当线程逐行读取文件时,它必须多次等待读取行。在此期间,它可以做其他事情,例如解析读取的CSV数据并将解析后的数据发送到目标位置。

解析数据不是一个需要线程等待其他进程完成的过程,就像读取文件或向数据库发送数据时一样。这就是为什么没有解析过程的异步版本。普通的async-await无法帮助保持线程繁忙,因为在解析过程中没有需要等待的内容,所以在解析过程中,您的线程没有时间做其他事情。

你当然可以使用 Task.Run(() => ParseReadData(...)) 将解析过程转换为可等待的任务,并等待该任务完成,但在 Eric Lippert 餐厅的类比中,这就像是解冻一名厨师来做工作,而你坐在柜台后面什么都不做。
但是,如果你的线程在读取 CSV 数据时有一些有意义的事情要做,比如响应用户输入,那么启动单独的任务进行解析可能会很有用。
如果你的完整读取 - 解析 - 更新数据库过程不需要与用户交互,但需要在处理过程中使你的线程空闲以执行其他任务,则考虑将整个过程放入单独的任务中,并启动该任务而不等待它。在这种情况下,你只需使用接口线程启动另一个任务,而接口线程则可自由地执行其他任务。启动这个新任务相对于你的整个过程的总时间来说是一个相对较小的成本。
再次强调:如果你的线程没有其他事情可做,请让这个线程进行处理,不要启动其他任务来完成它。

我并不完全同意这个观点。你永远不应该隐式地阻塞一个线程,假设它没有其他事情要做。特别是调用者等待函数启动的任务时,他会期望他的线程在任务(例如读取文件)启动后继续运行。该线程可能是消息处理循环,如UI线程。在读取文件时,它可以处理另一个消息,但在阻塞读取时则不能。 - Johan t Hart

2

以下是一篇关于如何在同步方法上暴露异步包装器的好文章,以及为什么CsvHelper没有这样做的原因。http://blogs.msdn.com/b/pfxteam/archive/2012/03/24/10287244.aspx

如果你不想阻塞UI线程,请在后台线程上运行处理。

CsvHelper拉取一个数据缓存区,缓存区的大小可以更改。如果您的服务器在世界的另一端,它将缓冲一些数据,然后读取它。很可能需要多次读取才能使用缓存区。

CsvHelper还会产生记录,所以如果您没有实际获取到一行数据,那么也不会有任何内容被读取。如果您只读取了几行数据,那么只有这部分文件(实际上是缓存区的大小)被读取。

如果您担心内存问题,有几个简单的解决方案。

  1. 缓冲数据。您可以每次批量复制100或1000行,而不是整个文件。重复此操作,直到文件完成。
  2. 使用FileStream。如果出于某种原因需要一次性读取整个文件,请使用FileStream并将整个文件写入磁盘。这可能会更慢,但不会使用大量内存。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接