IndexError when creating a PySpark DataFrame: tuple index out of range

I want to create some test data in a PySpark dataframe, but I keep getting the same "tuple index out of range" error. I don't get this error when reading a CSV. Any ideas on why I'm getting it would be appreciated.
The first thing I tried was creating a pandas dataframe and converting it to a PySpark dataframe:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

columns = ["id", "col_"]
data = [("1", "blue"), ("2", "green"),
        ("3", "purple"), ("4", "red"),
        ("5", "yellow")]

df = pd.DataFrame(data=data, columns=columns)

sparkdf = spark.createDataFrame(df)
sparkdf.show()

Output:

PicklingError: Could not serialize object: IndexError: tuple index out of range

If I instead try to create the dataframe from an RDD, following the instructions on SparkbyExamples.com, I get the same error:
rdd = spark.sparkContext.parallelize(data)
sparkdf = spark.createDataFrame(rdd).toDF(*columns)
sparkdf.show()

I also tried the following, and got the same error again:

import pyspark.pandas as ps
df1 = ps.from_pandas(df)

Here is the full error message from running the code above:
IndexError                                Traceback (most recent call last)
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py:458, in CloudPickleSerializer.dumps(self, obj)
    457 try:
--> 458     return cloudpickle.dumps(obj, pickle_protocol)
    459 except pickle.PickleError:

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     70 cp = CloudPickler(
     71     file, protocol=protocol, buffer_callback=buffer_callback
     72 )
---> 73 cp.dump(obj)
     74 return file.getvalue()

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
    601 try:
--> 602     return Pickler.dump(self, obj)
    603 except RuntimeError as e:

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:692, in CloudPickler.reducer_override(self, obj)
    691 elif isinstance(obj, types.FunctionType):
--> 692     return self._function_reduce(obj)
    693 else:
    694     # fallback to save_global, including the Pickler's
    695     # dispatch_table

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:565, in CloudPickler._function_reduce(self, obj)
    564 else:
--> 565     return self._dynamic_function_reduce(obj)

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:546, in CloudPickler._dynamic_function_reduce(self, func)
    545 newargs = self._function_getnewargs(func)
--> 546 state = _function_getstate(func)
    547 return (types.FunctionType, newargs, state, None, None,
    548         _function_setstate)

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:157, in _function_getstate(func)
    146 slotstate = {
    147     "__name__": func.__name__,
    148     "__qualname__": func.__qualname__,
   (...)
    154     "__closure__": func.__closure__,
    155 }
--> 157 f_globals_ref = _extract_code_globals(func.__code__)
    158 f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in
    159              func.__globals__}

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py:334, in _extract_code_globals(co)
    331 # We use a dict with None values instead of a set to get a
    332 # deterministic order (assuming Python 3.6+) and avoid introducing
    333 # non-deterministic pickle bytes as a results.
--> 334 out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
    336 # Declaring a function inside another one using the "def ..."
    337 # syntax generates a constant code object corresponding to the one
    338 # of the nested function's As the nested function may itself need
    339 # global variables, we need to introspect its code, extract its
    340 # globals, (look for code object in it's co_consts attribute..) and
    341 # add the result to code_globals

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py:334, in <dictcomp>(.0)
    331 # We use a dict with None values instead of a set to get a
    332 # deterministic order (assuming Python 3.6+) and avoid introducing
    333 # non-deterministic pickle bytes as a results.
--> 334 out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
    336 # Declaring a function inside another one using the "def ..."
    337 # syntax generates a constant code object corresponding to the one
    338 # of the nested function's As the nested function may itself need
    339 # global variables, we need to introspect its code, extract its
    340 # globals, (look for code object in it's co_consts attribute..) and
    341 # add the result to code_globals

IndexError: tuple index out of range

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Cell In [67], line 2
      1 rdd = spark.sparkContext.parallelize(data)
----> 2 df1 = ps.from_pandas(df)
      3 sparkdf = spark.createDataFrame(rdd).toDF(*columns)
      4 #Create a dictionary from each row in col_

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\pandas\namespace.py:153, in from_pandas(pobj)
    151     return Series(pobj)
    152 elif isinstance(pobj, pd.DataFrame):
--> 153     return DataFrame(pobj)
    154 elif isinstance(pobj, pd.Index):
    155     return DataFrame(pd.DataFrame(index=pobj)).index

File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\pandas\frame.py:450, in DataFrame.__init__(self, data, index, columns, dtype, copy)
    448     else:
    449         pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
--> 450     internal = InternalFrame.from_pandas(pdf)
    452 object.__setattr__(self, "_internal_frame", internal)
...
    466     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    467 print_exec(sys.stderr)
--> 468 raise pickle.PicklingError(msg)

PicklingError: Could not serialize object: IndexError: tuple index out of range

I cannot reproduce this error. All of the code outputs the dataframe perfectly. There is nothing suspicious in the code or the sample data. Are you getting the error with the sample data above? - Azhar Khan
Yes, all of the code above produces the error. I have also added the full error message to the post. - Jonathan P
1 Answer


After some reading, I checked https://pyreadiness.org/3.11 and found that the latest version of Python was not yet supported by pyspark. Downgrading to Python 3.9 solved the problem for me.
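For anyone who hits this, a quick sanity check (my own sketch, not part of the original answer) is to print the interpreter and PySpark versions before creating any dataframes. The 3.11 threshold below reflects the situation at the time this answer was written; later PySpark releases added support for newer Python versions, as noted in the comments:

import sys
import pyspark

# Show which interpreter and which PySpark build are actually in use
print("Python:", sys.version.split()[0])
print("PySpark:", pyspark.__version__)

# At the time of this answer, PySpark did not support Python 3.11, which is
# what triggered the PicklingError above; Python 3.9 worked.
if sys.version_info >= (3, 11):
    print("This Python version may not be supported by your PySpark release; "
          "consider an environment with Python 3.9.")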


Thank you so much for posting this answer!! This issue was driving me crazy today and I couldn't find anything else on the topic. The frustrating part is that the official documentation says the supported version is "Python 3.7 and above", which is clearly not the case... - Meorge
Thanks! I hit the same error on 3.11. As an update, PySpark now supports 3.10, and that worked for me as well. - Hongbo Miao
