为了账号安全,请及时绑定邮箱和手机立即绑定

如何将新列添加到Spark DataFrame(使用PySpark)?

如何将新列添加到Spark DataFrame(使用PySpark)?

Smart猫小萌 2019-11-06 11:03:43
我有一个Spark DataFrame(使用PySpark 1.5.1),想添加一个新列。我已经尝试了以下方法,但没有成功:type(randomed_hours) # => list# Create in Python and transform to RDDnew_col = pd.DataFrame(randomed_hours, columns=['new_col'])spark_new_col = sqlContext.createDataFrame(new_col)my_df_spark.withColumn("hours", spark_new_col["new_col"])使用此命令也出错:my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))那么,如何使用PySpark将新列(基于Python向量)添加到现有DataFrame中?
查看完整描述

3 回答

?
宝慕林4294392

TA贡献2021条经验 获得超8个赞

您无法将任意列添加到DataFrameSpark中。只能通过使用文字来创建新列(其他文字类型在如何在Spark DataFrame中添加常量列中进行了描述)。


from pyspark.sql.functions import lit


df = sqlContext.createDataFrame(

    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))


df_with_x4 = df.withColumn("x4", lit(0))

df_with_x4.show()


## +---+---+-----+---+

## | x1| x2|   x3| x4|

## +---+---+-----+---+

## |  1|  a| 23.0|  0|

## |  3|  B|-23.0|  0|

## +---+---+-----+---+

转换现有列:


from pyspark.sql.functions import exp


df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))

df_with_x5.show()


## +---+---+-----+---+--------------------+

## | x1| x2|   x3| x4|                  x5|

## +---+---+-----+---+--------------------+

## |  1|  a| 23.0|  0| 9.744803446248903E9|

## |  3|  B|-23.0|  0|1.026187963170189...|

## +---+---+-----+---+--------------------+

包括使用join:


from pyspark.sql.functions import exp


lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))

df_with_x6 = (df_with_x5

    .join(lookup, col("x1") == col("k"), "leftouter")

    .drop("k")

    .withColumnRenamed("v", "x6"))


## +---+---+-----+---+--------------------+----+

## | x1| x2|   x3| x4|                  x5|  x6|

## +---+---+-----+---+--------------------+----+

## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|

## |  3|  B|-23.0|  0|1.026187963170189...|null|

## +---+---+-----+---+--------------------+----+

或使用函数/ udf生成:


from pyspark.sql.functions import rand


df_with_x7 = df_with_x6.withColumn("x7", rand())

df_with_x7.show()


## +---+---+-----+---+--------------------+----+-------------------+

## | x1| x2|   x3| x4|                  x5|  x6|                 x7|

## +---+---+-----+---+--------------------+----+-------------------+

## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|

## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|

## +---+---+-----+---+--------------------+----+-------------------+

在性能方面,pyspark.sql.functions映射到Catalyst表达式的内置函数()通常优于Python用户定义的函数。


如果您想将任意RDD的内容添加为列,则可以


将行号添加到现有数据框

调用zipWithIndexRDD并将其转换为数据帧

使用索引作为联接键将两者联接


查看完整回答
反对 回复 2019-11-06
  • 3 回答
  • 0 关注
  • 1058 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信