How to pass Basic Authentication to Confluent Schema Registry?

I want to read data from a Confluent Cloud topic and then write it to another topic.

On localhost I haven't had any major problems, but the Confluent Cloud Schema Registry requires some authentication data, and I don't know how to pass it:

basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=:
schema.registry.url=https://xxxxxxxxxx.confluent.cloud
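With `basic.auth.credentials.source=USER_INFO`, the Schema Registry client takes the `key:secret` value from the user-info property and sends it as a standard HTTP Basic `Authorization` header; a request without that header is what the registry rejects with 401. The JDK-only sketch below shows what the header value looks like for a hypothetical key `key` and secret `secret`. It illustrates the HTTP mechanism and is not the Confluent client's own code.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuthHeader {
  // HTTP Basic auth (RFC 7617): "Basic " + Base64("user:password").
  // For Confluent Cloud, user = Schema Registry API key, password = API secret.
  def header(userInfo: String): String =
    "Basic " + Base64.getEncoder.encodeToString(userInfo.getBytes(StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit =
    // "key:secret" is a hypothetical placeholder, not a real credential
    println(header("key:secret")) // prints Basic a2V5OnNlY3JldA==
}
```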

Below is the current code:

import com.databricks.spark.avro.SchemaConverters
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.SparkSession

object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "localhost:9092" // bootstrap servers are host:port, without an http:// scheme
  private val schemaRegistryUrl = "http://localhost:8081"

  // No credentials are passed here: a local registry accepts this, Confluent Cloud answers 401
  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  // Calls the registry over HTTP when the object is initialized, before main runs
  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .load()

    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

    import org.apache.spark.sql.functions._

    val formattedDataFrame = valueDataFrame.select(
      from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

  object DeserializerWrapper {
    val deserializer = kafkaAvroDeserializer
  }

  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
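    // Auxiliary constructor: injects the registry client into the inherited protected field
    def this(client: SchemaRegistryClient) = {
      this()
      this.schemaRegistry = client
    }

    // Decodes Confluent-framed Avro bytes (looking up the schema in the registry)
    // and returns the record as a JSON-like string
    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }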
  }

}

I think I have to pass this authentication data to CachedSchemaRegistryClient, but I'm not sure whether that's right or how to do it.

Lexine, 13/11/2019 at 10:59. Comments (4):
This is the actual error:
[error] (run-main-0) io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
[error] io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
(Lexine)
Where in the code do you do the bytes-to-Avro conversion and access the schema registry? (Mullet)
The conversion is performed by the UDF deserialize. For access to the schema registry, I am now trying the following:

val schemaRegistryURL = "url"
val restService = new RestService(schemaRegistryURL)
val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "SECRET",
  "schema.registry.url" -> schemaRegistryURL
)
val javaProps = props.asJava
val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, javaProps)

(Lexine)
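On the question of where the registry is accessed: besides the `getLatestSchemaMetadata` call at startup, the deserializer behind the `deserialize` UDF contacts the registry while decoding. Each value written by Confluent's Avro serializer starts with a magic byte `0` and a 4-byte big-endian schema ID; the deserializer uses that ID to fetch the writer schema from the registry (and caches it), so those lookups need the credentials as well. The JDK-only sketch below parses just that header; `ConfluentWireFormat` and `Framed` are hypothetical names, and the actual Avro decoding is left to the Confluent deserializer.

```scala
import java.nio.ByteBuffer

object ConfluentWireFormat {
  // Schema ID plus the remaining Avro-encoded bytes of one message value
  final case class Framed(schemaId: Int, payload: Array[Byte])

  def parse(bytes: Array[Byte]): Framed = {
    require(bytes.length >= 5, "message too short for the Confluent header")
    val buf = ByteBuffer.wrap(bytes) // ByteBuffer reads big-endian by default
    val magic = buf.get()
    require(magic == 0, s"unknown magic byte: $magic")
    val schemaId = buf.getInt() // the ID the deserializer looks up in the registry
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    Framed(schemaId, payload)
  }
}
```

Because the same `schemaRegistry` field is used for these lookups, an authenticated `CachedSchemaRegistryClient` passed to `AvroDeserializer` covers both the startup call and the per-message lookups.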
Can you edit your question and add the code, to keep everything in one place? I'd like to see the UDF that deserializes Avro (which, as far as I understand, uses the registry). (Mullet)

I've finally been able to pass the properties.

Here are the lines that solved it:

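import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import scala.collection.JavaConverters._

val schemaRegistryURL = "https://xxxxxxxxxx.confluent.cloud" // placeholder URL from the question
val restService = new RestService(schemaRegistryURL)

// Basic auth: take the credentials from the user-info property ("key:secret")
val props = Map(
  "basic.auth.credentials.source" -> "USER_INFO",
  "schema.registry.basic.auth.user.info" -> "secret:secret"
).asJava

val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)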
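A small variation on the answer, as a sketch: a hypothetical helper `SchemaRegistryAuth.authConfig` that builds the same configuration map from a key and a secret instead of a hard-coded string. Confluent's documentation also shows the unprefixed key `basic.auth.user.info`; setting both keys is an assumption meant to cover whichever one the client version in use reads, not something the answer required.

```scala
import scala.collection.JavaConverters._

object SchemaRegistryAuth {
  // Hypothetical helper: builds the config map for
  // new CachedSchemaRegistryClient(restService, 100, configs)
  def authConfig(apiKey: String, apiSecret: String): java.util.Map[String, String] = {
    val userInfo = s"$apiKey:$apiSecret" // Basic auth "user:password" pair
    Map(
      "basic.auth.credentials.source" -> "USER_INFO",
      // key used in the answer's code
      "schema.registry.basic.auth.user.info" -> userInfo,
      // unprefixed key (assumption: covers client versions that read this one)
      "basic.auth.user.info" -> userInfo
    ).asJava
  }
}
```

For example, with the key and secret in environment variables (the names `SR_API_KEY` and `SR_API_SECRET` are placeholders): `new CachedSchemaRegistryClient(restService, 100, SchemaRegistryAuth.authConfig(sys.env("SR_API_KEY"), sys.env("SR_API_SECRET")))`. Keeping the secret out of the source also makes it easier to rotate.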

Lexine answered 19/11/2019 at 9:06
