April 28, 2024

Kafka and Debezium with Everything Hosted Locally Without Confluent

Tags: debezium, kafka

Repository

Docker-Compose File and Registration of the Debezium Connector

Spin up Instances

The docker-compose file is adapted from this repository, which comes with a tutorial in this video:

version: "3.7"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [ zookeeper ]
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # kafka:9092 serves clients inside the compose network;
      # localhost:29092 serves clients on the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [ kafka ]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [ zookeeper, kafka ]
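
Save the file as docker-compose.yml in the project root and bring the stack up (with Compose v2, docker compose up -d works the same way):

docker-compose up -d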
Configure a project-root/debezium.json for a Connector Instance
{
    "name": "billie-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "xxx.rds.amazonaws.com",
        "database.port": "5432",
        "database.user": "xxx",
        "database.password": "yyy",
        "database.dbname": "zzz",
        "database.server.name": "postgres",
        "table.include.list": "public.MessagesSession,public.SummaryFollow,public.LLMSummary,public.UserToProject,public.UserToChannel"
    }
}

From experience, the database.server.name value determines the prefix of the topic names in Kafka.

For example, we will have

  • postgres.public.LLMSummary
  • postgres.public.MessagesSession
  • postgres.public.SummaryFollow
  • postgres.public.UserToChannel
  • postgres.public.UserToProject

as the topics to listen to.

Create Debezium Source Connector from the Configuration
Post Request to Create a Connector

Let's execute the following in a bash shell:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@debezium.json"
Health-check the connector

Let's health-check the latest connector:

curl -H "Accept:application/json" localhost:8083/connectors/billie-connector/status
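
Kafka Connect's REST API can also list every registered connector, which is handy for confirming the connector name before querying its status; a connector can likewise be removed and re-registered via DELETE:

curl -H "Accept:application/json" localhost:8083/connectors
curl -X DELETE localhost:8083/connectors/billie-connector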
List all Topics Created

Note that the name of the running container depends on your working directory name.

docker exec -it <directory-name>-kafka-1 bash

and then run

/usr/bin/kafka-topics --bootstrap-server localhost:9092 --list
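
Alternatively, the two steps can be combined into a single command from the host:

docker exec -it <directory-name>-kafka-1 /usr/bin/kafka-topics --bootstrap-server localhost:9092 --list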

In my case I get:

__confluent.support.metrics
__consumer_offsets
_schemas
connect-status
connect_configs
connect_offsets
postgres.public.LLMSummary
postgres.public.MessagesSession
postgres.public.SummaryFollow
postgres.public.UserToChannel
postgres.public.UserToProject
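
To peek at the raw change events flowing through one of these topics, the console consumer bundled in the same Kafka container can be used (the topic name is taken from the list above):

/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic postgres.public.LLMSummary --from-beginning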

Adjust the Replica Identity of the Tables

Without REPLICA IDENTITY FULL we cannot capture the state before a change (the before field will be null in the messages from Kafka), so let's adjust it:

ALTER TABLE "LLMSummary" REPLICA IDENTITY FULL;
ALTER TABLE "MessagesSession" REPLICA IDENTITY FULL;
ALTER TABLE "SummaryFollow" REPLICA IDENTITY FULL;
ALTER TABLE "UserToChannel" REPLICA IDENTITY FULL;
ALTER TABLE "UserToProject" REPLICA IDENTITY FULL;

Listening to the Topics

main.js

Let's try kafkajs instead of node-rdkafka:

yarn add kafkajs

with

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:29092']
})

const topics = [
    "postgres.public.LLMSummary",
    "postgres.public.MessagesSession",
    "postgres.public.SummaryFollow",
    "postgres.public.UserToChannel",
    "postgres.public.UserToProject"
];
const run = async () => {
    const consumer = kafka.consumer({ groupId: "kafka" });
    await consumer.connect();
    await consumer.subscribe({ topics, fromBeginning: false });
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            const payload = JSON.parse(message.value.toString()).payload;
            console.log("------------------------")
            console.log(topic)
            console.log(payload)
        },
    })
}

run().catch(console.error);

process.on('uncaughtException', function (err) {
    console.error(err.stack);
    console.log("Node NOT Exiting...");
});
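
For reference, each Debezium payload carries before and after row images plus an op code (c = create, u = update, d = delete); the before image is only populated because of the REPLICA IDENTITY FULL change above. A sketch of how the handler could use these fields, not part of the original script:

// inside eachMessage, after parsing the payload
const { before, after, op } = payload;
if (op === "u") {
    console.log("row changed from", before, "to", after);
}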
Results

Suppose I perform an action in our frontend; it is then very clear what happened in the backend:

For example:

  • What is inserted into the database
  • What is updated in the database

It helps us understand the business from the tables, and the tables from the business as well.