使用Cassava将CSV加载到内存中

7
我正在尝试使用Cassava将CSV文件加载到内存中作为向量的向量。我的程序确实可以工作,但对于一个50MB的CSV文件使用了大量的内存,我不明白为什么。
我知道使用Data.Csv.Streaming处理大文件应该更好,但我认为50MB仍然可以。我尝试了Data.Csv和Data.Csv.Streaming,并使用GitHub项目页面上更多或更少规范的示例,我还尝试实现自己的解析器,输出向量的向量(我基于attoparsec-csv https://hackage.haskell.org/package/attoparsec-csv编写代码),所有这些解决方案都使用约2000MB的内存!我确定我做错了什么。正确的做法是什么?
我的最终目标是将数据完全加载到内存中以供以后进行进一步处理。例如,我可以将数据拆分成有趣的矩阵,并使用Hmatrix处理它们。
以下是我尝试使用Cassava的两个程序:
1 / 使用Data.Csv
import qualified Data.ByteString.Lazy as BL
import qualified Data.Vector as V
import Data.Csv
import Data.Foldable


main = do
   csv <- BL.readFile "train.csv"
   let Right res = decode HasHeader csv :: Either String (V.Vector(V.Vector(BL.ByteString)))
   print $ res V.! 0

2/ 使用Data.Csv.Streaming

{-# LANGUAGE BangPatterns #-}

import qualified Data.ByteString.Lazy as BL
import qualified Data.Vector as V
import Data.Csv.Streaming
import Data.Foldable


main = do
   csv <- BL.readFile "train.csv"
   let !a = decode HasHeader csv :: Records(V.Vector(BL.ByteString))
   let !res = V.fromList $ Data.Foldable.toList a
   print $ res V.! 0

请注意,我不会提供基于attoparsec-csv制作的程序,因为它与List几乎完全相同,而Vector则不同。该解决方案的内存使用仍然非常差。
有趣的是,在Data.Csv.Streaming解决方案中,如果我仅使用Data.Foldable.for_打印我的数据,一切都非常快,内存使用量为2MB。这让我想到我的问题与构造Vector的方式有关。可能积累了thunk而不是将原始数据堆叠到紧凑的数据结构中。
感谢您的帮助,
安托万

这个问题与懒ByteString有关。我通过定义strictRead path = evaluate . force <$> BL.read path来解决了这个问题,但我不认为这是最好的答案。你也可以尝试使用ByteString.toLazy <$> ByteString.Strict.readFile path,但我从未测试过。 - guaraqe
不幸的是,这也不起作用。 我做了这个: “csv”<- BL.readFile “train.csv” csv <- evaluate $ force csv'” 但我仍然得到大约2600MB的内存使用。 - Antoine Genton
我实际上写的是(evaluate . force) =<< BL.read path,但在展开之后应该与你的代码等效。嗯,所以我认为解决方案是使用cassava中的其他模块,很抱歉我无法提供帮助,我也对答案感兴趣。如果这里没有答案,请尝试在subreddit上询问,那里非常活跃。 - guaraqe
你能提供一下你的CSV数据的例子吗?更好的是,你能添加一个代码片段来生成类似于你的CSV的内容吗? - Zeta
这些数据是kaggle竞赛Santander(https://www.kaggle.com/c/santander-customer-satisfaction/)的训练集。它是一个71021行371列的csv文件。该csv文件的每个单元格可以是数字(float或int),也可以是短字符串(我想最多10个字符)。 - Antoine Genton
显示剩余2条评论
1个回答

6
Data.CSVData.CSV.Streaming之间的区别可能不是您所期望的。前者生成了一个csv内容的Data.Vector.Vector,就像您看到的那样。我不确定为什么构建这个向量会占用这么多空间--虽然当我反思这里产生28203420个指向惰性bytestrings的指针的向量-指向每行371个指针时,它开始不再让我感到惊讶,通常指向原始字节流的一个小碎片,通常是'0'。根据http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html,这意味着原始字节流中典型的两个字节序列 - 几乎所有的都是这样的:",0"即[44,48] - 被一些指针和构造函数替换:仅惰性bytestring内容就使得每对字节占用大约11个字(惰性bytestring的ChunkEmpty构造函数,加上J Tibell放在9个单词的严格bytestring材料)...再加上原始字节(减去代表逗号和空格的字节)。在64位系统中,这是一个相当巨大的升级。

Data.CSV.Streaming并没有真正不同:基本上它构建了一个稍微装饰过的列表,而不是向量,因此原则上它可以被惰性评估,在理想情况下,整个东西不需要在内存中实现,正如您所注意到的那样。但是,在像这样的单调背景下,您将从IO“提取列表”,这并不完全保证会产生混乱和困惑。

如果您想正确地流式传输csv内容,则应使用...其中一个流式库。(我没有关于将整个内容放入内存的建议,除了明显的一点,即安排cassava将每行读入一个漂亮的紧凑数据类型而不是指向惰性bytestrings的向量;但是在这里,我们有371个“字段”)。

因此,以下是使用的程序,该程序使用cassava的(真正的)增量接口,然后使用io-streams创建记录流:

  {-# LANGUAGE BangPatterns #-}

  import qualified Data.ByteString.Lazy as BL
  import qualified Data.Vector as V
  import Data.Foldable
  import System.IO.Streams (InputStream, OutputStream)
  import qualified System.IO.Streams as Streams
  import qualified System.IO.Streams.Csv as CSV
  import System.IO

  type StreamOfCSV = InputStream (V.Vector(BL.ByteString))

  main = withFile "train.csv" ReadMode $ \h -> do
     input          <- Streams.handleToInputStream h 
     raw_csv_stream <- CSV.decodeStream HasHeader input
     csv_stream     <- CSV.onlyValidRecords raw_csv_stream :: IO StreamOfCSV
     m <- Streams.read csv_stream
     print m

这个程序在使用比 hello-world 相同的内存量,打印第一条记录后立即结束。你可以在教程https://github.com/pjones/cassava-streams/blob/master/src/System/IO/Streams/Csv/Tutorial.hs中找到更多操作方法。其他流库也有类似的库。如果你需要构建的数据结构(如矩阵)能够适应内存,那么你应该能够通过使用Streams.fold对每一行进行折叠来构建它。如果你试图从每一行提取的信息在被折叠操作消耗之前得到适当的评估,则不应该出现问题。如果你能安排cassava输出一个具有非递归数据结构和无装箱字段的类型,那么就可以为该类型编写一个Unbox实例,并将整个csv折叠成一个单一的紧密打包的无装箱向量。在这种情况下,每行都有371个不同的字段,因此我想这不是一个可选项。

以下是等效于 Data.CSV.Streaming 程序:

  main = withFile "train.csv" ReadMode $ \h -> do
    input          <- Streams.handleToInputStream h 
    raw_csv_stream <- CSV.decodeStream HasHeader input
    csv_stream     <- CSV.onlyValidRecords raw_csv_stream :: IO StreamOfCSV
    csvs <- Streams.toList csv_stream
    print (csvs !! 0)

由于使用了Streams.toList来收集巨大的列表后再尝试查找第一个元素,因此它也存在同样的问题。

-- 附加说明

这里是一个pipes-csv变量,它将每个解析的行手动压缩成一个未装箱的Int向量(这比使用bytestring包中的readInt查找实际存储的Doubles更简单)。

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U
import Data.Csv

import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import Pipes
import qualified Pipes.Csv as Csv
import System.IO
import Control.Applicative

import qualified Control.Foldl as L

main = withFile "train.csv" ReadMode $ \h -> do
  let csvs :: Producer (V.Vector ByteString) IO ()
      csvs = Csv.decode HasHeader (Bytes.fromHandle h) >-> P.concat
      -- shamelessly reading integral part only, counting bad parses as 0
      simplify bs = case B.readInt bs of
        Nothing       -> 0
        Just (n, bs') -> n
      uvectors :: Producer (U.Vector Int) IO ()
      uvectors = csvs  >-> P.map (V.map simplify) >-> P.map (V.foldr U.cons U.empty)
  runEffect $ uvectors >-> P.print

您可以使用foldl库中的折叠,或者使用任何您想编写的折叠方法,只需将最后一行替换为以下内容即可折叠行:

  let myfolds = liftA3 (,,) (L.generalize (L.index 13))   -- the thirteenth row, if it exists
                            (L.randomN 3)   -- three random rows
                            (L.generalize L.length) -- number of rows
  (thirteen,mvs,len) <- L.impurely P.foldM myfolds uvectors

  case mvs of 
    Nothing -> return ()
    Just vs -> print (vs :: V.Vector (U.Vector Int))
  print thirteen
  print len

在这种情况下,我正在收集第十三行、三行随机行和记录的总数——可以将任意数量的其他折叠与这些组合。特别地,我们也可以使用L.vector将所有行收集到一个巨大的向量中,但考虑到这个csv文件的大小,这可能仍然是一个不好的想法。在下面,我们回到起点,收集所有内容并打印完成的向量中的第17行,即一种大矩阵的排序方式。
  vec_vec <- L.impurely P.foldM  L.vector uvectors
  print $ (vec_vec :: V.Vector (U.Vector Int)) V.! 17

这需要大量的内存,但并不会特别耗费我的小型笔记本电脑。


1
谢谢Michael的详细解释!我会更系统地使用流库,我以为可以用Data.Csv.Streaming代替它。 - Antoine Genton

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接