PySpark错误:AttributeError:'NoneType'对象没有属性'_jvm'。

34

我有一个时间戳数据集,其格式为:

我在Pyspark中编写了一个UDF来处理此数据集并将其作为键值对映射返回。但是我收到以下错误消息:

数据集:df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

Pyspark用户自定义函数:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

Pyspark 代码:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
任何帮助都将不胜感激!
8个回答

73

Mariusz的答案并没有真正帮助到我。因此,如果你像我一样发现这是谷歌上唯一的结果并且对pyspark(以及Spark)还不熟悉,请看看下面的方法。

在我的情况下,我之所以会出现那个错误,是因为我试图在pyspark环境设置之前执行pyspark代码。

确保在调用依赖于pyspark.sql.functions的函数之前,pyspark已经可用并已设置好,这对我解决了问题。


14
当作为其他人的附加内容时,我遇到了这个错误。当我的spark会话没有设置好,而我使用一个装饰器来添加模式定义一个pyspark UDF时,就会出现这个错误。通常我会在主函数中设置spark会话,但在这种情况下,由于需要传递复杂的模式,我需要在脚本顶部设置它。感谢快速的提示! - Renée
1
此外,当我在函数的默认值中使用Spark函数时,会出现错误,因为这些函数是在导入时而不是调用时进行评估的。例如:def func(is_test = lit(False)) - JacksonHaenchen
6
对于像我这样愚蠢的人来说,如果你在“pandas_udf”(应该接收pandas代码)中编写了“pyspark”代码,就会出现此错误。 - Ric S
2
@Mari,我能给你的建议就是在初始化Spark上下文之前不能使用pyspark函数。在我的情况下,我将它们用作默认参数值,但这些值在导入时被计算,而不是运行时,因此Spark上下文未被初始化。所以我只需将其更改为None,并在函数内部进行检查即可。 - JacksonHaenchen
1
@Mari 我最近遇到了这个问题。如果你想采用这种构造方式,而不是将其分配为变量,请通过函数返回它。例如:def is_not_empty(): return (col('var') != lit(''))。然后将其作为函数而不是变量使用。这样可以在调用时(在初始化Spark上下文之后)实例化它,而不是在加载模块时。 - Brendan
显示剩余5条评论

25

错误信息显示在udf的第27行调用了一些pyspark sql函数。这一行有abs(),因此我猜测在它之前的某个地方你调用了from pyspark.sql.functions import *并且它覆盖了python的abs()函数。


1
有没有一种方法可以在不删除行 from pyspark.sql.functions import * 的情况下使用原始的 abs() 函数? - egeogretmen
5
@mufmuf,当然可以使用__builtin__.abs作为指向Python函数的指针。 - Mariusz
3
你可以导入pyspark.sql.functions作为F,并使用F.function_name调用pyspark函数。 - Vaibhav
这样一个错误信息太奇怪了,完全是误导性的。我想知道是什么路径导致了与 _jvm 属性相关的错误。 - Mehdi LAMRANI
@MehdiLAMRANI 所有的pyspark代码都是由py4j评估的。它们都被转译为JVM调用。 - OneCricketeer
显示剩余2条评论

12

为了明确起见,许多人遇到的问题源于一种糟糕的编程风格,即 from blah import *

当你们执行以下操作时:

from pyspark.sql.functions import *

您会覆盖 很多 python 内置函数。我强烈建议导入类似以下的函数:

import pyspark.sql.functions as f
# or 
import pyspark.sql.functions as pyf

3
这个建议帮助我纠正了导入时使用“*”的坏习惯。希望其他人也能改正这个问题。 - Maeror

4

udf无法处理None值时,也会出现此异常。例如,以下代码将导致相同的异常:

get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))

然而,这一个不行:
get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))

3

请确保您正在初始化Spark上下文。例如:

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()

或者就像这样。
spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
    .option("quote", "\"").option("escape", "\"") \
    .option("header", "true").option("inferSchema", "true") \
    .load("/path/thecsv.csv")

您可以使用SparkSession获取Dataframe读取器,无需使用SQL上下文。 - OneCricketeer

0

我遇到了同样的问题,当我的代码中有Python的round()函数时,就像@Mariusz所说的那样,Python的round()函数被覆盖了

解决方法是使用__builtin__.round()代替round(),就像@Mariusz在他的答案评论中提到的那样。


或者你可以重新命名你已经定义或导入的任何其他圆形函数。 - OneCricketeer

0

我在我的Jupyter笔记本中发现了这个错误。我添加了以下命令

import findspark
findspark.init()
sc = pyspark.SparkContext(appName="<add-your-name-here>")

它起作用了。

这是相同的问题:Spark上下文未准备好或已停止


你应该使用SparkSession。如果需要,你可以从中获取上下文。 - OneCricketeer

0
很可能,这个错误是由于缺少Spark会话创建而引起的。因此,应该创建Spark会话。
spark = SparkSession.builder
       .master('yarn')
       .appName('a').getOrCreate()

这应该解决问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接