为了账号安全,请及时绑定邮箱和手机立即绑定

如何将RDD拆分为两个或多个RDD?

如何将RDD拆分为两个或多个RDD?

如何将RDD拆分为两个或多个RDD?我正在寻找一种将RDD分割成两个或多个RDD的方法。我最近看到的是ScalaSPark:将集合拆分成几个RDD?仍然是一个单一的RDD。如果您熟悉SAS,如下所示:data work.split1, work.split2;     set work.preSplit;     if (condition1)         output work.split1     else if (condition2)         output work.split2 run;这就产生了两个不同的数据集。必须立即坚持才能得到我想要的结果.。
查看完整描述

3 回答

?
慕沐林林

TA贡献2016条经验 获得超9个赞

不可能从单个转换*生成多个RDDs。如果要拆分RDD,则必须应用filter对于每个分裂的条件。例如:

def even(x): return x % 2 == 0def odd(x): return not even(x)rdd = sc.parallelize(range(20))rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

如果您只有二进制条件,而且计算成本很高,您可能更喜欢这样的东西:

kv_rdd = rdd.map(lambda x: (x, odd(x)))kv_rdd.cache()rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

它只意味着一个谓词计算,但需要对所有数据进行额外的传递。

需要注意的是,只要输入rdd被正确地缓存,并且没有关于数据分布的附加假设,在重复过滤器和嵌套if-倒换循环之间的时间复杂度方面没有显著差异。

对于N个元素和M条件,您必须执行的操作数显然与N乘以M成正比。如果是for-循环,它应该更接近于(N+MN)/2,重复滤波器正好是NM,但在一天结束时,它只不过是O(NM)。你可以看到我的讨论*詹森·伦德曼读一些正反两方面的文章。

在非常高的层次上,您应该考虑两件事:

  1. 星火转换是懒惰的,直到您执行一个操作,您的rdd才会成为现实。

    这有什么关系?回到我的例子:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

    如果以后我决定我只需要rdd_odd那么就没有理由去实现rdd_even.

    如果您查看一下要计算的SAS示例work.split2您需要同时实现输入数据和work.split1.

  2. RDDs提供了一个声明性API。当你使用filtermap这完全取决于火花引擎如何执行这一操作。只要传递给转换的函数是无副作用的,它就会为优化整个管道创造多种可能性。

到头来,这起案件还不够特殊,不足以证明它本身的转变是合理的。

这个带有过滤器图案的地图实际上是在一个核心星火中使用的。见我对Sparks RDD.随机Split实际上是如何分割RDD的?和一个相关部分.的.randomSplit方法。

如果唯一的目标是实现输入的分割,则可以使用partitionBy条款DataFrameWriter哪种文本输出格式:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

*星火只有三种基本类型的转换:

  • RDD[T]=>RDD[T]
  • RDD[T]=>RDD[U]
  • (RDD[T],RDD[U])=>RDD[W]

其中T,U,W可以是原子类型,也可以是原子类型产品/元组(K,V)。任何其他操作都必须使用上述的某种组合来表示。你可以检查原始RDD论文更多细节。

** http:/chat.stackoverflow

*另见ScalaSPark:将集合拆分成几个RDD?


查看完整回答
反对 回复 2019-07-20
?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

一种方法是使用自定义分区程序根据筛选条件对数据进行分区。这可以通过扩展Partitioner并实现类似于RangePartitioner.

然后,可以使用映射分区从分区RDD构造多个RDD,而无需读取所有数据。

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext      }
    }

    override def next():Int = iter.next()
  }

请注意,筛选的RDD中的分区数将与分区RDD中的分区数相同,因此应该使用合并来减少这一点,并删除空分区。


查看完整回答
反对 回复 2019-07-20
  • 3 回答
  • 0 关注
  • 4094 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信