从Numpy矩阵创建Spark dataframe

18

这是我第一次使用PySpark(Spark 2),我试图为Logit模型创建一个玩具数据框。我已经成功运行了教程,现在想把自己的数据传递进去。

我尝试了以下代码:

%pyspark
import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.regression import LabeledPoint

df = np.concatenate([np.random.randint(0,2, size=(1000)), np.random.randn(1000), 3*np.random.randn(1000)+2, 6*np.random.randn(1000)-2]).reshape(1000,-1)
df = map(lambda x: LabeledPoint(x[0], Vectors.dense(x[1:])), df)

mydf = spark.createDataFrame(df,["label", "features"])

但我无法摆脱:

TypeError: Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector

我正在使用ML库进行向量操作,输入是一个双精度数组,请问有什么需要注意的地方吗?根据文档,应该没有问题。

非常感谢。

3个回答

12

从Numpy到Pandas再到Spark:

data = np.random.rand(4,4)
df = pd.DataFrame(data, columns=list('abcd'))
spark.createDataFrame(df).show()

输出:

+-------------------+-------------------+------------------+-------------------+
|                  a|                  b|                 c|                  d|
+-------------------+-------------------+------------------+-------------------+
| 0.8026427193838694|0.16867056812634307|0.2284873209015007|0.17141853164400833|
| 0.2559088794287595| 0.3896957084615589|0.3806810025185623| 0.9362280141470332|
|0.41313827425060257| 0.8087580640179158|0.5547653674054028| 0.5386190454838264|
| 0.2948395900484454| 0.4085807623354264|0.6814694724946697|0.32031773805256325|
+-------------------+-------------------+------------------+-------------------+

问题是,如果您要继续使用Spark ML处理这些数据,您将需要类似VectorAssembler的东西进一步向下转换,以将4列转换为单个列,例如我的答案中的features,因为Spark ML需要以这种形式提供特征... - desertnaut

8

您正在混合使用 ML 和 MLlib 的功能,它们不一定兼容。在使用 spark-ml 时,您不需要 LabeledPoint

sc.version
# u'2.1.1'

import numpy as np
from pyspark.ml.linalg import Vectors

df = np.concatenate([np.random.randint(0,2, size=(1000)), np.random.randn(1000), 3*np.random.randn(1000)+2, 6*np.random.randn(1000)-2]).reshape(1000,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), df)

mydf = spark.createDataFrame(dff,schema=["label", "features"])

mydf.show(5)
# +-----+-------------+ 
# |label|     features| 
# +-----+-------------+ 
# |    1|[0.0,0.0,0.0]| 
# |    0|[0.0,1.0,1.0]| 
# |    0|[0.0,1.0,0.0]| 
# |    1|[0.0,0.0,1.0]| 
# |    0|[0.0,1.0,0.0]|
# +-----+-------------+

注意:从Spark 2.0开始,spark.mllib包中基于RDD的API已进入维护模式。现在,Spark的主要机器学习API是基于DataFrame的API,位于spark.ml包中。 [参考链接]


1
我认为澄清这些事情非常重要,因为这是混乱开始的地方。这并不是 OP 第一次混淆它们,在某个时候他们会问自己应该使用什么。 - eliasah
1
当你第一次看到它时,可能会感到有些困惑。 :) https://www.nodalpoint.com/spark-classification/ - Jan Sila
我认为你应该使用 column_stack 而不是 concatenate - Amanda

2
问题很容易解决。您同时使用了mlmllibAPI,请坚持使用其中一个。否则,您将收到此错误消息。
以下是mllibAPI的解决方案:
import numpy as np
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.mllib.regression import LabeledPoint

df = np.concatenate([np.random.randint(0,2, size=(1000)), np.random.randn(1000), 3*np.random.randn(1000)+2, 6*np.random.randn(1000)-2]).reshape(1000,-1)
df = map(lambda x: LabeledPoint(x[0], Vectors.dense(x[1:])), df)

mydf = spark.createDataFrame(df,["label", "features"])

对于ml API,您不再需要使用LabeledPoint。这里提供一个示例。建议使用ml API,因为mllib API很快就会被弃用。

非常感谢您的回答。我已经给desertnaut打了赏和点了您的赞。非常感谢! - Jan Sila
我也点赞了,因为它补充了我的回答(mllib)。现在看不到了,但我们两个的回答只相差2分钟 - 很酷... :) - desertnaut

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接