Kafka KRaft Cluster with Debezium Connector, Schema Registry, and Kafka UI Configuration

2022-01-01

If you're looking to set up a robust Kafka cluster with Kafka Connect and Schema Registry for your data streaming needs, you're in the right place. In this article, we'll walk through a Docker Compose configuration for Kafka 3.5 that provides a multi-broker cluster ready to handle your data pipeline requirements. We'll also explore the custom configurations used for Kafka Connect and Schema Registry.


Note: The configuration below does not need ZooKeeper. It uses Apache Kafka Raft (KRaft) instead.


Note: We've chosen not to use the Apicurio Schema Registry due to specific requirements involving the automatic creation of topics for Kafka foreign-key joins.


Note: This setup needs at least 8 GB of RAM, so increase the memory limit of the Docker Desktop app accordingly.


Prerequisites

Before diving into the configuration, make sure you have Docker and Docker Compose installed on your system.
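
To confirm both are available before continuing, a quick check from the terminal is enough (version numbers will vary by machine):

docker --version           # e.g. Docker version 24.x
docker compose version     # Compose v2; older installs may use: docker-compose --version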


The Docker Compose Configuration

Here's a Docker Compose file for setting up a Kafka 3.5 cluster with three broker nodes, Kafka Connect, Schema Registry, ksqlDB, and a Kafka UI. Let's break down the key components and their configurations:


Kafka Broker Nodes

We configure three Kafka broker nodes: kafka-0, kafka-1, and kafka-2. Each broker is based on the Bitnami Kafka image and exposes an EXTERNAL listener on port 9092, 9093, and 9094, respectively, for clients on the host machine; the brokers talk to each other over the INTERNAL listener on port 29092, and the KRaft controller quorum communicates over the CONTROLLER listener on port 19093.


Note: Because the default Debezium Docker image isn't bundled with the Confluent converter classes (such as AvroConverter), we use a custom Debezium Connect image built on top of the base Debezium image.

version: "3.2"
services:
  kafka-0:
    image: docker.io/bitnami/kafka:3.5.1
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
      - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_JMX_PORT=19101

    volumes:
      - kafka_0_data:/bitnami/kafka
  kafka-1:
    image: docker.io/bitnami/kafka:3.5.1
    ports:
      - "9093:9093"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:29092,EXTERNAL://localhost:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_JMX_PORT=19101

    volumes:
      - kafka_1_data:/bitnami/kafka
  kafka-2:
    image: docker.io/bitnami/kafka:3.5.1
    ports:
      - "9094:9094"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:19093,1@kafka-1:19093,2@kafka-2:19093
      - KAFKA_KRAFT_CLUSTER_ID=Dpi4jdiOTyW0G3vV112ZWg
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:29092,CONTROLLER://:19093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:29092,EXTERNAL://localhost:9094
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_JMX_PORT=19101

    volumes:
      - kafka_2_data:/bitnami/kafka

  schema-registry:
    image: docker.io/bitnami/schema-registry:7.3
    ports:
      - "8081:8081"
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
    environment:
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
      - SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka-0:29092,PLAINTEXT://kafka-1:29092,PLAINTEXT://kafka-2:29092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry

  connect:
    image: olyanren/debezium-confluent:2.3.1
    hostname: connect
    container_name: connect
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
      ADVERTISED_PORT: 8083
      ADVERTISED_HOST_NAME: "kafka-connect"
      GROUP_ID: compose-connect-group
      CONFIG_STORAGE_TOPIC: docker-connect-configs
      OFFSET_STORAGE_TOPIC: docker-connect-offsets
      STATUS_STORAGE_TOPIC: docker-connect-status
      ENABLE_SCHEMA_CONVERTERS: "true"
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_AUTO-REGISTER: "true"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_FIND-LATEST: "true"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_AUTO-REGISTER: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_FIND-LATEST: "true"
      CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE: "avro"
      LOG4J_ROOT_LOGLEVEL: "INFO"
      LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      STATUS_STORAGE_REPLICATION_FACTOR: "3"
      OFFSET_FLUSH_INTERVAL_MS: 60000
      PLUGIN_PATH: '/usr/share/java'


  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SINK_REPLICAS: "3"
      KSQL_KSQL_STREAMS_REPLICATION_FACTOR: "3"
      KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: "1"
    volumes:
      - ksql_data:/bitnami/ksql
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
      - schema-registry
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: Dpi4jdiOTyW0G3vV112ZWg
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka-0:29092,kafka-1:29092,kafka-2:29092"
      KAFKA_CLUSTERS_0_METRICS_PORT: 19101
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: admin
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083
    restart: always
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
      - schema-registry
      - connect
volumes:
  kafka_0_data:
    driver: local
  kafka_1_data:
    driver: local
  kafka_2_data:
    driver: local
  ksql_data:
    driver: local
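
Once the file is saved as docker-compose.yml, you can bring the whole stack up and sanity-check it from the command line. The commands below are a minimal sketch; they use the service and container names defined above:

# start all services in the background
docker compose up -d

# confirm every service is up
docker compose ps

# list topics over the internal listener (the Bitnami image puts the Kafka CLI tools on the PATH)
docker compose exec kafka-0 kafka-topics.sh --bootstrap-server kafka-0:29092 --list

# Schema Registry should respond with its registered subjects ([] on a fresh start)
curl http://localhost:8081/subjects

# open an interactive ksql prompt against the ksqlDB server
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

The Kafka UI is then available at http://localhost:8080, and Kafka Connect's REST API at http://localhost:8083.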

Custom Debezium Connector for SQL Server


To work with SQL Server, we use the Debezium SQL Server connector bundled in our custom image. You can register it with a POST request to Kafka Connect's REST API:


URL: http://localhost:8083/connectors

Method: POST

Content-Type: application/json


{
  "name": "hitit-basit-belge-isletme-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "topic.prefix": "hitit.isletme.tis",
    "database.hostname": "localhost",
    "database.port": "1433",
    "database.user": "ktbHitit",
    "database.password": "1234",
    "database.names": "demo",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-0:29092,kafka-1:29092,kafka-2:29092",
    "schema.history.internal.kafka.topic": "schema-changes.demo-internal",
    "database.encrypt": "false",
    "snapshot.mode": "initial",
    "snapshot.isolation.mode": "repeatable_read",
    "table.include.list": "dbo.users, dbo.roles, dbo.permissions",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.auto-register": true,
    "key.converter.schema.registry.find-latest": true,
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.auto-register": true,
    "value.converter.schema.registry.find-latest": true,
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "schema.name.adjustment.mode": "avro",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_hitit_isletme_tis",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "transforms": "unwrap,createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "Id",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "Id",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.partitions": "3",
    "topic.creation.default.replication.factor": "3"
  }
}
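
Assuming the JSON above is saved to a file named connector.json (the file name is arbitrary), registering and verifying the connector from the host looks like this:

# confirm the SQL Server connector plugin is installed
curl http://localhost:8083/connector-plugins

# register the connector
curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors

# check that the connector and its task are in the RUNNING state
curl http://localhost:8083/connectors/hitit-basit-belge-isletme-connector/status

One caveat: "database.hostname" is set to localhost above, which resolves to the Connect container itself. If SQL Server runs on your host machine, Docker Desktop exposes it to containers as host.docker.internal, so adjust the hostname accordingly.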
