kafka-avro-console-consumer does not deserialize DECIMAL correctly as decimal

I am using Confluent's kafka-connect-jdbc to read data from different RDBMSs into Kafka. Here is my test table:

CREATE TABLE DFOCUSVW.T4(
    COL1 VARCHAR(100) NOT NULL,
    COL2 DECIMAL(6, 3) NOT NULL,
    COL3 NUMERIC(6, 3) NOT NULL,
    COL4 DECIMAL(12, 9) NOT NULL,
    COL5 NUMERIC(12, 9) NOT NULL,
    COL6 DECIMAL(18, 15) NOT NULL,
    COL7 NUMERIC(18, 15) NOT NULL,
    COL8 INTEGER NOT NULL,
    Td_Update_Ts TIMESTAMP NOT NULL,
    PRIMARY KEY (COL1)
);

My understanding is that numeric.mapping=best_fit should convert COL2...COL5 into FLOAT64 (roughly 15 digits of precision), while COL6 and COL7 should be serialized as bytes without any conversion, because they do not fit into FLOAT64.
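(For context, numeric.mapping is a property of the JDBC source connector itself. A minimal JdbcSourceConnector config sketch with this option looks roughly like the one below; the connection URL, table, topic prefix, and mode are placeholders, and numeric.mapping is the only property in question here:)

{
  "name": "jdbc-source-t4",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:<driver>://<host>/<database>",
    "mode": "bulk",
    "table.whitelist": "T4",
    "topic.prefix": "t4-",
    "numeric.mapping": "best_fit",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}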

Here is the auto-generated Avro schema, which is the same for both numeric.mapping=best_fit and numeric.mapping=none:

{
  "connect.name": "T4",
  "fields": [
    {
      "name": "COL1",
      "type": "string"
    },
    {
      "name": "COL2",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "3"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 3,
        "type": "bytes"
      }
    },
    {
      "name": "COL3",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "3"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 3,
        "type": "bytes"
      }
    },
    {
      "name": "COL4",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "9"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 9,
        "type": "bytes"
      }
    },
    {
      "name": "COL5",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "9"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 9,
        "type": "bytes"
      }
    },
    {
      "name": "COL6",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "15"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 15,
        "type": "bytes"
      }
    },
    {
      "name": "COL7",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "connect.parameters": {
          "scale": "15"
        },
        "connect.version": 1,
        "logicalType": "decimal",
        "precision": 64,
        "scale": 15,
        "type": "bytes"
      }
    },
    {
      "name": "COL8",
      "type": "int"
    },
    {
      "name": "Td_Update_Ts",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "connect.version": 1,
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    }
  ],
  "name": "T4",
  "type": "record"
}

This schema shows that, even in the best_fit case, the Connect framework did not convert the logical type “DECIMAL” into Avro’s primitive type “double” for COL2...COL5 before passing the rows to the Avro serializer.
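For comparison, if best_fit had taken effect for the columns that fit, I would expect those fields to come out as plain Avro primitives, something along these lines for COL2:

{
  "name": "COL2",
  "type": "double"
}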

This schema also always reports the precision as 64, which does not conform to the Avro spec:

From the spec:

  • scale, a JSON integer representing the scale (optional). If not specified the scale is 0.
  • precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required).

So, the “precision” for these types should be 6, 12, and 18, not 64!
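In other words, going by the spec, a field like COL6 would be declared with its actual precision, e.g.:

{
  "name": "COL6",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 18,
    "scale": 15
  }
}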

All that being said, the Avro deserializer should still have enough information to deserialize the values accurately, but when I read the topic with the Avro console consumer, I get:

{"COL1":"x2","COL2":"\u0003g“","COL3":"\u0003g“","COL4":"3ó1Ã\u0015","COL5":"3ó1Ã\u0015","COL6":"\u0003\u0018±š\u000E÷_y","COL7":"\u0003\u0018±š\u000E÷_y","COL8":2,"Td_Update_Ts":1583366400000}
{"COL1":"x3","COL2":"\u0004î3","COL3":"\u0004î3","COL4":"K;¨«\u0015","COL5":"K;¨«\u0015","COL6":"\u0004{÷\u0012l_y","COL7":"\u0004{÷\u0012l_y","COL8":3,"Td_Update_Ts":1583366400000}
{"COL1":"x1","COL2":"\u0001àó","COL3":"\u0001àó","COL4":"\u001CªºÛ\u0015","COL5":"\u001CªºÛ\u0015","COL6":"\u0001µl!±m_y","COL7":"\u0001µl!±m_y","COL8":1,"Td_Update_Ts":1583366400000}

For this data:

INSERT INTO t4 VALUES('x1', 123.123, 123.123, 123.123456789, 123.123456789, 123.123456789012345, 123.123456789012345, 1, '2020-03-05 00:00:00.000000 +00:00');
INSERT INTO t4 VALUES('x2', 223.123, 223.123, 223.123456789, 223.123456789, 223.123456789012345, 223.123456789012345, 2, '2020-03-05 00:00:00.000000 +00:00');
INSERT INTO t4 VALUES('x3', 323.123, 323.123, 323.123456789, 323.123456789, 323.123456789012345, 323.123456789012345, 3, '2020-03-05 00:00:00.000000 +00:00');
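(Side note: the raw bytes do appear to be encoded correctly. The Avro decimal logical type stores the unscaled value as a big-endian two's-complement byte array, and the \u0003g“ printed for COL2 of row x2 looks like the bytes 0x03 0x67 0x93, which decode back to 223.123. A minimal Java sketch, assuming exactly those bytes and the scale of 3 from the schema:)

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecodeDecimal {
    public static void main(String[] args) {
        // Big-endian two's-complement unscaled value, assumed from the console output for row x2, COL2
        byte[] unscaled = {0x03, 0x67, (byte) 0x93};
        int scale = 3; // the "scale" parameter from the schema for COL2
        BigDecimal value = new BigDecimal(new BigInteger(unscaled), scale);
        System.out.println(value); // prints 223.123
    }
}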

I have tried kafka-avro-console-consumer both with --property value.schema, passing the above schema manually, and with --property schema.registry.url=http://localhost:8081.
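(The registry-based invocation is roughly the following; the topic name and broker address are placeholders:)

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic <topic> \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081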

So the deserializer clearly failed to use the Avro schema to deserialize the decimal fields properly. Why is that?

Bistro asked 9/7, 2020 at 20:38
