如何拆分RDD成两个或更多RDDS?如何拆分RDD成两个或更多RDDS?(How do I spli

2019-05-10 17:30发布

我正在寻找一种方式来放射性散布分裂成两个或更多的RDDS。 我见过的最接近的是斯卡拉星火:斯普利特收集到几个RDD? 这仍然是一个单一的RDD。

如果你熟悉SAS,是这样的:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

这导致两个不同的数据集。 它必须立即坚持让我准备的结果...

Answer 1:

这是不可能从单一转化产生多个RDDS *。 如果要拆分RDD你必须申请一个filter为每个分割状态。 例如:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

如果你只有一个二进制状态和计算是昂贵的,你可能更喜欢这样的事情:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

这意味着只有一个谓语的计算,但需要对所有数据的附加通。

需要注意的是,只要输入RDD正确缓存,并且有关于数据分布没有额外的假设没有显著差异,当谈到时间复杂度重复过滤器之间和循环嵌套的if-else是很重要的。

与必须执行的操作的N个元素和M的条件数目显然是正比于N次M.在for循环的情况下,它应该是接近(N + MN)/ 2和反复过滤器是完全NM,但在结束时一天它不是别的,只是O(NM)。 你可以看到我的讨论**与杰森Lenderman阅读有关的一些优点和利弊。

在非常高的水平,你应该考虑两件事情:

  1. 星火转换是懒惰的,那么只有在执行一个动作你RDD不兑现

    为什么这有关系? 让我们回到我的例子:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

    如果后来我决定,我只需要rdd_odd那么就没有理由兑现rdd_even

    如果你看看你的SAS的例子来计算work.split2你需要兑现输入数据和work.split1

  2. RDDS提供一个声明API。 当您使用filtermap它完全取决于如何执行此操作星火引擎。 只要传递给转换的功能无副作用它创建多种可能性,以优化整个管道。

在一天结束的时候这种情况下是不够的特殊证明其自身的转型。

此地图器图案在核心火花实际使用。 见我的回答是怎样火花RDD.randomSplit实际上分裂RDD和相关部分的的randomSplit方法。

如果唯一的目标就是实现对输入的分割,可以使用partitionBy子句DataFrameWriter其文本输出格式:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

*只有3种基本类型中的星火的转换:

  • EET [T] => EET [T]
  • EET [T] => EET [U]
  • (EET [T] EET [U])=> EET [W]

其中T,U,W可以是原子类型或产品 /元组(K,V)。 任何其他操作必须使用上述的一些组合来表示。 您可以查看原RDD文件的更多细节。

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

***参见斯卡拉星火:斯普利特收集到几个RDD?



Answer 2:

正如其他海报上面提到的,没有一个统一的,原生RDD变换分割RDDS,但这里有一些“复”的操作,可以有效地模拟各种RDDS上的“分裂”, 而不读多次:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

一些具体的随机拆分方法:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

方法是从开源项目硅石:

https://github.com/willb/silex

一篇博客文章中解释它们是如何工作:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}

至于其他地方提到的,这些方法涉及的内存速度的权衡,因为他们通过计算整个分区结果“急切”,而不是运营“懒洋洋地”。 因此,有可能这些方法碰上在大分区上的内存问题,其中较为传统的懒变换不会。



Answer 3:

如果您使用拆分RDD randomSplit API调用 ,你回来RDDS的数组。

如果你想5个RDDS返回,通过5个权重值。

val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)


Answer 4:

一种方法是使用自定义分区划分取决于你的过滤条件的数据。 这可以通过扩展实现Partitioner和实施类似的东西RangePartitioner

然后,地图分区可以被用于构建从分区RDD多个RDDS而不读取的所有数据。

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next():Int = iter.next()
  }

要知道,在过滤RDDS分区的数量将是一样的所以COALESCE应该用来降低下来并取出空分区的分区RDD数量。



文章来源: How do I split an RDD into two or more RDDs?