I have a Kafka listener that accepts a batch of messages.
I need to get a List of custom headers from this listener, but it gives me an error that the header is not found.
@KafkaListener(id = KAFKA_LISTENER_ID,
topics = "${kafka.event-topics.someTopic}",
properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(@Payload
List<SomeEventConsumedV1> messages,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header("CUSTOM_HEADER") List<Long> sendTimeHeaders,
Acknowledgment acknowledgment) {
in this code the app is not able to read @Header(“CUSTOM_HEADER”) List sendTimeHeaders
There is a way to solve it by giving to method:
@KafkaListener(id = KAFKA_LISTENER_ID,
topics = "${kafka.event-topics.someTopic}",
properties = {"spring.json.value.default.type=com.id.somegateway.domain.dto.consumerevent.SomeEventConsumedV1"})
public void consumeMessages(List<Message<SomeEventConsumedV1>> messages) {
messages.map(msg -> msg.getHeaders().get("CUSTOM_HEADER"))........
}
But maybe there is a way to solve it with @Header approach ?
The batch processing relies on the BatchMessagingMessageConverter
. By default it does not do any custom headers mapping.
You can look into injecting a custom one into your ListenerContainerFactory
or just extract your header from the provided one by that converter from KafkaHeaders.NATIVE_HEADERS
header.
If you rely on JSON conversion, then DefaultKafkaHeaderMapper
is involved and it provides converted headers into KafkaHeaders.BATCH_CONVERTED_HEADERS
as a List<Map<String, Object>>
.
I suggest to debug when you have a @Headers Map<>
argument and see which one would fit into your scenario.