Kafka AVRO - conversion from long to datetime

4

6

I get the following error when I want to send an AVRO message which contains a field that has the type long:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime

I use Confluent 3.2.0 and Apache Spark 2.2.0. The error is thrown in a Spark job that processes AVRO messages and prints them to the console. In the AVRO schema, the corresponding field is defined like this:

{"name": "event_time", "type": {"type": "long", "logicalType": "timestamp-millis"}}

In the Java class generated from the .avsc file, the field is defined as below:

private DateTime event_time;
Passed answered 18/12, 2017 at 11:48 Comment(0)
12

I encountered a similar issue using Confluent 4.0.0 and Avro 1.8.2. I had a stream processor that was attempting to convert a long to a DateTime. I overcame the issue by registering the correct conversion: before any processing logic runs, I use the static SpecificData utility class to add the matching logical type conversion.

SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
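
To make clearer what such a conversion does, here is a minimal plain-Java sketch (no Avro on the classpath). It assumes that a timestamp-millis value is a long counting milliseconds since the Unix epoch, and it uses java.time.Instant as a stand-in for Joda's DateTime. The class name, method names and sample value are hypothetical and only for illustration; this is not Avro's own implementation.

```java
import java.time.Instant;

public class TimestampMillisSketch {

    // Hypothetical helper: the read direction of a timestamp-millis conversion.
    public static Instant fromLong(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Hypothetical helper: the write direction, back to the raw long.
    public static long toLong(Instant timestamp) {
        return timestamp.toEpochMilli();
    }

    public static void main(String[] args) {
        // Without a registered conversion the decoder hands back a plain Long.
        Object raw = Long.valueOf(1513597680000L); // made-up sample value

        try {
            // Casting the raw value fails, much like the error in the question.
            Instant broken = (Instant) raw;
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("Cast failed: " + e.getMessage());
        }

        // With an explicit conversion the long becomes a proper timestamp.
        Instant eventTime = fromLong((Long) raw);
        System.out.println("Converted: " + eventTime);
        System.out.println("Round trip: " + toLong(eventTime));
    }
}
```

Registering the conversion on SpecificData once at startup lets the datum reader apply this kind of mapping to every field that carries the logical type, instead of handing the raw Long to the generated class.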
Lysozyme answered 16/3, 2018 at 20:50 Comment(2)
Can you demonstrate how you used this? - Cortneycorty
It needs to be used in the Main object, wherever the application starts. - Squint
3

In Avro 1.9.x and above, the symbol TimestampConversion is no longer present. Replacing it in the snippet above from @user3222582 with TimestampMillisConversion fixed a compilation error for me with Avro 1.9.2.

Ditzel answered 8/10, 2019 at 13:39 Comment(0)
0

Try upgrading Avro to 1.9.x in both the avro dependency and the avro-maven-plugin in pom.xml:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.9.1</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <!--<goal>idl-protocol</goal>-->
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <enableDecimalLogicalType>true</enableDecimalLogicalType>
                <stringType>String</stringType>
            </configuration>
        </execution>
    </executions>
</plugin>

Also make sure to delete the classes previously generated from the AVRO schema and then run mvn compile.

Beamer answered 28/1, 2020 at 6:49 Comment(0)
0

In case you are configuring a SpecificAvroSerde you can set AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG to true:

final Map<String, Object> serdeConfig = new HashMap<>();
serdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schema-registry-url");
serdeConfig.put(KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

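// T stands for the class generated from the Avro schema.
final SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
// The second argument is isKey: false configures the serde for record values.
serde.configure(serdeConfig, false);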

Please note that although the config is called KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, it is also used by the serde's deserializer.

Jacquenette answered 11/12, 2023 at 16:43 Comment(0)
