我正在寻找一种方式来放射性散布分裂成两个或更多的RDDS。 我见过的最接近的是斯卡拉星火:斯普利特收集到几个RDD? 这仍然是一个单一的RDD。
如果你熟悉SAS,是这样的:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
这导致两个不同的数据集。 它必须立即坚持让我准备的结果...
这是不可能从单一转化产生多个RDDS *。 如果要拆分RDD你必须申请一个filter
为每个分割状态。 例如:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果你只有一个二进制状态和计算是昂贵的,你可能更喜欢这样的事情:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓语的计算,但需要对所有数据的附加通。
需要注意的是,只要输入RDD正确缓存,并且有关于数据分布没有额外的假设没有显著差异,当谈到时间复杂度重复过滤器之间和循环嵌套的if-else是很重要的。
与必须执行的操作的N个元素和M的条件数目显然是正比于N次M.在for循环的情况下,它应该是接近(N + MN)/ 2和反复过滤器是完全NM,但在结束时一天它不是别的,只是O(NM)。 你可以看到我的讨论**与杰森Lenderman阅读有关的一些优点和利弊。
在非常高的水平,你应该考虑两件事情:
星火转换是懒惰的,那么只有在执行一个动作你RDD不兑现
为什么这有关系? 让我们回到我的例子:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果后来我决定,我只需要rdd_odd
那么就没有理由兑现rdd_even
。
如果你看看你的SAS的例子来计算work.split2
你需要兑现输入数据和work.split1
。
RDDS提供一个声明API。 当您使用filter
或map
它完全取决于如何执行此操作星火引擎。 只要传递给转换的功能无副作用它创建多种可能性,以优化整个管道。
在一天结束的时候这种情况下是不够的特殊证明其自身的转型。
此地图器图案在核心火花实际使用。 见我的回答是怎样火花RDD.randomSplit实际上分裂RDD和相关部分的的randomSplit
方法。
如果唯一的目标就是实现对输入的分割,可以使用partitionBy
子句DataFrameWriter
其文本输出格式:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
*只有3种基本类型中的星火的转换:
- EET [T] => EET [T]
- EET [T] => EET [U]
- (EET [T] EET [U])=> EET [W]
其中T,U,W可以是原子类型或产品 /元组(K,V)。 任何其他操作必须使用上述的一些组合来表示。 您可以查看原RDD文件的更多细节。
** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
***参见斯卡拉星火:斯普利特收集到几个RDD?
正如其他海报上面提到的,没有一个统一的,原生RDD变换分割RDDS,但这里有一些“复”的操作,可以有效地模拟各种RDDS上的“分裂”, 而不读多次:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
一些具体的随机拆分方法:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
方法是从开源项目硅石:
https://github.com/willb/silex
一篇博客文章中解释它们是如何工作:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
至于其他地方提到的,这些方法涉及的内存速度的权衡,因为他们通过计算整个分区结果“急切”,而不是运营“懒洋洋地”。 因此,有可能这些方法碰上在大分区上的内存问题,其中较为传统的懒变换不会。
如果您使用拆分RDD randomSplit API调用 ,你回来RDDS的数组。
如果你想5个RDDS返回,通过5个权重值。
如
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)
splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
一种方法是使用自定义分区划分取决于你的过滤条件的数据。 这可以通过扩展实现Partitioner
和实施类似的东西RangePartitioner
。
然后,地图分区可以被用于构建从分区RDD多个RDDS而不读取的所有数据。
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext
}
}
override def next():Int = iter.next()
}
要知道,在过滤RDDS分区的数量将是一样的所以COALESCE应该用来降低下来并取出空分区的分区RDD数量。