I currently have an outbox table containing events of several types (products, payments, orders, etc.), and at the moment all of them go to a single outbox topic.
Here is how my containers and my MySQL connector are configured today.
zookeeper:
  image: confluentinc/cp-zookeeper:latest
  container_name: ecommerce-zookeeper
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
  networks:
    - kafka

kafka:
  image: confluentinc/cp-kafka:latest
  container_name: ecommerce-kafka
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
    - "9094:9094"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
  extra_hosts:
    - "host.docker.internal:172.17.0.1"
  networks:
    - kafka

kafka-connect:
  image: confluentinc/cp-kafka-connect-base:6.0.0
  container_name: ecommerce-kafka-connect
  depends_on:
    - kafka
    - mysql
  ports:
    - "8083:8083"
  networks:
    - ecommerce_network
    - kafka
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: kafka-connect
    CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: _connect-status
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_REST_ADVERTISED_HOST_NAME: "ecommerce-kafka-connect"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
  command:
    - bash
    - -c
    - |
      echo "Installing Connector"
      confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.2.2
      #
      echo "Launching Kafka Connect worker"
      /etc/confluent/docker/run &
      #
      sleep infinity
  extra_hosts:
    - "host.docker.internal:172.17.0.1"
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "database.hostname": "mysql",
  "database.port": "3306",
  "database.user": "root",
  "database.password": "123456",
  "database.server.id": "10000",
  "database.server.name": "ecommerce-mysql",
  "database.allowPublicKeyRetrieval": "true",
  "database.include.list": "ecommerce",
  "table.include.list": "ecommerce.outbox",
  "database.history.kafka.bootstrap.servers": "kafka:9092",
  "database.history.kafka.topic": "ecommerce.dbhistory",
  "include.schema.changes": "false",
  "schema.enable": "false",
  "transforms": "extractValue",
  "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractValue.field": "after"
}
I would like to filter out the delete operations and ignore them. In addition, I want to route events to topics based on their type; for example, product events should go to the topic products-topic. While researching I arrived at SMTs, but I also read that this was not really recommended, and that the better option would be to build an application that consumes the outbox topic, filters it, and sends the events to their respective topics. I'd like some opinions on the best way to do this.
Using the ExtractTopic SMT is the best method here; I'm not sure where you read otherwise. The only caution is to enable monitoring on the Connect logs so you can catch failures in the SMT transformation. The way you're doing it is the right approach: it's fast and it follows a recommended pattern.
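To sketch what that could look like in your connector config (assumptions: your outbox rows have a `type` column naming the event type, the worker runs Kafka 2.6+, which the cp-kafka-connect-base:6.0.0 image does, and the `confluentinc/connect-transforms` plugin providing ExtractTopic has been installed from Confluent Hub alongside the Debezium connector):

```json
{
  "tombstones.on.delete": "false",
  "transforms": "unwrap,dropDeletes,routeByType",
  "transforms.unwrap.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.unwrap.field": "after",
  "predicates": "isNullValue",
  "predicates.isNullValue.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms.dropDeletes.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropDeletes.predicate": "isNullValue",
  "transforms.routeByType.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.routeByType.field": "type",
  "transforms.routeByType.skip.missing.or.null": "true"
}
```

The idea: `unwrap` flattens the Debezium envelope down to the `after` state, which leaves delete events with a null value; the `Filter` transform with the `RecordIsTombstone` predicate then drops those; finally `ExtractTopic` rewrites the topic name from the `type` field. Note that the topic becomes the raw field value (e.g. `products`), so if you want `products-topic` you would append a `RegexRouter` transform to the chain.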
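Worth noting as an alternative, in case you can adapt the table layout: Debezium (including the 1.2.x line you are running) ships a purpose-built SMT for exactly this pattern, `io.debezium.transforms.outbox.EventRouter`. By default it expects columns named `id`, `aggregatetype`, `aggregateid`, `type` and `payload` (all configurable), routes each event to a topic derived from the `aggregatetype` value, and is designed to tolerate the row deletes an outbox cleanup job performs rather than propagating them, so it covers both requirements in one transform. A minimal sketch, assuming those default column names:

```json
{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.by.field": "aggregatetype",
  "transforms.outbox.route.topic.replacement": "${routedByValue}-topic"
}
```

With `route.topic.replacement` set this way, a row with `aggregatetype = products` would be routed to `products-topic`; the default replacement pattern is `outbox.event.${routedByValue}`.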