我已经用Scala编写一个相对简单的工作星火从S3读取一些数据,进行一些转换和聚合,最后将结果存储到存储库。
在最后阶段,我有我的域模型的RDD,我想将它们分组为元素的块,这样我可以在我的仓库做一些质量插入。
我用RDDFunctions.sliding
方法来实现这一点,它的工作几乎罚款。 这里是我的代码的简化版本:
val processedElements: RDD[DomainModel] = _
RDDFunctions.fromRDD(processedElements)
.sliding(500, 500)
.foreach { elementsChunk =>
Await.ready(repository.bulkInsert(elementsChunk), 1.minute)
}
问题是,如果比如我有1020元,只有1000元在我的仓库结束。 它看起来像滑动忽略任何额外的元件如果窗口大小是比其余元素的量较大。
有什么办法解决呢? 如果没有,是否有任何其他的方式来实现,而无需使用相同的行为RDDFunctions.sliding
?
你不能只是使用foreachPartition
和手动批量管理?
fromRDD.foreachPartition(items: Iterator[DomainModel] => {
val batch = new ArrayBuffer[DomainModel](BATCH_SIZE)
while (items.hasNext) {
if (batch.size >= BATCH_SIZE) {
bulkInsert(batch)
batch.clear()
}
batch += items.next
}
if (!batch.isEmpty) {
bulkInsert(batch)
}
})
你说得对,星火的sliding
(不像Scala的),会如果窗口大小超过其余项目的数量产生空RDD,根据RDDFunctions文档 。 也不星火有Scala的一个等价grouped
。
如果你知道你多少组创建,一个可能适用的解决办法是对RDD与分裂modulo
滤波器。 这里的分裂RDD分为5组的轻视例如:
val rdd = sc.parallelize(Seq(
(0, "text0"), (1, "text1"), (2, "text2"), (3, "text2"), (4, "text2"), (5, "text5"),
(6, "text6"), (7, "text7"), (8, "text8"), (9, "text9"), (10, "text10"), (11, "text11")
))
def g(n:Int)(x: Int): Boolean = { x % 5 == n }
val rddList = (0 to 4).map( n => rdd.filter(x => g(n)(x._1)) )
(0 to 4).foreach(n => rddList(n).collect.foreach(println))
(0,text0)
(5,text5)
(10,text10)
(1,text1)
(6,text6)
(11,text11)
(2,text2)
(7,text7)
(3,text2)
(8,text8)
(4,text2)
(9,text9)