使用Python中的Hadoop处理大型CSV文件

3

我有一个巨大的CSV文件,想在亚马逊EMR上使用Hadoop MapReduce来处理它(使用Python)。

文件有7个字段,但我只关注日期数量字段。

 "date" "receiptId" "productId" "quantity"  "price" "posId" "cashierId"

首先,我的mapper.py文件

import sys

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            list = line.split('\t')

            #If date meets criteria, add quantity to express key
                if int(list[0][11:13])>=17 and int(list[0][11:13])<=19:
                    print '%s\t%s' % ("Express", int(list[3]))
            #Else, add quantity to non-express key
                else:
                    print '%s\t%s' % ("Non-express", int(list[3]))

            line =  sys.stdin.readline()
except "end of file":
        return None
if __name__ == "__main__":
        main(sys.argv)

对于reducer,我将使用流式命令:aggregate。

问题:

  1. 我的代码正确吗?我在Amazon EMR上运行它,但输出为空。

  2. 因此,我的最终结果应该是:express,XXX和non-express,YYY。我可以将其做成返回结果前的除法操作吗?只需XXX/YYY的结果。我应该把这段代码放在哪里?一个reducer中吗?

  3. 此外,这是一个巨大的CSV文件,所以mapping会将其分成几个分区吗?还是我需要显式调用FileSplit?如果是这样,我该怎么做?


为什么不使用Python内置的CSV解析器? - papezjustin
1个回答

3

Answering my own question here!

  1. The code is wrong. If you're using aggregate library to reduce, your output does not follow the usual key value pair. It requires a "prefix".

    if int(list[0][11:13])>=17 and int(list[0][11:13])<=19:
        #This is the correct way of printing for aggregate library
        #Print all as a string.
        print  "LongValueSum:" + "Express" + "\t" + list[3]
    

    The other "prefixes" available are: DoubleValueSum, LongValueMax, LongValueMin, StringValueMax, StringValueMin, UniqValueCount, ValueHistogram. For more info, look here http://hadoop.apache.org/common/docs/r0.15.2/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html.

  2. Yes, if you want to do more than just the basic sum, min, max or count, you need to write your own reducer.

  3. I do not yet have the answer.


@Deyang 你好,我是一个Hadoop-Python的新手。我也有类似的工作要做,但是我在Hadoop目录中有多个CSV文件,我已经编写了脚本,在本地机器上运行正常。但是当我在集群上运行它时,它会出现“Streaming Command Failed”的错误。你能建议一下如何从HDFS目录中读取所有的CSV文件吗? - MegaBytes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接