KStream-KTable join with CloudEvent value not working

I want to join a KStream with a KTable. The PoC works fine with plain stream data; however, as soon as I switch to CloudEvent values, I keep running into one serialization issue or another.

Here is my code sample:

Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

// Configure the CloudEvents serializer/deserializer as value (de)serializers (isKey = false)
CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);

CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);

StreamsBuilder builder = new StreamsBuilder();
KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));

// Join the stream with the table; the joined value is a new CloudEvent whose id concatenates both ids
KStream<String, CloudEvent> joined = kStream
    .join(kTable, (left, right) -> CloudEventBuilder.v1().withId(left.getId().concat(right.getId())).build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();

I also tried using a WrapperSerde, as described in Issue with configuring Serdes for Kafka Streams.
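
That attempt was roughly along these lines (the class name is just illustrative):

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serdes;

// A CloudEvent serde built on Kafka's Serdes.WrapperSerde, so it can also be
// registered via the default.value.serde config if needed.
public class CloudEventSerde extends Serdes.WrapperSerde<CloudEvent> {
    public CloudEventSerde() {
        super(new CloudEventSerializer(), new CloudEventDeserializer());
    }
}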

However, I keep running into this exception:

18:12:08.691 [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams – stream-client [basic-streams-updated-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]

Has anyone used CloudEvent successfully with a KTable?

Your stack trace says this:

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    ...

Which is essentially this code:

    final byte[] rawValue = valueSerializer.serialize(topic, data);

So, the CloudEventSerializer you use is not adapted for Kafka Streams:

public byte[] serialize(String topic, CloudEvent data) {
    throw new UnsupportedOperationException("CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)");
}

This is because Kafka Streams does not pass record headers when it (de)serializes values for its state stores (here, the store backing your KTable), so only the two-argument serialize(String, CloudEvent) overload gets called.

I suggest you extend that CloudEventSerializer and override its serialize(String topic, CloudEvent data) to delegate to serialize(String topic, Headers headers, CloudEvent data) with an empty new RecordHeaders().
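
For example, something along these lines (an untested sketch; the class names are just illustrative, and the deserializer most likely needs the same treatment so the KTable's state store can read the value back):

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.header.internals.RecordHeaders;

// Routes the headerless serialize() call made by Kafka Streams' state stores
// to the header-aware overload, passing empty headers.
public class StreamsCloudEventSerializer extends CloudEventSerializer {
    @Override
    public byte[] serialize(String topic, CloudEvent data) {
        return serialize(topic, new RecordHeaders(), data);
    }
}

// Same idea for reading the value back out of the state store.
class StreamsCloudEventDeserializer extends CloudEventDeserializer {
    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        return deserialize(topic, new RecordHeaders(), data);
    }
}

You would then build the serde from these subclasses instead of the stock ones, e.g. Serdes.serdeFrom(new StreamsCloudEventSerializer(), new StreamsCloudEventDeserializer()), configured exactly as before. One thing to verify: with empty headers the deserializer has no content-type header to detect the encoding, so check that structured-mode values round-trip through the state store correctly.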
