How can I map incoming headers as String instead of byte[] in my Spring Cloud Stream project?
C

1

8

I have a simple Spring Cloud Stream project using Spring Integration DSL flows and using the Kafka binder. Everything works great, but message header values coming from Kafka arrive as byte[].

This means that my SI @Header parameters need to be of type byte[]. Which works, but it'd be nice to have them as Strings (all the inbound headers I care about are String values).

I've configured the Kafka clients to use StringSerializer/StringDeserializer. I assume I also need to somehow tell Spring Kafka which headers to map as Strings and what character encoding to use.

I'm obviously missing something here. Any tips?

Capernaum answered 26/10, 2019 at 5:3 Comment(0)
C
4

Set the binder property headerMapperBeanName to the bean name of a DefaultKafkaHeaderMapper bean.

spring.cloud.stream.kafka.binder.headerMapperBeanName

The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.

You can then specify which headers you want to be mapped as Strings by the mapper:

/**
 * Set the headers to not perform any conversion on (except {@code String} to
 * {@code byte[]} for outbound). Inbound headers that match will be mapped as
 * {@code byte[]} unless the corresponding boolean in the map value is true,
 * in which case it will be mapped as a String.
 * @param rawMappedHeaders the header names to not convert and
 * @since 2.2.5
 * @see #setCharset(Charset)
 * @see #setMapAllStringsOut(boolean)
 */
public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {
Confinement answered 26/10, 2019 at 13:20 Comment(2)
spelling mistake in setRawMappedHaeadersMaclaine
that spelling mistake was actually present in the method name for awhile. It's since been fixed.Capernaum

© 2022 - 2024 — McMap. All rights reserved.