Using SendToDlqAndContinue in a Function without a processor (spring-cloud-stream-binder-kafka-streams)

I'm trying to send a message to a DLQ when the message I'm processing in my Function fails some business-logic check.

My function looks like this:

public Function<KStream<String, Bytes>, KStream<String, String>> mapMongoUpdateToIndex() 

Inside it I merge several KStreams, which is why I don't want to use a processor: the function works with multiple KStreams, merges them, and so on.

I tried using StreamBridge, but I keep getting the following error:

Caused by: org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'map-out-0' is expected to be of type 'org.springframework.messaging.MessageChannel' but was actually of type 'jdk.proxy2.$Proxy125'

I also tried using SendToDlqAndContinue, but it needs a context to get the offset and the partition, and I don't understand how to get those values without a processor.

I saw some solutions using the Branching API, but they aren't very generic. I have several different streams, so branching will be my last resort.
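
For illustration, here is a minimal sketch of what a more generic form of the branching idea might look like. It uses plain Java collections rather than `KStream`, so it runs without Kafka on the classpath. `DlqRoutingSketch`, `Split`, `route` and `toIndex` are hypothetical names, and the validation rule is an assumption. In a real topology the two branches would presumably correspond to the stream sent to the output binding and a stream written to a dead-letter topic. This sketch does not recover the partition or offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Plain-Java stand-in for the branching idea; no Kafka classes are used here.
class DlqRoutingSketch {

    // Hypothetical holder for the two branches: mapped values and rejected raw inputs.
    static final class Split<V> {
        final List<V> main = new ArrayList<>();
        final List<String> dlq = new ArrayList<>();
    }

    // Applies the mapper to every input; anything that throws goes to the DLQ branch.
    static <V> Split<V> route(List<String> inputs, Function<String, V> mapper) {
        Split<V> split = new Split<>();
        for (String raw : inputs) {
            try {
                split.main.add(mapper.apply(raw));
            } catch (RuntimeException e) {
                split.dlq.add(raw); // keep the original payload for the dead-letter side
            }
        }
        return split;
    }

    // Assumed business rule: a payload must start with '{' to be considered valid.
    static String toIndex(String raw) {
        if (!raw.startsWith("{")) {
            throw new IllegalArgumentException("payload is not a JSON object");
        }
        return raw.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Split<String> split = route(List.of("{a}", "oops", "{b}"), DlqRoutingSketch::toIndex);
        System.out.println("main=" + split.main + " dlq=" + split.dlq);
    }
}
```

Because `route` takes the mapper as a parameter, the same helper could be reused for each of the merged streams instead of writing a separate branch per stream.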

Could anyone help me make this work using either SendToDlqAndContinue (preferred) or StreamBridge?
