Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/

docker-compose up works fine, creating topics via shell works fine.

Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE

When starting up the Spring application it prints the correct version of Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

I try to send a message like this

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Sending on client side fails with

UnknownServerException: The server experienced an unexpected error when processing the request

In the server console I get the message Magic v1 does not support record headers

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Googling suggests a version conflict, but the versions seem to match (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).

Any clues? Thanks!

Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending JSON via the JsonSerializer results in the error above. Here is the content of my producer config:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())
Broncobuster answered 23/12, 2017 at 15:55 Comment(3)
That doesn't make sense (getting that message on the server side). If the client version was older, it wouldn't send any headers, so all should be well (I have tested using the 1.0.0 client with a 0.10 broker and it works as long as you don't try to send headers). With the 1.0.0 client, an "empty" RecordHeaders is sent (by the client) when the template doesn't send any headers.Eyeless
The image names don’t seem to specify a version so you may be using an older cached docker image and a newer client. A 1.0 client sending headers to a 0.10 broker would get this error. Try checking docker image version and docker pull the newest 1.0 broker image.Neither
After updating Kafka to the latest version, I stopped getting the "Magic v1 does not support record headers" exception and the code worked like a charm.Britishism

Solved. The problem was neither the broker, a cached docker image, nor the Spring app.

The problem was a console consumer which I used in parallel for debugging. This was an "old" consumer started with kafka-console-consumer.sh --topic=topic --zookeeper=...

It actually prints a warning when started: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

A "new" consumer with --bootstrap-server option should be used (especially when using Kafka 1.0 with JsonSerializer). Note: Using an old consumer here can indeed affect the producer.

Broncobuster answered 23/12, 2017 at 23:36 Comment(1)
I am not running any consumer in parallel, but this issue still occurs. I also tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); but it still gives the same "Magic v1 does not support record headers" error. Can you please help here?Midcourse

I had a similar issue. Spring Kafka's JsonSerializer (and JsonSerde) adds type-information headers to each record by default. To prevent this issue, we need to disable adding those type-info headers.

If you are fine with the default JSON serialization, use the following (the key point here is ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
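
The rest of the wiring is unchanged; as a usage sketch (the topic name, key, and myPayload POJO are illustrative):

// The value is serialized as JSON, but without the type-info headers
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.send("test-topic", "some-key", myPayload);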

But if you need a custom JsonSerializer with a specific ObjectMapper (for example one using PropertyNamingStrategy.SNAKE_CASE), you should disable adding the type-info headers explicitly on the JsonSerializer, because Spring Kafka ignores the ADD_TYPE_INFO_HEADERS property when the serializer instance is supplied to DefaultKafkaProducerFactory (in my view, a questionable design choice in Spring Kafka):

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
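
For completeness, customObjectMapper is not defined above; a minimal sketch of such a mapper using the SNAKE_CASE strategy mentioned earlier (ObjectMapper and PropertyNamingStrategy are Jackson classes from com.fasterxml.jackson.databind) might look like:

// ObjectMapper that maps POJO fields to snake_case JSON property names
ObjectMapper customObjectMapper = new ObjectMapper();
customObjectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);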

Or, if you use JsonSerde:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
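
As a usage sketch, the configured serde's serializer (which now has type-info headers disabled) can be handed to the producer factory, with props standing in for your base producer configuration:

ProducerFactory<String, T> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), jsonSerde.serializer());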
Armil answered 28/5, 2018 at 18:54 Comment(2)
Adding this line solved my problem props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);Blob
What if I do need the type header? How can I overcome the issue?Galimatias

I just ran a test against that docker image with no problems...

$docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

EDIT

Still works for me...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz
Eyeless answered 23/12, 2017 at 18:32 Comment(11)
Thanks for checking! I narrowed down the source of the problem. If I send plain Strings it works! But if I use JsonSerializer, the problem occurs. See update.Broncobuster
Still works for me - see my edit. The JsonSerializer adds headers by default so it definitely looks like the broker in your docker image is < 0.11. You can confirm by setting producer property JsonSerializer.ADD_TYPE_INFO_HEADERS to false. That property stops it from adding the type info in headers.Eyeless
Thanks! You are right, it works with the given property. I just don't get it. When I fire up docker-compose it definitely prints kafka_1 | [2017-12-23 20:51:53,289] INFO Kafka version : 1.0.0. I also tried recreating and deleting all images, etc., like this: docker-compose down && docker-compose build --no-cache && docker-compose upBroncobuster
Try docker logs <containerId> > server.log in mine, I see this: [2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser).Eyeless
Yes.. Same here.Broncobuster
If I /bin/bash into the image I also see all the kafka_2.12-1.0.0 jars.Broncobuster
Bizarre - what's even stranger is, if I talk to an 0.10.2.0 server from S-K 2.1.0 and the JsonSerializer, I get the exception on the client as I would expect Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers. As I said in my original comment to your question, old brokers know nothing about headers so I can't see how you can get this error on a server unless the client is old. Are you running your client on AWS? I have heard before some weirdness with code on AWS running old kafka-clients even if the app is packaged with the right one.Eyeless
Look for 2017-12-23 16:22:16.500 INFO 55322 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 in the client log.Eyeless
So... another update. I think the problem is neither the broker nor the Spring app. I was using a console consumer in parallel to the Spring app for debugging (based on this tutorial). I am pretty sure the problem occurs when using the "old" consumer (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK) with the parameter zookeeper instead of bootstrap-server. What I find interesting is that this consumer results in an UnknownServerException in the (Spring) producer.Broncobuster
Try using the --bootstrap-server option instead of --zookeeper; the console-consumer will use the new consumer in that case (but you can't see headers with the console consumer, regardless). But I agree it's weird that the consumer can affect the producer. Maybe there's some logic that says "we have an old consumer attached to this topic so you can't send headers".Eyeless
Yes, it works. Thank you very much for your time, happy holidays!Broncobuster

If you are using a Kafka broker version <= 0.10.x.x, you must set JsonSerializer.ADD_TYPE_INFO_HEADERS to false in your producer factory properties, as below.

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

If you are using a Kafka broker version > 0.10.x.x, it should just work fine.

Aron answered 18/3, 2020 at 18:3 Comment(0)

TL;DR: Try adding this property:

# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false

Full answer: I tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);, but it still did not work. On further debugging inside the send method, I discovered that a B3 header was part of the final record being sent. B3 headers are used for tracing purposes, and indeed I had Sleuth as part of the project.

To remove this header, I tried disabling kafka tracing related AutoConfiguration classes, but that didn't work. I also tried setting the following properties, neither of which worked:

# These didn't work
#spring.sleuth.kafka.enabled=false
#spring.sleuth.messaging.kafka.streams.enabled=false

After some further debugging and trial & error, I discovered that the producer was being wrapped by a bean post processor, roughly via this flow:

org.springframework.cloud.sleuth.brave.instrument.messaging.KafkaFactoryBeanPostProcessor::postProcessAfterInitialization
-> org.springframework.cloud.sleuth.brave.instrument.messaging.TraceProducerPostProcessor::wrapInTracing
  -> brave.kafka.clients.KafkaTracing::producerInjector
    -> brave.kafka.clients.KafkaProducerRequest::SETTER::put

I still couldn't figure out how to disable KafkaFactoryBeanPostProcessor, but I saw an annotation defined in the same package, ConditionalOnMessagingEnabled, which depends on the following property; setting it finally worked!

# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false
Apothecary answered 27/4, 2023 at 7:59 Comment(0)

I found my problem: I noticed that I wasn't getting this error when using the sendDefault(object) method, only when using the send(topic, object) method.

I did a little digging and noticed that the framework generates a ProducerRecord when you use sendDefault but not when you use send, which creates a "clean" record with no headers.

So my solution was to create a new ProducerRecord myself before calling the send method; hope this helps someone.

ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, serializedMessage);
kafkaProducerClient.send(record);

Note: I'm using a ByteSerializer, so the byte[] part may need to be different in your code.

Ringmaster answered 16/4, 2024 at 0:8 Comment(0)
