在我的代码,我第一次订阅卡夫卡流处理每个RDD创建我的类的实例People
,然后,我想结果集(发布Dataset[People]
)特定主题卡夫卡。 需要注意的是不能从卡夫卡收到每个传入的消息映射到的实例是很重要的People
。 此外,人的实例应该完全按照从卡夫卡收到了同样的顺序发送到卡夫卡。
不过,我不知道,如果排序是非常必要的,或者的情况下, People
在各自的代码在运行执行人保持相同的顺序(我可以直接我的数据集发布到卡夫卡)。 据我明白,排序是必需的,因为内的代码foreachRDD
可以在集群中的不同节点上执行。 这个对吗?
这里是我的代码:
val myStream = KafkaUtils.createDirectStream[K, V](streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))
def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
case (rdd, time) if !rdd.isEmpty =>
// More Code...
// In the end, I have: Dataset[People]
case _ =>
}
myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?