Unable to decode avro data from Kafka with avro schema
I'm trying to port our streaming pipeline to the Table API, and I've almost done it except for one field.

I'm reading CSV data from a Kafka topic, doing some transformations, and sending the transformed data to an output topic in Avro format. The Avro schema has some complex fields, and I'm having trouble with one particular field: I'm able to write my data to the Kafka topic in Avro format, but I can't read it back using my schema. If I remove that column, I can write the data and read it back. With the Table API I can read everything back.

Avro schema for the field:

{
      "name": "problem_field",
      "type": [
        "null", {
          "type": "array",
          "items": {
            "type": "record",
            "name": "ProblemField",
            "doc": "Description of the problem field",
            "fields": [
              {
                "name": "proc",
                "type": "string"
              }, {
                "name": "success",
                "type": "boolean"
              }
            ]
          }
        }
      ],
      "doc": "some doc here",
      "default": null
},

Logical Type for field:

ARRAY<ROW<`proc` STRING NOT NULL, `success` BOOLEAN NOT NULL> NOT NULL>

My table schema:

.column("problem_field", DataTypes.ARRAY(
    ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull()))
))

Error I'm receiving:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
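As I understand it, Avro encodes lengths, block counts, and union indexes as zig-zag varints, so when the reader's nullability doesn't match the writer's union layout, the decoder can consume an ordinary payload byte as an array length and come up with a nonsense negative value. A standalone sketch of the zig-zag decoding (the byte value here is illustrative; a single stray byte 0x31 decodes to exactly -25):

```java
// Zig-zag coding as used by Avro for ints/longs (lengths, counts, union indexes).
// If the reader schema's nullability is off by one union branch, the decoder
// reads a data byte as a length; 0x31 decodes to -25, matching the error above.
public class ZigZagDemo {
    // Decode a zig-zag encoded value back to a signed int.
    static int zigZagDecode(int n) {
        return (n >>> 1) ^ -(n & 1);
    }

    // Encode a signed int into its zig-zag representation.
    static int zigZagEncode(int n) {
        return (n << 1) ^ (n >> 31);
    }

    public static void main(String[] args) {
        int raw = 0x31; // 49: a plausible stray payload byte read as a length
        System.out.println(zigZagDecode(raw)); // -25 -> "Length is negative: -25"
        System.out.println(zigZagEncode(-25)); // 49 (0x31), round trip
    }
}
```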

My consumer:

    val schema = new Schema.Parser().parse(fs.open(new Path(jobArgs("schema"))))

    val deser = AvroDeserializationSchema.forGeneric(schema)
    val kafka_source = setSourceStream(deser)
    val stream = env.addSource(kafka_source)(deser.getProducedType)

    val config = OutputFileConfig.builder().withPartPrefix("source-part").withPartSuffix(".json").build()
    writeOutputAsText(stream, s"${jobArgs("extracts_path")}", config).setParallelism(1).name("JSON")
    env.execute("Consumer")

I think my problem is data type extraction: while writing, the output format of the field is encoded differently than in the Avro schema.

Could someone point me in the right direction to solve this?

Guinness answered 11/8, 2022 at 13:10

I think I found the answer to my problem: I forgot the NOT NULL (`.notNull()`) on the ROW inside my ARRAY.

Solution:

.column("problem_field", DataTypes.ARRAY(
    ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull())).notNull()
))
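Rather than hand-writing the `.notNull()` calls and risking a mismatch, Flink can derive the `DataType` directly from the Avro schema via `AvroSchemaConverter`, so the nullability always matches the union layout. A sketch, assuming `flink-avro` is on the classpath (the inline schema string is just the non-null array branch of the field above):

```java
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class DeriveType {
    public static void main(String[] args) {
        // The array branch of the union; the full field schema is ["null", <this>].
        String avroSchema =
            "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ProblemField\","
          + "\"fields\":[{\"name\":\"proc\",\"type\":\"string\"},"
          + "{\"name\":\"success\",\"type\":\"boolean\"}]}}";

        // Derive the Table API type; non-union Avro types come back NOT NULL,
        // which is exactly the nullability the deserializer expects.
        DataType derived = AvroSchemaConverter.convertToDataType(avroSchema);
        System.out.println(derived);
        // Expected shape:
        // ARRAY<ROW<`proc` STRING NOT NULL, `success` BOOLEAN NOT NULL> NOT NULL>
    }
}
```

This keeps the table schema and the writer schema from drifting apart, which (as the error above shows) the Avro decoder does not tolerate.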

Now I'm able to read everything back.

Guinness answered 11/8, 2022 at 16:15

© 2022 - 2024 — McMap. All rights reserved.