Spark:spark-csv 处理时间过长。

3
我正在尝试使用Databricks spark-csv包和航班数据集,从EMR Spark集群上的S3 CSV源创建一个DataFrame:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv')

df.first()

这不会在一个由4个m3.xlarge组成的集群上终止。我正在寻找一些建议,以在PySpark中从S3的CSV文件创建DataFrame。或者,我已经尝试将文件放在HDFS上并从HFDS读取,但也没有终止。该文件并不是特别大(12 GB)。

如果spark-csv库的版本是1.2.0+,你可以尝试将parserLib选项设置为univocity吗? - rchukh
1
@rchukh 这不是默认设置吗?我今天从主分支构建了jar包。编辑:不,不是默认设置。我会尝试一下。 - tchakravarty
1个回答

1

如果要读取一个只有12GB大小的良好格式的csv文件,您可以将其复制到所有工作机器和驱动程序机器上,然后手动按“,”拆分。这可能无法解析任何RFC4180 csv文件,但它可以解析我所拥有的。

  • 在征用集群时,为每个工作机器至少增加12GB的额外磁盘空间。
  • 使用至少拥有12GB RAM的机器类型,例如c3.2xlarge。如果您不打算让集群保持空闲并且负担得起更高的费用,请选择更大的机器。更大的机器意味着更少的磁盘文件复制以开始工作。我经常在spot市场上看到c3.8xlarge低于$0.50/hour。

将文件复制到每个工作机器上,在每个工作机器上的同一目录中。这应该是一个物理附加的驱动器,即每台机器上的不同物理驱动器。

确保您在驱动程序机器上具有相同的文件和目录。

raw = sc.textFile("/data.csv")

print "Counted %d lines in /data.csv" % raw.count()

raw_fields  = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)

def uncsv_line(line):
    return [pmatchre.match(s).group(1) for s in line.split(',')]

fields = uncsv_line(raw_fields)

def raw_to_dict(raw_line):
    return dict(zip(fields, uncsv_line(raw_line)))

parsedData = (raw
        .map(raw_to_dict)
        .cache()
        )

print "Counted %d parsed lines" % parsedData.count()

parsedData将是一个字典的RDD,其中字典的键是来自第一行的CSV字段名称,值是当前行的CSV值。如果CSV数据中没有标题行,则这可能不适合您,但应清楚地表明您可以覆盖此处读取第一行的代码并手动设置字段。

请注意,这对于创建数据帧或注册Spark SQL表不是立即有用的。但对于其他任何事情,都可以使用,并且如果需要将其转储到Spark SQL中,则可以进一步提取和转换为更好的格式。

我在一个7GB的文件上使用它,没有任何问题,除了我删除了一些过滤逻辑以检测具有副作用的有效数据的头从解析数据中删除。您可能需要重新实现某些过滤器。


Paul,感谢您的评论,我很感激您尝试回答我的问题。但是,您是否建议采用不同的方法?比如先从HDFS中将CSV读入Hive,然后再从Hive表创建DataFrame?鉴于基础设施,最佳的读取12GB文件作为DataFrame的方法是什么? - tchakravarty
抱歉,我们目前这里并不使用HDFS/Hive基础设施,因此无法发表意见。 - Paul
Paul,请问您能否提供有关如何将一个包含字典的RDD转换为DataFrame的具体细节? - tchakravarty
使用map和lambda函数将每个字典转换为所需值的元组。手动编写模式。如果您想要每行中的所有值,可能更容易将解析映射更改为不使用zip。我还没有运行任何内容,但这似乎是可行的。 - Paul
如果你只需要快速的SQL,你也可以看看谷歌专有的托管BigQuery。它每TB扫描的费用为5美元,加上每月2美分的存储费用。 - Paul

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接