星火流 - 阅读和卡夫卡的话题写星火流 - 阅读和卡夫卡的话题写(Spark Streaming -

2019-05-13 06:05发布

我使用的Spark流来处理两个卡夫卡队列之间的数据,但我似乎无法找到从星火卡夫卡写的好办法。 我曾经尝试这样做:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

和它的作品如预期,但实例化一个新的KafkaProducer每一个消息显然是不可行的现实情况下,我试图解决它。

我想继续为每个进程一个实例的引用,并访问它时,我需要发送消息。 我为什么能写入卡夫卡从星火流?

Answer 1:

我的第一个建议是尝试在foreachPartition创建一个新的实例,并衡量,如果这是速度不够快为您的需求(在foreachPartition实例重物的官方文档建议是什么)。

另一种选择是使用一个对象池如本例所示:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

然而,我发现很难使用检查点时实现。

另一个版本是为我工作好是在下面的博客文章中描述的工厂,你只需要检查它是否适合您的需要提供足够的并行(查看评论部分):

http://allegro.tech/2015/08/spark-kafka-integration.html



Answer 2:

是的,不幸的是星火(1.x中,2.X)不让它直着如何写卡夫卡有效的方式。

我建议以下方法:

  • 使用(和再利用)一个KafkaProducer每执行过程/ JVM实例。

这里的高级别设置为这种方式:

  1. 首先,你必须“包装”卡夫卡的KafkaProducer因为,正如你提到的,它不是序列化。 结束语让你“船”,它的执行者。 这里的关键思想是使用一个lazy val使你延迟实例化生产,直到其第一次使用,这是一个有效的解决办法,这样你就不用担心KafkaProducer不被序列化。
  2. 你“船”的制片人包裹向每个执行者利用广播变量。
  3. 在您的实际处理逻辑,您可以通过广播变量访问包裹制片人,并用它来写处理结果反馈给卡夫卡。

下面星火流工作的代码片段星火2.0。

步骤1:包装KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

第2步:使用广播变量给每个执行自己的包裹KafkaProducer实例

import org.apache.kafka.clients.producer.ProducerConfig

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

步骤3:从火花流卡夫卡写入,重新使用相同的包裹KafkaProducer实例(每个执行器)

import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

希望这可以帮助。



Answer 3:

存在通过Cloudera的维护的流卡夫卡作家(实际上是从一个火花JIRA分拆[1] )。 它基本上创建每个分区制片人,其摊销所花的时间来创造超过元素(希望大)集合“重”的对象。

作家可以在这里找到: https://github.com/cloudera/spark-kafka-writer



Answer 4:

我也有同样的问题,并发现这个职位 。

笔者通过建立每执行1名制片人解决了这个问题。 而不是发送生产者本身的,他只发送一个“处方”如何通过广播它来创建一个执行制片人。

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

他使用了懒洋洋地创建生产包装:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

因为卡夫卡制片人只是一个执行者第一次使用前初始化该包装是可序列化。 司机保持参照包装和包装发送使用每个执行人的制作中的消息:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }


Answer 5:

与火花> = 2.2

读取和写入操作都可能在卡夫卡使用结构化流API

从卡夫卡的主题构建流

// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

阅读键和值,并应用模式两种,为了简单起见,我们正在他们都转换为String类型。

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

由于dsStruc有模式,它接受像所有的SQL样操作filteraggselect ..等就可以了。

写流卡夫卡话题

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

更多卡夫卡整合配置来读取或写入

关键工件在应用程序中添加

 "org.apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,


Answer 6:

为什么不可行? 从根本上说每个RDD的每个分区都将独立运行(并且可以在不同的群集节点上运行良好),所以你必须要重做每个分区的任务开始的连接(和任何同步)。 如果该开销太高,那么你应该增加你的批量大小StreamingContext ,直到它变得可以接受(OBV。有一个延迟的成本这样做)。

(如果你不处理数千封邮件的每个分区,你确定你需要的火花流呢?你会做一个独立的应用程序更好?)



Answer 7:

这可能是你想要做什么。 基本上,你创建的记录,每个分区一个生产者。

input.foreachRDD(rdd =>
      rdd.foreachPartition(
          partitionOfRecords =>
            {
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String,String](props)

                partitionOfRecords.foreach
                {
                    case x:String=>{
                        println(x)

                        val message=new ProducerRecord[String, String]("output",null,x)
                        producer.send(message)
                    }
                }
          })
) 

希望帮助



文章来源: Spark Streaming - read and write on Kafka topic