Kafka Connect with Debezium SMT routing and filters

I currently have an outbox table that holds events of several types (products, payments, orders, etc.), and right now all of them go to the same outbox topic.
Here is how my containers and my MySQL connector are currently configured.

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: ecommerce-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - kafka

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: ecommerce-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
    networks:
      - kafka

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.0
    container_name: ecommerce-kafka-connect
    depends_on:
      - kafka
      - mysql
    ports:
      - "8083:8083"
    networks:
      - ecommerce_network
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "ecommerce-kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.2.2
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    extra_hosts:
      - "host.docker.internal:172.17.0.1"

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "123456",
    "database.server.id": "10000",
    "database.server.name": "ecommerce-mysql",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.outbox",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "ecommerce.dbhistory",
    "include.schema.changes": "false",
    "schema.enable": "false",
    "transforms": "extractValue",
    "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractValue.field": "after"
}

I would like to filter out delete operations and ignore them. In addition, I want to route events based on their type; for example, a products event should go to the topic products-topic. While researching, I came across SMTs (Single Message Transforms), but I also read that this approach is not really recommended, and that it would be better to write an application that consumes the outbox topic, filters the events, and sends each one to its respective topic. I would like some opinions on the best way to do this.
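For reference, one SMT-based option is Debezium's own outbox EventRouter. The fragment below is only a sketch of the connector properties that would change, not a tested configuration. It assumes the outbox table uses the column names EventRouter expects by default (id, aggregatetype, aggregateid, type, payload) and that the aggregatetype column holds values such as products; neither is confirmed by the configuration above. EventRouter needs the full Debezium change event, so it would replace the extractValue transform rather than follow it. The Debezium documentation describes EventRouter as ignoring delete events on the outbox table, which is worth verifying on connector version 1.2.2.

```json
{
    "tombstones.on.delete": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-topic"
}
```

With these settings, a row whose aggregatetype is products would be routed to products-topic, since ${routedByValue} is filled in from the column named in route.by.field.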

  • Using the ExtractTopic SMT is the best method here. I'm not sure where you read otherwise. The only caution is to enable monitoring on the Connect logs so that you catch failures in the SMT transformation. The way you're doing it is the recommended approach, and it works really fast.

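A rough sketch of what the ExtractTopic suggestion could look like when added to the existing connector properties. It assumes the outbox row has a column named type holding values such as products (the real column name is not shown in the question), and that the Confluent transforms plugin providing ExtractTopic is installed on the worker, for example with confluent-hub install --no-prompt confluentinc/connect-transforms:latest. Filter and the RecordIsTombstone predicate ship with Kafka Connect from Apache Kafka 2.6 onward, which is what the cp-kafka-connect-base:6.0.0 image is based on.

```json
{
    "transforms": "extractValue,dropDeletes,routeByType,addSuffix",
    "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractValue.field": "after",
    "transforms.dropDeletes.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropDeletes.predicate": "isNullValue",
    "transforms.routeByType.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.routeByType.field": "type",
    "transforms.addSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addSuffix.regex": "(.*)",
    "transforms.addSuffix.replacement": "$1-topic",
    "predicates": "isNullValue",
    "predicates.isNullValue.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
```

The order matters here. A delete event has a null after field, so once extractValue has run the record value is null, and dropDeletes discards it before routeByType tries to read type from it. The same filter also drops the tombstone records the connector emits after deletes. addSuffix then turns a topic such as products into products-topic.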

  • If you managed to solve your problem, please do not edit your question to add things like (SOLVED) to the title. Instead, post an answer detailing the solution and accept it after the timeout (or if there is an existing answer that helped you solve the problem, accept that).
