避免写文件,用于星火流空分区(Avoid write files for empty partiti

2019-10-29 23:49发布

我有星火流工作,这从读卡夫卡分区上的数据( 每个分区一个执行 )。
我需要保存转换值,HDFS,但需要避免空文件创建。
我试图用的isEmpty,但是当不是所有的分区都是空的,这并不帮助。

PS重新分区不是由于PERFOMANCE降解可接受的解决方案。

Answer 1:

代码工作只PairRDD。

代码文本:

  val conf = ssc.sparkContext.hadoopConfiguration
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[TextOutputFormat[Text, NullWritable]]
    classOf[OutputFormat[Text, NullWritable]])

  kafkaRdd.map(_.value -> NullWritable.get)
    .saveAsNewAPIHadoopFile(basePath,
      classOf[Text],
      classOf[NullWritable],
      classOf[LazyOutputFormat[Text, NullWritable]],
      conf)

代码Avro的:

  val avro: RDD[(AvroKey[MyEvent], NullWritable)]) = ....
  val conf = ssc.sparkContext.hadoopConfiguration

  conf.set("avro.schema.output.key", MyEvent.SCHEMA$.toString)
  conf.setClass("mapreduce.output.lazyoutputformat.outputformat",
    classOf[AvroKeyOutputFormat[MyEvent]],
    classOf[OutputFormat[AvroKey[MyEvent], NullWritable]])

  avro.saveAsNewAPIHadoopFile(basePath,
    classOf[AvroKey[MyEvent]],
    classOf[NullWritable],
    classOf[LazyOutputFormat[AvroKey[MyEvent], NullWritable]],
    conf)



文章来源: Avoid write files for empty partitions in Spark Streaming