Kafka Sink Connector fails: Schema not found; error code: 40403
I have a sink connector with the following configuration

{
    "name": "sink-test-mariadb-MY_TOPIC",
    "config": { 
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max":"10",
                "topics":"MY_TOPIC",
                "connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
                "auto.create":"false",
                "auto.evolve":"true",
                "table.name.format":"MY_TABLE",
                "pk.mode":"record_value",
                "pk.fields":"ID",
                "insert.mode":"upsert",
                "transforms":"ExtractField",
                "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
                "transforms.ExtractField.field":"data"
        }
}

and after a while all the tasks of the connector fail with the following error:

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
            Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
            Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
                at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)",
    "id": 0,
    "worker_id": "localhost:8083"
}

The connector manages to synchronise the topic with the database, but it suddenly fails for no apparent reason. I am also certain that the schema exists: its subject appears in the list returned by the Schema Registry API at localhost:8081/subjects

[
  ...
  MY_TOPIC-value
  ...
]
Grube answered 17/1, 2019 at 7:3 Comment(6)
What's the output of GET /schemas/ids/803 ? (ref)Facile
@RobinMoffatt { "error_code": 40403, "message": "Schema not found" }Grube
how about curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/" ?Facile
@RobinMoffatt Thanks for your prompt replies. The response I get is [ 3 ]Grube
I created this problem simply by putting XML comments in my class that the schema is built on. Removed the XML comments and it was happy. The XML comments must be part of the hashing that creates the schema.Janerich
If the schema has changed, we have to register it once with producerProps.put("auto.register.schemas", true);; for subsequent requests we can set it to false.Fifield
I had the same problem, and I realized that code 40403 does not actually mean the schema was not found; it means that the schema does not correspond to the required one. A different code (40401) exists for the case where the schema is not found at all.

So all I did was change the schema accordingly, and it worked for me.

Hypophosphate answered 16/2, 2022 at 10:46 Comment(0)
The message on the Kafka topic is serialised with a different version of the schema than the one you have on the Schema Registry. Perhaps it was generated by a tool that wrote the schema to a different Schema Registry, or in a different environment? In order to deserialise the message, Kafka Connect needs to retrieve the schema whose ID is embedded, right after the magic byte, at the beginning of the Kafka message on the topic.
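That wire format can be sketched as follows (a minimal illustration in Python, not the connector's actual code): the Confluent Avro serialiser prefixes every payload with a magic byte (0x00) and a 4-byte big-endian schema ID, and the AvroConverter reads that ID back in order to ask the Schema Registry for the matching schema.

```python
import struct

def parse_confluent_header(payload: bytes):
    """Return (schema_id, avro_body) from a Confluent wire-format payload."""
    # Byte 0: magic byte (must be 0x00); bytes 1-4: big-endian schema ID.
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError("not in Confluent wire format")
    return schema_id, payload[5:]

# A payload whose header encodes schema ID 802 (0x00000322), as in the error;
# the body bytes here are a placeholder, not real Avro data.
schema_id, body = parse_confluent_header(bytes([0, 0, 0, 3, 0x22]) + b"avro-bytes")
print(schema_id)  # 802
```

It is exactly this lookup of the extracted ID (GET /schemas/ids/802) that fails with error 40403 in the stack trace.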

The schema is not present on your Schema Registry, as shown by:

GET /schemas/ids/803
 { "error_code": 40403, "message": "Schema not found" }

You can inspect the ID of the schema that you do have with:

curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/3/"|jq '.id'
Facile answered 17/1, 2019 at 12:19 Comment(5)
Thanks for your answer! I see your point but in my scenario it doesn't make sense. I had the same problem yesterday and to deal with it, I stopped the connector, deleted the topic and the subject. I then re-created the topic and pushed the same messages in it, then started the sink connector and it worked fine until today when the same error appeared.Grube
my bet is on another process somewhere else writing to the same topic.Facile
@GiorgosMyrianthous Also make sure the key.converter is not set to Avro and trying to do ID lookups itself (assuming you have non-null keys)Brindle
@cricket_007 I am pretty sure the keys are in String format as I am using a third party tool in order to produce the data in the topic (which does not support avro keys anyway)Grube
@GiorgosMyrianthous Okay. My point is that an Avro schema ID of 802 seems rather large if you have a low number of topics and assuming IDs are sequential. What could be happening is that the AvroConverter sees your String keys start with the Avro magic byte (0x0), then interprets the next four bytes as a big-endian int of 802. If you used the String or ByteArray converter, this wouldn't happenBrindle
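The failure mode described in the last comment can be sketched like this (the key bytes below are hypothetical, chosen only to show the mechanism): if a non-Avro String key happens to begin with 0x00, the AvroConverter treats that byte as the magic byte and the next four bytes as a big-endian schema ID, producing a spurious registry lookup.

```python
import struct

# Hypothetical raw bytes of a String key that happens to begin with 0x00;
# the AvroConverter would misread them as Confluent wire format.
key_bytes = bytes([0x00, 0x00, 0x00, 0x03, 0x22]) + b"rest-of-key"

# Byte 0 is taken as the magic byte, bytes 1-4 as a big-endian schema ID.
magic, bogus_schema_id = struct.unpack(">bI", key_bytes[:5])
print(magic, bogus_schema_id)  # 0 802 -- a spurious lookup for schema id 802
```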

© 2022 - 2024 — McMap. All rights reserved.