如何在数据集中更新值?

6
据我所知,Apache Spark没有模拟“更新”SQL命令的功能。例如,我可以根据特定条件更改列中的单个值。唯一的解决方法是使用下面这个命令(在Stackoverflow上指导):withColumn(columnName, where('condition', value)); 然而,条件应该是“column”类型,这意味着我必须使用Apache中内置的列过滤函数(equalToisinltgt等)。有没有办法可以使用SQL语句代替那些内置函数?
问题是我得到了一个包含SQL语句的文本文件,例如WHERE ID > 5WHERE AGE != 50等。然后我必须根据这些条件标记值,我想使用withColumn()方法,但我不能在该函数中插入SQL语句。有什么好的解决方法吗?
4个回答

3
我发现了一个解决方法:
你需要将数据集分成两个部分:你想要更新的值和你不想要更新的值。
Dataset<Row> valuesToUpdate = dataset.filter('conditionToFilterValues');
Dataset<Row> valuesNotToUpdate = dataset.except(valuesToUpdate);

valueToUpdate = valueToUpdate.withColumn('updatedColumn', lit('updateValue'));

Dataset<Row> updatedDataset = valuesNotToUpdate.union(valueToUpdate);

然而,这并不保持原始数据集相同的记录顺序,因此如果顺序对您很重要,这将无法满足您的需求。

在 PySpark 中,您必须使用 .subtract 而不是 .except。


0

when子句转换为case子句,您可以将其与SQL case子句相关联。

示例

scala> val condition_1 = when(col("col_1").isNull,"NA").otherwise("AVAILABLE")
condition_1: org.apache.spark.sql.Column = CASE WHEN (col_1 IS NULL) THEN NA ELSE AVAILABLE END

或者您也可以链接when子句

scala> val condition_2 = when(col("col_1") === col("col_2"),"EQUAL").when(col("col_1") > col("col_2"),"GREATER").
     | otherwise("LESS")
condition_2: org.apache.spark.sql.Column = CASE WHEN (col_1 = col_2) THEN EQUAL WHEN (col_1 > col_2) THEN GREATER ELSE LESS END

scala> val new_df = df.withColumn("condition_1",condition_1).withColumn("condition_2",condition_2)

如果您仍然想使用表格,那么您可以将您的数据框/数据集注册为临时表并执行SQL查询。

df.createOrReplaceTempView("tempTable")//spark 2.1 +
df.registerTempTable("tempTable")//spark 1.6

现在,您可以执行 SQL 查询

spark.sql("your queries goes here with case clause and where condition!!!")//spark 2.1
sqlContest.sql("your queries goes here with case clause and where condition!!!")//spark 1.6

0

如果您正在使用Java数据集,您可以按照以下步骤更新数据集。 这是代码:

Dataset ratesFinal1 = ratesFinal.filter("on_behalf_of_comp_id != 'COMM_DERIVS'");

ratesFinal1 = ratesFinal1.filter("status != 'Hit/Lift'");

Dataset ratesFinalSwap = ratesFinal1.filter("on_behalf_of_comp_id in ('SAPPHIRE','BOND') and cash_derivative != 'cash'");

ratesFinalSwap = ratesFinalSwap.withColumn("ins_type_str",functions.lit("SWAP"));

添加新列并从现有列中获取值

ratesFinalSTW = ratesFinalSTW.withColumn("action", ratesFinalSTW.col("status"));


0
如果您正在使用DataFrame,可以将该数据框注册为临时表,使用df.registerTempTable("events")。
然后您可以像这样查询: sqlContext.sql("SELECT * FROM events "+)

但我仍然无法更新数据集,因为Spark不接受“UPDATE”语句。 - Guillermo Herrera
一旦您根据输入条件获取了筛选后的数据框,您可以在数据框中创建一个新列,并更新其值。 - magic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接