Integrating Spark Structured Streaming with the Co

2019-01-23 19:33发布

问题:

I'm using a Kafka Source in Spark Structured Streaming to receive Confluent encoded Avro records. I intend to use Confluent Schema Registry, but the integration with spark structured streaming seems to be impossible.

I have seen this question, but unable to get it working with the Confluent Schema Registry. Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

回答1:

It took me a couple months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization. You must manually deserialize the data. In spark, create the confluent rest service object to get the schema. Convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal. Then map over the binary typed "value" column with the Confluent KafkaAvroDeSerializer. I strongly suggest getting into the source code for these classes because there is a lot going on here, so for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this. 
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueJsonString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(DeserializedKeyString, DeserializedValueJsonString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()


回答2:

This library will do the job for you. It connects to Confluent Kafka and Schema Registry through Spark Structured Stream.

For Confluent, it copes with the schema id that is sent along with the payload.

In the README you will find a code snippet of how to do it.

DISCLOSURE: I work for ABSA and I developed this library.