
DataFrame/Dataset groupBy behavior/optimization


慕姐4208626 2019-10-12 11:07:43


Suppose we have a DataFrame df consisting of the following columns:

name, surname, size, width, length, weight

Now we want to perform a couple of operations, for example create two DataFrames containing aggregated data about size and width:

val df1 = df.groupBy("surname").agg( sum("size") )
val df2 = df.groupBy("surname").agg( sum("width") )

You can notice that other columns, like length, are not used anywhere. Is Spark smart enough to drop the redundant columns before the shuffling phase, or are they carried around? Will running:

val dfBasic = df.select("surname", "size", "width")

before the grouping have any impact on the performance?
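As a side note (not part of the original question): if both sums are needed anyway, the two groupBy calls could in principle be folded into a single pass over the data. A minimal sketch, assuming the df with the columns described above:

import org.apache.spark.sql.functions.sum

// Hypothetical alternative: one shuffle instead of two, computing both
// aggregations over the same grouping key in a single job.
val dfBoth = df.groupBy("surname").agg(
  sum("size").as("total_size"),
  sum("width").as("total_width")
)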




2 Answers

三国纷争

Yes, it is "smart enough". groupBy performed on a DataFrame is not the same operation as groupBy performed on a plain RDD. In the scenario you've described there is no need to move the raw data at all. Let's create a small example to illustrate that:


val df = sc.parallelize(Seq(

   ("a", "foo", 1), ("a", "foo", 3), ("b", "bar", 5), ("b", "bar", 1)

)).toDF("x", "y", "z")


df.groupBy("x").agg(sum($"z")).explain


// == Physical Plan ==

// *HashAggregate(keys=[x#148], functions=[sum(cast(z#150 as bigint))])

// +- Exchange hashpartitioning(x#148, 200)

//    +- *HashAggregate(keys=[x#148], functions=[partial_sum(cast(z#150 as bigint))])

//       +- *Project [_1#144 AS x#148, _3#146 AS z#150]

//          +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#144, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#145, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#146]

//             +- Scan ExternalRDDScan[obj#143]

As you can see, the first phase is a projection where only the required columns are preserved. Next, the data is aggregated locally, and finally it is transferred and aggregated globally. You would get a slightly different output if you used Spark <= 1.4, but the general structure should be exactly the same.


Finally, a DAG visualization shows that the description above matches the actual job:


group by and agg DAG
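Regarding the dfBasic part of the question: a minimal sketch (reusing the toy df above, not the original poster's data) for comparing the plans with and without an explicit select. Given the projection visible in the plan above, both physical plans should contain essentially the same Project / HashAggregate / Exchange steps, so the extra select is mostly a matter of readability rather than performance:

// Catalyst prunes the unused columns on its own.
df.groupBy("x").agg(sum($"z")).explain

// Explicit projection first; the physical plan should look essentially the same.
df.select("x", "z").groupBy("x").agg(sum($"z")).explain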


Similarly, Dataset.groupByKey followed by reduceGroups contains both a map-side (ObjectHashAggregate with partial_reduceaggregator) and a reduce-side (ObjectHashAggregate with reduceaggregator) aggregation:


case class Foo(x: String, y: String, z: Int)


val ds = df.as[Foo]

ds.groupByKey(_.x).reduceGroups((x, y) => x.copy(z = x.z + y.z)).explain


// == Physical Plan ==

// ObjectHashAggregate(keys=[value#126], functions=[reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])

// +- Exchange hashpartitioning(value#126, 200)

//    +- ObjectHashAggregate(keys=[value#126], functions=[partial_reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])

//       +- AppendColumns <function1>, newInstance(class $line40.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#126]

//          +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]

//             +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]

//                +- Scan ExternalRDDScan[obj#3]

groupByKey + reduceGroups


However, other methods of KeyValueGroupedDataset may work similarly to RDD.groupByKey. For example, mapGroups (or flatMapGroups) does not use partial aggregation:


ds.groupByKey(_.x)

  .mapGroups((_, iter) => iter.reduce((x, y) => x.copy(z = x.z + y.z)))

  .explain


//== Physical Plan ==

//*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).x, true, false) AS x#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).y, true, false) AS y#38, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).z AS z#39]

//+- MapGroups <function2>, value#32.toString, newInstance(class $line15.$read$$iw$$iw$Foo), [value#32], [x#8, y#9, z#10], obj#36: $line15.$read$$iw$$iw$Foo

//   +- *Sort [value#32 ASC NULLS FIRST], false, 0

//      +- Exchange hashpartitioning(value#32, 200)

//         +- AppendColumns <function1>, newInstance(class $line15.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#32]

//            +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]

//               +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]

//                  +- Scan ExternalRDDScan[obj#3]

groupByKey + mapGroups



Answered 2019-10-13
繁星淼淼

I tried to search for and read any source that explains shuffle operations across nodes and how DataFrame (specifically) and RDD operations perform and distribute over nodes, but could not find one; all that is given are examples and output. Could you recommend courses that teach such concepts (e.g. why groupByKey on an RDD is expensive, while groupBy on a DataFrame is not)?
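To make the contrast mentioned here concrete (why groupByKey on an RDD is expensive while groupBy on a DataFrame is not), here is a minimal sketch with a hypothetical pair RDD; it is an illustration, not part of the original thread:

// Hypothetical pair RDD of (String, Int).
val rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5), ("b", 1)))

// groupByKey ships every (key, value) pair across the network and only then
// sums on the reduce side -- there is no map-side combining.
val viaGroupByKey = rdd.groupByKey().mapValues(_.sum)

// reduceByKey (conceptually what DataFrame groupBy + sum does) combines values
// within each partition first, so only partial sums are shuffled.
val viaReduceByKey = rdd.reduceByKey(_ + _)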

Replied 2019-10-13
