为了账号安全,请及时绑定邮箱和手机立即绑定

函数编程中的reduce和foldLeft / fold之间的区别

函数编程中的reduce和foldLeft / fold之间的区别

大话西游666 2019-11-23 12:45:24
为什么Scala和Spark和Scalding等框架同时具有reduce和foldLeft?那么,reduce和之间有什么区别fold?
查看完整描述

3 回答

?
月关宝盒

TA贡献1772条经验 获得超5个赞

foldApache Spark中的内容与fold未分发的集合中的内容不同。实际上,它需要交换函数才能产生确定性的结果:


这与以Scala之类的功能语言为非分布式集合实现的折叠操作有些不同。该折叠操作可以单独应用于分区,然后将那些结果折叠为最终结果,而不是以某些定义的顺序将折叠应用于每个元素。对于非交换函数,结果可能与应用于非分布式集合的折叠结果不同。


Mishael Rosenthal 已证明了这一点,Make42在其评论中建议了这一点。


有人建议观察到的行为与HashPartitioner何时parallelize不洗牌和不使用有关HashPartitioner。


import org.apache.spark.sql.SparkSession


/* Note: standalone (non-local) mode */

val master = "spark://...:7077"  


val spark = SparkSession.builder.master(master).getOrCreate()


/* Note: deterministic order */

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])

require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })


/* Note: all posible permutations */

require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

解释:


foldRDD的结构


def fold(zeroValue: T)(op: (T, T) => T): T = withScope {

  var jobResult: T

  val cleanOp: (T, T) => T

  val foldPartition = Iterator[T] => T

  val mergeResult: (Int, T) => Unit

  sc.runJob(this, foldPartition, mergeResult)

  jobResult

}

与RDD的结构reduce相同:


def reduce(f: (T, T) => T): T = withScope {

  val cleanF: (T, T) => T

  val reducePartition: Iterator[T] => Option[T]

  var jobResult: Option[T]

  val mergeResult =  (Int, Option[T]) => Unit

  sc.runJob(this, reducePartition, mergeResult)

  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))

}

在runJob不考虑分区顺序的情况下执行,导致需要交换功能。


foldPartition并且reducePartition在处理顺序上有效,reduceLeft并且foldLeft在上有效执行(通过继承和委派)TraversableOnce。


结论:foldRDD不能依赖于块的顺序,而是需要可交换性和关联性。


查看完整回答
反对 回复 2019-11-23
  • 3 回答
  • 0 关注
  • 949 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信