97得票8回答
在RDD/Spark DataFrame中,根据特定列删除重复行。

假设我有一个相当大的数据集,形式如下: data = sc.parallelize([('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1), ('Bar', 57, '...

95得票6回答
在Spark DataFrame中添加一个空列

正如在许多 其他网站上提到的,向现有DataFrame添加新列并不简单。尽管在分布式环境中效率低下,但拥有此功能非常重要,特别是在尝试使用unionAll连接两个DataFrame时。 添加一个null列以便于使用unionAll,最优雅的解决方案是什么? 我的版本如下: from py...

95得票5回答
在Spark中更新数据框的列

看到新的spark DataFrame API,不清楚是否可以修改dataframe列。 如何修改dataframe中行 x 列 y 的值? 在 pandas 中,可以这样做: df.ix[x,y] = new_value 编辑: 总结下面所说的,你不能修改现有的数据帧,因为它是不可...

95得票4回答
如何在Pyspark中使用多列进行连接?

我正在使用Spark 1.3,并希望使用Python接口(SparkSQL)在多个列上进行连接。 以下代码是有效的: 我首先将它们注册为临时表。numeric.registerTempTable("numeric") Ref.registerTempTable("Ref") test ...

93得票4回答
创建Spark DataFrame。无法推断类型的模式。

有人可以帮我解决我在Spark DataFrame中遇到的问题吗? 当我执行myFloatRDD.toDF()时,出现错误: TypeError:无法推断出类型为“float”的模式 我不明白为什么会这样... 示例:myFloatRdd = sc.parallelize([1...

92得票7回答
PySpark:如何以表格形式显示 Spark 数据框

我正在使用pyspark读取一个像下面这样的parquet文件:my_df = sqlContext.read.parquet('hdfs://myPath/myDB.db/myTable/**') 当我使用my_df.take(5)时,它会显示[Row(...)]而不是像使用pandas数据...

92得票6回答
如何在Spark 2.0+中编写单元测试?

我一直在努力寻找一种合理的方法,使用JUnit测试框架来测试SparkSession。虽然有一些对于SparkContext的好例子,但是我无法弄清楚如何让相应的例子适用于SparkSession,即使它在spark-testing-base中的内部也被多次使用。如果这不是正确的方法,我也很乐...

90得票2回答
Spark - 选择WHERE还是过滤?

使用where子句进行选择和在Spark中进行过滤有什么区别? 是否存在某些情况,其中一个比另一个更合适? 何时使用哪种方法?DataFrame newdf = df.select(df.col("*")).where(df.col("somecol").leq(10)) 什么时候是Data...

90得票10回答
如何对Spark DataFrame进行透视?

我开始使用Spark DataFrames,需要将数据透视以从一个具有多行的列创建多个列。Scalding和Python中的Pandas都有内置功能来执行此操作,但是我找不到新的Spark Dataframe中的任何功能。 我假设我可以编写某种自定义函数来执行此操作,但我甚至不知道如何开始,特...

87得票22回答
如何在Spark中将两个列数不同的DataFrame进行合并?

我有两个DataFrame: 我需要像这样合并它们: unionAll函数不起作用,因为列数和列名不同。 我该怎么做?