NATS Logo by Example

Change Data Capture via Debezium in Integrations

Change data capture (CDC) can be desirable to setup on databases in order to stream granular change events to other parts of the system as data changes.

A fairly popular open source project that handles the intricacies of interfacing with a variety of databases to capture changes is Debezium.

This example highlights a minimal setup to leverage the new standalone Debezium Server for performing change data capture (CDC) from a Postgres database to a NATS stream.

For Postgres, Debzium leverages the logical replication API and registers itself as a replication target. It then takes this raw representation of the change and converts it to a standard event data model. Finally, it publishes to the configured NATS stream, by default, called DebeziumStream.

Each event is published to a subject corresponding to the Postgres schema and table, for example, postgres.public.test. The first token is an optional prefix specified in the Debezium configuration.

See the source repo for the Docker Compose file and Debezium configuration.

CLI Go Python JavaScript Rust C# C#2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run integrations/debezium/cli
View the source code or learn how to run this example yourself

Code

#!/bin/bash


set -euo pipefail

Ensure all the services are up and running.

until nc -z nats 4222; do sleep 1; done
until nc -z postgres 5432; do sleep 1; done
until nc -z debezium 8080; do sleep 1; done

Allow Debezium to setup its connection to Postgres.

sleep 1

Create a table and insert, update, and delete some data.

printf 'Create and populate the database.\n'
psql -h postgres -c "create table profile (id serial primary key, name text, color text);"


psql -h postgres -c "insert into profile (name, color) values ('Joe', 'blue');"
psql -h postgres -c "insert into profile (name, color) values ('Pam', 'green');"


psql -h postgres -c "update profile set color = 'red' where name = 'Joe';"
psql -h postgres -c "update profile set color = 'yellow' where name = 'Pam';"


psql -h postgres -c "delete from profile where name = 'Joe';"


psql -h postgres -c "create table books (id serial primary key, title text, author text);"


psql -h postgres -c "insert into books (title, author) values ('NATS Diaries', 'Pam');"

Ensure the change events have been published to the stream.

sleep 1

Print the stream subjects, seeing each schema/table pair as subjects.

printf '\nStream subjects.\n'
nats stream subjects DebeziumStream

Print the data in the stream, plucking out the before and after values for ease of reading.

printf '\nChange events.\n'
nats consumer add DebeziumStream viewer --ephemeral --pull --defaults > /dev/null
nats consumer next --raw --count 5 DebeziumStream viewer | jq -r '.payload'

Output

Create and populate the database.
CREATE TABLE
INSERT 0 1
INSERT 0 1
UPDATE 1
UPDATE 1
DELETE 1
CREATE TABLE
INSERT 0 1

Stream subjects.
╭─────────────────────────────────────────────────────────────────╮
│               2 Subjects in stream DebeziumStream               │
├───────────────────────┬───────┬─────────────────────────┬───────┤
│ Subject               │ Count │ Subject                 │ Count │
├───────────────────────┼───────┼─────────────────────────┼───────┤
│ postgres.public.books │ 1     │ postgres.public.profile │ 5     │
╰───────────────────────┴───────┴─────────────────────────┴───────╯


Change events.
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Joe",
    "color": "blue"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639230,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920144\",\"22920248\"]",
    "schema": "public",
    "table": "profile",
    "txId": 733,
    "lsn": 22920248,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1699905639524,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 2,
    "name": "Pam",
    "color": "green"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639240,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920528\",\"22920528\"]",
    "schema": "public",
    "table": "profile",
    "txId": 734,
    "lsn": 22920528,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Joe",
    "color": "red"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639249,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920712\",\"22920712\"]",
    "schema": "public",
    "table": "profile",
    "txId": 735,
    "lsn": 22920712,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 2,
    "name": "Pam",
    "color": "yellow"
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639257,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920840\",\"22920840\"]",
    "schema": "public",
    "table": "profile",
    "txId": 736,
    "lsn": 22920840,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1699905639527,
  "transaction": null
}
{
  "before": {
    "id": 1,
    "name": null,
    "color": null
  },
  "after": null,
  "source": {
    "version": "2.4.0.Final",
    "connector": "postgresql",
    "name": "postgres",
    "ts_ms": 1699905639266,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22920976\",\"22920976\"]",
    "schema": "public",
    "table": "profile",
    "txId": 737,
    "lsn": 22920976,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1699905639527,
  "transaction": null
}

Recording

Note, playback is half speed to make it a bit easier to follow.