Integrating Spark Structured Streaming with the Confluent Schema Registry
M

10

33

I'm using a Kafka Source in Spark Structured Streaming to receive Confluent encoded Avro records. I intend to use Confluent Schema Registry, but the integration with spark structured streaming seems to be impossible.

I have seen this question, but unable to get it working with the Confluent Schema Registry. Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

Myxomatosis answered 20/2, 2018 at 10:12 Comment(11)
Thanks @VinothChinnasamy but your link is about classic spark streaming, I'm talking about spark STRUCTURED streamingMyxomatosis
you need to respect kafka spark integration : spark.apache.org/docs/latest/…Cliquish
@Cliquish thank you but you misunderstand the question.Myxomatosis
Please upvote the confluence issue about it : github.com/confluentinc/schema-registry/issues/755Myxomatosis
Possible duplicate of reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)Tnt
@cricket_007 no it's not the same thingMyxomatosis
Why not? See schemaRegistryUrl in this answer? https://mcmap.net/q/452594/-reading-avro-messages-from-kafka-with-spark-2-0-2-structured-streamingTnt
That question is not about the schema registry. The answer for it was not clear, but with your edits it's much clearer, thank you!Myxomatosis
Check out this project - github.com/hortonworks-spark/spark-schema-registry This allows you to integrate Hortonwork's Schema registry (github.com/hortonworks/registry) with Spark. It may also be possible to plug this into Confluent Schema registry (since the Hortonworks Schema registry is compatible with the Confluent one), but you will need to explore it further.Hill
Check out this project - github.com/hortonworks-spark/spark-schema-registry This allows you to integrate Hortonwork's Schema registry (github.com/hortonworks/registry) with Spark. It may also be possible to plug this into Confluent Schema registry (since the Hortonworks Schema registry is compatible with the Confluent one), but you will need to explore it further.Hill
I completely agree with @SouhaibGuitouni This is an issue that should not be happening in Confluent.Immensurable
T
12

Disclaimer

This code was only tested on a local master, and has been reported runs into serializer issues in a clustered environment. There's an alternative solution (step 7-9, with Scala code in step 10) that extracts out the schema ids to columns, looks up each unique ID, and then uses schema broadcast variables, which will work better, at scale.

Also, there is an external library AbsaOSS/ABRiS that also addresses using the Registry with Spark


Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.

Here are the dependencies needed. Code tested with Confluent 5.x and Spark 2.4

     <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions> 
                <!-- Conflicts with Spark's version -->
                <exclusion> 
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
     </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

And here is the Scala implementation (only tested locally on master=local[*])

First section, define the imports, some fields, and a few helper methods to get schemas

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the CMD args to get Kafka details

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then, the important method that consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)   // Not a Kafka record "header". This is part of our value schema
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

In order for the UDF above to work, then there needed to be a deserializer to take the DataFrame of bytes to one containing deserialized Avro

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments

Tnt answered 20/2, 2018 at 10:12 Comment(8)
It's not working in standlone cluster mode..throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string)Riles
yeah @OneCricketeer. It's not working in yarn as well. Getting a null exception while deserializing line kafkaAvroDeserializer.deserialize(bytes). I try to handle with Try{}.getOrelse(), but if I do that I'm not getting data in data frame. Is it solvable ? any workaround for this ?Riles
Or any working solutions in this stackoverflow post ? for cluster modeRiles
@Riles If you get an NPE in deserialize, then the bytes are probably null. I believe the deserializer is correctly initialized in the closureTnt
Hi @OneCricketeer, why the key and value has a single quote in front of it? also, I had to add extra dependency ``` "org.apache.spark" %% "spark-sql"``` (sbt syntax), for the compiler to be happy for SparkSession etc. and then it has below error type mismatch; [error] found : Symbol [error] required: org.apache.spark.sql.Column [error] callUDF("deserialize", 'key).as("key"), // but sometimes they are avro Eustoliaeutectic
@Minnie The single quote creates a Symbol object. The way to get a column would be Column("key") or $"key", but that was more typingTnt
Hi @OneCricketeer, when run in inteliJ, it has exception Exception in thread "main" java.net.ConnectException: Connection refused (Connection refused) ... ... at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272). basically i could not connect to the SchemaRegistry. Note Kafka Confluent 5.5.1 is running. I tried this command nc -vz localhost 8081 to verify, and yes, it has error nc: connect to localhost port 8081 (tcp) failed: Connection refused. Any suggestions on this? Thanks a lot.Eustoliaeutectic
@Minnie Sound like you need to start a Schema Registry?Tnt
A
22

It took me a couple months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization. You must manually deserialize the data. In spark, create the confluent rest service object to get the schema. Convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal. Then map over the binary typed "value" column with the Confluent KafkaAvroDeSerializer. I strongly suggest getting into the source code for these classes because there is a lot going on here, so for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this. 
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()
Adhesive answered 8/3, 2018 at 20:35 Comment(7)
Can you elaborate on the comment topic name is actually unused in the source code, just required by the signature. Weird right?Gentian
It seems the signature for the deserialize method calls for a string, but it is unused in the function body. KafkaAvroDeserializer.javaAdhesive
Hi, I am trying to implement the same code. I am getting an exception at keyDeserializer.deserialize(topicName, row.key, keySchema).toString , saying keySchema is org.apache.avro.Schema where as required is Array[Byte]. Checked the source code it looks like it expects Array[Byte] github.com/confluentinc/schema-registry/blob/master/…. Something i am missing here ?Rabideau
@tstites, im not able to find io.confluent.kafka.schemaregistry.client.rest.RestService this package in any confluent repositories, can you give location of this jar or mvn repository for this package?Pacian
@Karthikeyan github.com/confluentinc/schema-registry/blob/master/client/src/… is part of io.confluent:kafka-schema-registry-client And the repo is here docs.confluent.io/current/clients/…Tnt
@VibhorNigam, getting the same row.value error as you , did you fix itGonsalez
@Gonsalez unfortunately no. i got another source with same data so moved on from this issue.Rabideau
T
12

Disclaimer

This code was only tested on a local master, and has been reported runs into serializer issues in a clustered environment. There's an alternative solution (step 7-9, with Scala code in step 10) that extracts out the schema ids to columns, looks up each unique ID, and then uses schema broadcast variables, which will work better, at scale.

Also, there is an external library AbsaOSS/ABRiS that also addresses using the Registry with Spark


Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.

Here are the dependencies needed. Code tested with Confluent 5.x and Spark 2.4

     <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions> 
                <!-- Conflicts with Spark's version -->
                <exclusion> 
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
     </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

And here is the Scala implementation (only tested locally on master=local[*])

First section, define the imports, some fields, and a few helper methods to get schemas

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

 // ... continues below

Then define a simple main method that parses the CMD args to get Kafka details

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }


  // ... still continues

Then, the important method that consumes the Kafka topic and deserializes it

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)   // Not a Kafka record "header". This is part of our value schema
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

 // still continues

The command line parser allows for passing in bootstrap servers, schema registry, topic name, and Spark master.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

  // still continues

In order for the UDF above to work, then there needed to be a deserializer to take the DataFrame of bytes to one containing deserialized Avro

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

} // end 'object App'

Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments

Tnt answered 20/2, 2018 at 10:12 Comment(8)
It's not working in standlone cluster mode..throws Failed to execute user defined function(anonfun$consumeAvro$1: (binary) => string)Riles
yeah @OneCricketeer. It's not working in yarn as well. Getting a null exception while deserializing line kafkaAvroDeserializer.deserialize(bytes). I try to handle with Try{}.getOrelse(), but if I do that I'm not getting data in data frame. Is it solvable ? any workaround for this ?Riles
Or any working solutions in this stackoverflow post ? for cluster modeRiles
@Riles If you get an NPE in deserialize, then the bytes are probably null. I believe the deserializer is correctly initialized in the closureTnt
Hi @OneCricketeer, why the key and value has a single quote in front of it? also, I had to add extra dependency ``` "org.apache.spark" %% "spark-sql"``` (sbt syntax), for the compiler to be happy for SparkSession etc. and then it has below error type mismatch; [error] found : Symbol [error] required: org.apache.spark.sql.Column [error] callUDF("deserialize", 'key).as("key"), // but sometimes they are avro Eustoliaeutectic
@Minnie The single quote creates a Symbol object. The way to get a column would be Column("key") or $"key", but that was more typingTnt
Hi @OneCricketeer, when run in inteliJ, it has exception Exception in thread "main" java.net.ConnectException: Connection refused (Connection refused) ... ... at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272). basically i could not connect to the SchemaRegistry. Note Kafka Confluent 5.5.1 is running. I tried this command nc -vz localhost 8081 to verify, and yes, it has error nc: connect to localhost port 8081 (tcp) failed: Connection refused. Any suggestions on this? Thanks a lot.Eustoliaeutectic
@Minnie Sound like you need to start a Schema Registry?Tnt
T
12

Another very simple alternative for pyspark (without full support for schema registry like schema registration, compatibility check, etc.) could be:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query
query = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic) \
    .load() \
    # The magic goes here:
    # Skip the first 5 bytes (reserved by schema registry encoding protocol)
    .selectExpr("substring(value, 6) as avro_value") \
    .select(from_avro(col("avro_value"), schema).alias("data")) \
    .select(col("data.my_field")) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()
Triceratops answered 20/1, 2021 at 21:57 Comment(5)
#The magic goes here: this worked for me. But why do we need to skip first 5 bytes.Coenurus
Hi @Venkat, this is necessary because Confluent reserves this first bytes for it's internal wire formatTriceratops
Ideally, you broadcast the schema variable in SparkTnt
Hi @Triceratops is there possibility that the number of bytes changes in the future?Idea
@Idea Unlikely. Confluent would introduce a backwards incompatible change if that happenedTnt
C
10

This is an example of my code integrating spark structured streaming with kafka and schema registry (code in scala)

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro {

  def main(args: Array[String]): Unit = {

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//     Prints Kafka schema with columns (topic, offset, partition e.t.c)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }

}

When reading from kafka topic, we have this kind of schema:

key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer |

As we can see, key and value are binary so we need to cast key as string and in this case, value is avro formatted so we can achieve this by calling from_avro function.

In adition to Spark and Kafka dependencies, we need this dependencies:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_${scala.compat.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>${confluent.version}</version>
</dependency>
Copulative answered 19/11, 2019 at 10:4 Comment(4)
Could you please explain how we can pass schema registry credentials in you program in case needed?Wigwag
I don't need to authenticate against schema registry, but i've found this information: (docs.confluent.io/current/schema-registry/security/index.html), and in this link you can configure Schema Registry authorization for communicating with the RBAC Kafka cluster. (docs.confluent.io/current/schema-registry/security/…)Copulative
To pass schema registry credentials, see this answer: https://mcmap.net/q/452595/-how-pass-basic-authentication-to-confluent-schema-registryPlexor
Will this work in standalone cluster or yarn mode ?Riles
D
8

This library will do the job for you. It connects to Confluent Schema Registry through Spark Structured Stream.

For Confluent, it copes with the schema id that is sent along with the payload.

In the README you will find a code snippet of how to do it.

DISCLOSURE: I work for ABSA and I developed this library.

Dissociable answered 22/5, 2018 at 17:34 Comment(6)
description in this lib seems not correct for example in decripton there is 2.0.0 version but in maven i saw only 1.0.0Convention
also i can not build the project. i have an error: [ERROR] E:\projects\dvsts\ABRiS\src\test\scala\za\co\absa\abris\avro\read\confluent\ScalaConfluentKafkaAvroDeserializerSpec.scala:113: error: class MockedSchemaRegistryClient needs to be abstract, since: [ERROR] it has 8 unimplemented members.Convention
@Mikhail, the new version was updated yesterday, and probably when you checked Maven Central it had not yet been synchronized. You can find it here: mvnrepository.com/artifact/za.co.absa/abris/2.0.0Dissociable
Would be nice to see an example usage here on this answerTnt
@cricket_007, does this library work with spark Java api, as I cannot able to get fromavro method after all the imports. could you please ?Contemplate
@Vignesh I have not used it. But yes, any Scala library can be imported into Java code because it's all still running in the same JVM.Tnt
M
6

Databricks now provide this functionality but you have to pay for it :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

See: https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more info

A good free alternative is ABRIS. See: https://github.com/AbsaOSS/ABRiS the only downside we can see that you need to provide a file of your avro schema at runtime so the framework can enforce this schema on your dataframe before it publishes it to the Kafka topic.

Mohamedmohammad answered 16/4, 2019 at 15:58 Comment(3)
Only Databricks supports the registry, not Apache Spark itselfTnt
Does Databricks support Schema Registry of Confluent? Or another type of schema registry. In case you can use Databricks, someone knows how to pass the schema registry credentials.I say this because the examples I find do not comment on it.Doubleton
@Doubleton Yes. Databricks partners with Confluent to support that Avro + Schema Registry functionalityTnt
B
5

Based on @cricket_007's answers I created the following solution which could run in our cluster environment, including the following new features:

  • You need add broadcast variables to transfer some values into map operations for cluster environment. Neither Schema.Parser nor KafkaAvroDeserializer could be serialized in spark, so it is why you need initialize them in map operations
  • My structured streaming used foreachBatch output sink.
  • I applied org.apache.spark.sql.avro.SchemaConverters to convert avro schema format to spark StructType, so that you could use it in from_json column function to parse dataframe in Kafka topic fields (key and value).

Firstly, you need load some packages:

SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}"    ## format("kafka")
  "org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}"    ## SchemaConverters
  "io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}"   ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

./bin/spark-shell --packages ${"${jars[*]}"// /,}

Here are the whole codes I tested in spark-shell:

import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic" 
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)

val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType


val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {

    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    
    val rstDF = batchDF.map {
      row =>
      
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys =  Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform{ (k,v) => 
            val des = new KafkaAvroDeserializer
            des.configure(props.asJava, v)
            des 
        }
        //-- For key only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        

        val inParser = new Schema.Parser
        //-- For both key and value
        val values = bcSchemaStrings.value.transform( (k,v) => 
            deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
        s"""{"key": ${values("key")}, "value": ${values("value")} }"""
        //-- For key only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString  
      }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)    

  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()
Boone answered 8/12, 2019 at 0:54 Comment(4)
for some reason the broadcast strings aren't working inside the map. Why?Lemmon
I'm not sure you need to broadcast for each batch, also topic name isn't used by the deserialize, I believeTnt
Hi timothyzhang, you did not need the UDF like @Tnt did?Eustoliaeutectic
Hi @timothyzhang, did you expereience this issue in your version test? #63846892Eustoliaeutectic
N
3

Summarizing some of answer above and adding some of my own experience, those are the options at the time of writing:

  1. 3rd party Abris library. This is what we used initially, but it doesn't seem to support a permissive mode where you can drop malformed packages. It will crash the stream when it encounters a malformed message. If you can guarantee the message validity, that is okay, but it was an issue for us as it kept trying to parse the malformed message after stream restart.
  2. Custom UDF which parses the Avro data, as outlined by OneCricketeer's answer. Gives the most flexibility but also requires the most custom code.
  3. Using Databrick's from_avro variant which allows you to simply pass the URL, and it will find the right schema and parse it for you. Works really well, but only available in their environment, thus hard to test in a codebase.
  4. Using Spark's built-in from_avro function. This functions allows you to pass a JSON schema and parse it from there. Only fix that you have to apply is that in Confluent's Wire format, there is one magic byte and 4 schema bytes before the actual Avro binary data starts as also pointed out in dudssource's answer. You can parse it like this in Scala:
val restService             = new RestService(espConfig.schemaRegistryUrl)
val valueRestResponseSchema = restService.getVersion(espConfig.fullTopicName + "-value", schemaVersion)
valueRestResponseSchema.getSchema

streamDf
  .withColumn("binary_data", substring(6, Int.MaxValue))
  .withColumn("parsed_data", from_avr('binary_data, jsonSchema, Map("MODE" -> "PERMISSIVE")))

Nucleus answered 25/5, 2022 at 7:40 Comment(0)
B
2

For anyone that want's to do this in pyspark: The library that felipe referenced worked nicely on the JVM for me before, so i wrote a small wrapper function that integrates it in python. This looks very hacky, because a lot of types that are implicit in the scala language have to be specified explicitly in py4j. Has been working nicely so far, though, even in spark 2.4.1.

def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

For that to work, you have to add the library to the spark packages, e.g.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
    'org.apache.spark:spark-avro_2.11:2.4.1,' \
    'za.co.absa:abris_2.11:2.2.2 ' \
    '--repositories https://packages.confluent.io/maven/ ' \
    'pyspark-shell'
Bulimia answered 21/4, 2019 at 21:24 Comment(1)
How can we use this function in spark structured streaming , i am having spark 2.3.2 no from_avro and to_avro function availableKarlik
J
0

The problem is that if you use io.confluent.kafka.serializers.KafkaAvroSerializer when producing messages then the message bytes is not avro but [magic_byte schema_id (integer) avro_bytes] so from_avro does not work

You can see this here

https://github.com/confluentinc/schema-registry/blob/3e7eca9e0ce07c9167c301ccc7c1a2e8248c26a7/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java#LL135C17-L135C27

Dropping the magic byte and schema id like this works

select(from_avro(expr("substring(value, 6)"), schemaJson))

But its very inelegant

Jerald answered 22/5, 2023 at 12:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.