How to ensure consistent Avro schema generation and avoid the 'Too many schema objects created for x' exception?

I am experiencing a reproducible error while producing Avro messages with reactive-kafka and avro4s. Once the identityMapCapacity of the client (CachedSchemaRegistryClient) is reached, serialization fails with:

java.lang.IllegalStateException: Too many schema objects created for <myTopic>-value

This is unexpected, since all messages should have the same schema - they are serializations of the same case class.
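For illustration, the message type is a plain case class along these lines (the field list here is assumed; only id is referenced in the code below):

case class Edge(id: String, from: String, to: String)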

val avroProducerSettings: ProducerSettings[String, GenericRecord] =
  ProducerSettings(system, Serdes.String().serializer(), avroSerde.serializer())
    .withBootstrapServers(settings.bootstrapServer)

val avroProdFlow: Flow[ProducerMessage.Message[String, GenericRecord, String],
                    ProducerMessage.Result[String, GenericRecord, String],
                    NotUsed] = Producer.flow(avroProducerSettings)

val avroQueue: SourceQueueWithComplete[Message[String, GenericRecord, String]] =
  Source.queue(bufferSize, overflowStrategy)
    .via(avroProdFlow)
    .map(logResult)
    .to(Sink.ignore)
    .run()

...
avroQueue.offer(msg)

The serializer is a KafkaAvroSerializer, instantiated with a new CachedSchemaRegistryClient(settings.schemaRegistry, 1000).
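For reference, the wiring looks like this (a sketch of my setup; .asJava comes from scala.collection.JavaConverters._):

import scala.collection.JavaConverters._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer

// The client allows up to 1000 cached schema objects per subject before failing.
val client = new CachedSchemaRegistryClient(settings.schemaRegistry, 1000)
val props  = Map("schema.registry.url" -> settings.schemaRegistry)
val avroSerializer = new KafkaAvroSerializer(client, props.asJava)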

Generating the GenericRecord:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

val makeEdgeMessage: (Edge, String) => Message[String, GenericRecord, String] = { (edge, topic) =>
  val edgeAvro: GenericRecord = toAvro(edge)
  val record   = new ProducerRecord[String, GenericRecord](topic, edge.id, edgeAvro)
  ProducerMessage.Message(record, edge.id)
}

The schema is created deep inside the Confluent code (io.confluent.kafka.serializers.AbstractKafkaAvroSerDe#getSchema, invoked by io.confluent.kafka.serializers.AbstractKafkaAvroSerializer#serializeImpl), where I have no influence on it, so I see no way to fix the leak from my side. It looks to me as if the two projects do not work well together.
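As far as I can tell, the stock client keeps its per-subject caches in an IdentityHashMap, so the capacity check counts Schema instances by reference rather than by value. A minimal illustration of the difference (the schema JSON here is made up):

import org.apache.avro.Schema

val json = """{"type":"record","name":"Edge","fields":[{"name":"id","type":"string"}]}"""
val s1 = new Schema.Parser().parse(json)
val s2 = new Schema.Parser().parse(json)
s1 == s2 // true: Avro schemas compare structurally
s1 eq s2 // false: distinct objects, so an identity-keyed map stores two entries

Every serialization that arrives with a fresh Schema instance therefore consumes one slot of identityMapCapacity, even though the schema itself never changes.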

The issues I have found here, here and here do not seem to address my use case.

My two current workarounds are:

  • not using the schema registry - obviously not a long-term solution
  • writing a custom SchemaRegistryClient that does not rely on object identity - doable, but I would like to avoid creating more problems than the reimplementation solves

Is there a way to generate or cache a consistent schema depending on message/record type and use it with my setup?

Klausenburg answered 30/8/2017 at 15:07

Edit 2017.11.20

The issue in my case was that each GenericRecord carrying my message was serialized by a different instance of RecordFormat, each holding a different instance of the Schema. The implicit resolution here generated a new instance every time:

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord = recordFormat.to(a)

The solution was to pin the RecordFormat instance to a val and reuse it explicitly. Many thanks to https://github.com/heliocentrist for explaining the details.
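A minimal sketch of the fix with my Edge type:

import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord

// Materialize the format once; it holds a single Schema instance
// that is reused for every record.
implicit val edgeFormat: RecordFormat[Edge] = RecordFormat[Edge]

def toAvro[A](a: A)(implicit recordFormat: RecordFormat[A]): GenericRecord =
  recordFormat.to(a)

Every call to toAvro(edge) now resolves the pinned edgeFormat instead of deriving a fresh instance, so the registry client sees one Schema object per topic.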

Original response:

After waiting for a while (the GitHub issue also got no answer), I had to implement my own SchemaRegistryClient. Over 90% is copied from the original CachedSchemaRegistryClient, just translated into Scala. Using a Scala mutable.Map, which compares keys by equality instead of object identity, fixed the memory leak. I have not performed any comprehensive tests, so use at your own risk.

import java.util

import io.confluent.kafka.schemaregistry.client.rest.entities.{ Config, SchemaString }
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest
import io.confluent.kafka.schemaregistry.client.rest.{ RestService, entities }
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
import org.apache.avro.Schema

import scala.collection.mutable

class CachingSchemaRegistryClient(val restService: RestService, val identityMapCapacity: Int)
    extends SchemaRegistryClient {

  // Per-subject caches. Unlike the IdentityHashMap in the original client,
  // a Scala mutable.Map keys schemas by structural equality (Schema.equals),
  // so equal-but-distinct Schema instances share one entry.
  val schemaCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()
  // The null subject collects schemas fetched by id alone (see getByID).
  val idCache: mutable.Map[String, mutable.Map[Integer, Schema]] =
    mutable.Map(null.asInstanceOf[String] -> mutable.Map())
  val versionCache: mutable.Map[String, mutable.Map[Schema, Integer]] = mutable.Map()

  def this(baseUrl: String, identityMapCapacity: Int) {
    this(new RestService(baseUrl), identityMapCapacity)
  }

  def this(baseUrls: util.List[String], identityMapCapacity: Int) {
    this(new RestService(baseUrls), identityMapCapacity)
  }

  def registerAndGetId(subject: String, schema: Schema): Int =
    restService.registerSchema(schema.toString, subject)

  def getSchemaByIdFromRegistry(id: Int): Schema = {
    val restSchema: SchemaString = restService.getId(id)
    (new Schema.Parser).parse(restSchema.getSchemaString)
  }

  def getVersionFromRegistry(subject: String, schema: Schema): Int = {
    val response: entities.Schema = restService.lookUpSubjectVersion(schema.toString, subject)
    response.getVersion.intValue
  }

  override def getVersion(subject: String, schema: Schema): Int = synchronized {
    val schemaVersionMap: mutable.Map[Schema, Integer] =
      versionCache.getOrElseUpdate(subject, mutable.Map())

    val version: Integer = schemaVersionMap.getOrElse(
      schema, {
        if (schemaVersionMap.size >= identityMapCapacity) {
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        }

        val version = new Integer(getVersionFromRegistry(subject, schema))
        schemaVersionMap.put(schema, version)
        version
      }
    )
    version.intValue()
  }

  override def getAllSubjects: util.List[String] = restService.getAllSubjects()

  override def getByID(id: Int): Schema = synchronized { getBySubjectAndID(null, id) }

  override def getBySubjectAndID(subject: String, id: Int): Schema = synchronized {
    val idSchemaMap: mutable.Map[Integer, Schema] = idCache.getOrElseUpdate(subject, mutable.Map())
    idSchemaMap.getOrElseUpdate(id, getSchemaByIdFromRegistry(id))
  }

  override def getSchemaMetadata(subject: String, version: Int): SchemaMetadata = {
    val response = restService.getVersion(subject, version)
    val id       = response.getId.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def getLatestSchemaMetadata(subject: String): SchemaMetadata = synchronized {
    val response = restService.getLatestVersion(subject)
    val id       = response.getId.intValue
    val version  = response.getVersion.intValue
    val schema   = response.getSchema
    new SchemaMetadata(id, version, schema)
  }

  override def updateCompatibility(subject: String, compatibility: String): String = {
    val response: ConfigUpdateRequest = restService.updateCompatibility(compatibility, subject)
    response.getCompatibilityLevel
  }

  override def getCompatibility(subject: String): String = {
    val response: Config = restService.getConfig(subject)
    response.getCompatibilityLevel
  }

  override def testCompatibility(subject: String, schema: Schema): Boolean =
    restService.testCompatibility(schema.toString(), subject, "latest")

  override def register(subject: String, schema: Schema): Int = synchronized {
    val schemaIdMap: mutable.Map[Schema, Integer] =
      schemaCache.getOrElseUpdate(subject, mutable.Map())

    val id = schemaIdMap.getOrElse(
      schema, {
        if (schemaIdMap.size >= identityMapCapacity)
          throw new IllegalStateException(s"Too many schema objects created for $subject!")
        val id: Integer = new Integer(registerAndGetId(subject, schema))
        schemaIdMap.put(schema, id)
        idCache(null).put(id, schema)
        id
      }
    )
    id.intValue()
  }
}
Klausenburg answered 7/9/2017 at 9:32

Comments:
Scenography: Nice that you managed to solve it, but implementing your own SchemaRegistryClient should definitely not be the solution.
Klausenburg: That is exactly why this is an open question at SO ;) I will gladly accept better solutions.
Scenography: Totally agree with you. I'm having exactly the same issue, and I'm willing to try your solution. How did you plug your class into Kafka? Did you just change the property properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, <your class>)?
Scenography: Or how do you actually use it? Sorry to bother you with questions, just facing the same issue here. :)
Klausenburg: NP, I am working with it in two ways. The first hands the custom client straight to the serializer:

val props = Map("schema.registry.url" -> "mock") // irrelevant for the schema cache issue
val client: CachingSchemaRegistryClient =
  new CachingSchemaRegistryClient(settings.schemaRegistry, 1000)
val serializer: KafkaAvroSerializer = new KafkaAvroSerializer(client, props.asJava)
Klausenburg: I am also using a different approach, which is dirtier but only requires overriding a single method of the KafkaAvroSerializer so that it always returns the same Schema instance, and is thus much faster (the parameter is renamed to obj here, since object is a keyword in Scala):

val schema: Schema = SchemaFor[ComplexMsg]()

val prepSerializer: KafkaAvroSerializer = new KafkaAvroSerializer(client) {
  override def getSchema(obj: Object): Schema = schema
}

val prepProducerSettings: ProducerSettings[AnyRef, AnyRef] =
  ProducerSettings(system, serializer, prepSerializer)
    .withBootstrapServers(kafkaSettings.bootstrapServer)
Scenography: Thank you. The KafkaAvroSerializer method sounds easier and would fit my code better. This issue is quite a shame. :(
Klausenburg: @Scenography you might be interested to know the proper solution ;) please see the edit.
Scenography: Heliocentrist is actually my colleague at work :) I was discussing the problem with him when he had the idea that solved it. All credit goes to him. But many thanks for writing back to me in the comments. Cheers! :)
