"unhashable type: 'list'" error when joining PySpark RDDs

I get an error when I run collect on the PipelinedRDD. I have also looked at View RDD contents in Python Spark?. I have two input files that I need to join in Spark.

The files look like this:

able,991
about,11
burger,15
actor,22

[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/input/join1_FileB.txt
n-01 able,5
Feb-02 about,3
Mar-03 about,8
Apr-04 able,13
Feb-22 actor,3
Feb-23 burger,5
Mar-08 burger,2
Dec-15 able,100

I have also created mappers for fileA and fileB and verified the results.

def split_fileB(line):
    key_val1 = line.split(",")
    dt_word  = key_val1[0].split(" ")
    count    = key_val1[1]
    date     = dt_word[0]
    word     = dt_word[1]
    return(word,date + " " + count)

def split_fileA(line):
    key_val = line.split(",")
    word = key_val[0].split(" ")
    count = int(key_val[1])
    return(word,count)

fileA_data = fileA.map(split_fileA)
fileA_data.collect()
## [(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)] 

fileB_data = fileB.map(split_fileB)
fileB_data.collect()
## [(u'able', u'n-01 5'),
## (u'about', u'Feb-02 3'),
## (u'about', u'Mar-03 8'),
## (u'able', u'Apr-04 13'),
## (u'actor', u'Feb-22 3'),
## (u'burger', u'Feb-23 5'),
## (u'burger', u'Mar-08 2'),
## (u'able', u'Dec-15 100')]

fileB_joined_fileA = fileB_data.join(fileA_data)
fileB_joined_fileA.collect()

Traceback:

    16/02/17 03:20:01 INFO scheduler.DAGScheduler: Job 14 failed: collect at <ipython-input-45-20609ef53c7a>:1, took 0.318953 s
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-45-20609ef53c7a> in <module>()
----> 1 fileB_joined_fileA.collect()

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    699         """
    700         with SCCallSiteSync(self.context) as css:
--> 701             bytesInJava = self._jrdd.collect().iterator()
    702         return list(self._collect_iterator_through_file(bytesInJava))
    703 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o319.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 1 times, most recent failure: Lost task 1.0 in stage 17.0 (TID 18, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 101, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1706, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/lib/spark/python/pyspark/shuffle.py", line 253, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

1 Answer

First of all, the output you have provided doesn't match your code. split_fileA first splits the input string on commas:

key_val = line.split(",")

and the first element of key_val is then split on spaces:

word = key_val[0].split(" ")

This means that, assuming there are no malformed lines, word is actually a list, not a string:

"able,991".split(",")[0].split(" ")
## ['able']

As a result, the join transformation makes no sense and cannot work, because a list is not hashable.
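
A minimal sketch of one possible fix (my own suggestion, not part of the original answer): keep key_val[0] itself as the key instead of splitting it on spaces, so the key stays a plain string. This assumes every line of fileA has the form word,count with no date prefix:

def split_fileA(line):
    # "able,991" -> ("able", 991)
    key_val = line.split(",")
    # key_val[0] is already the whole word, so no further split on spaces
    # is needed; the key stays a string and is therefore hashable
    word = key_val[0]
    count = int(key_val[1])
    return (word, count)

With string keys on both sides, fileB_data.join(fileA_data) should then produce pairs along the lines of (u'able', (u'Apr-04 13', 991)).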
See also: Using a list as a key in PySpark's reduceByKey
