Unable to get custom @Header when processing a batch of messages with Spring Boot @KafkaListener

I have a Kafka listener that accepts a batch of messages.
I need to get a List of custom headers from this listener, but it gives me an error that the header is not found.

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload List<SomeEventConsumedV1> messages,
                @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                @Header("CUSTOM_HEADER") List<Long> sendTimeHeaders,
                Acknowledgment acknowledgment) {
    // ...
}

With this code the application is not able to resolve the @Header("CUSTOM_HEADER") List<Long> sendTimeHeaders argument.

There is a way to work around it by changing the method signature to accept a list of Message objects:

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(List<Message<SomeEventConsumedV1>> messages) {

    messages.stream().map(msg -> msg.getHeaders().get("CUSTOM_HEADER"))...

}

But maybe there is a way to solve it with the @Header approach?

Batch processing relies on the BatchMessagingMessageConverter. By default it does not map custom headers.

You can look into injecting a custom converter into your listener container factory, or just extract your header from what that converter provides in the KafkaHeaders.NATIVE_HEADERS header.
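For example, a minimal sketch of the "just extract it" option, assuming no header mapper is configured, so the converter puts the raw per-record Headers (org.apache.kafka.common.header.Headers) into KafkaHeaders.NATIVE_HEADERS; the CUSTOM_HEADER name comes from your code, and decoding its bytes as a long is only an assumption about how the producer wrote it:

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload List<SomeEventConsumedV1> messages,
        @Header(KafkaHeaders.NATIVE_HEADERS) List<Headers> nativeHeaders,
        Acknowledgment acknowledgment) {

    for (int i = 0; i < messages.size(); i++) {
        // lastHeader() returns null when a record does not carry the header
        var custom = nativeHeaders.get(i).lastHeader("CUSTOM_HEADER");
        Long sendTime = custom != null ? ByteBuffer.wrap(custom.value()).getLong() : null;
        // ... process messages.get(i) together with sendTime
    }
    acknowledgment.acknowledge();
}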

If you rely on JSON conversion, then the DefaultKafkaHeaderMapper is involved, and it exposes the converted headers under KafkaHeaders.BATCH_CONVERTED_HEADERS as a List<Map<String, Object>>.
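So, with a header mapper in play, a sketch of pulling the custom header out per record could look like this (the value type under "CUSTOM_HEADER" depends on how the producer wrote the header and how the mapper decodes it, hence the Object here):

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload List<SomeEventConsumedV1> messages,
        @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> convertedHeaders,
        Acknowledgment acknowledgment) {

    for (int i = 0; i < messages.size(); i++) {
        // one map of converted headers per record, in the same order as the payloads
        Object sendTime = convertedHeaders.get(i).get("CUSTOM_HEADER");
        // ... process messages.get(i) together with sendTime
    }
    acknowledgment.acknowledge();
}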

I suggest debugging with a @Headers Map<String, Object> argument to see which of these fits your scenario.
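For example, a throwaway signature like this sketch lets you set a breakpoint and see exactly which header keys the converter populated for your batch:

@KafkaListener(id = KAFKA_LISTENER_ID,
    topics = "${kafka.event-topics.someTopic}",
    properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload List<SomeEventConsumedV1> messages,
        @Headers Map<String, Object> headers) {

    // breakpoint here: depending on the converter setup the map contains either
    // KafkaHeaders.NATIVE_HEADERS or KafkaHeaders.BATCH_CONVERTED_HEADERS
    headers.forEach((key, value) -> System.out.println(key + " -> " + value));
}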
