Skip to main content

使用轻量级 CDC debezium-server-databend 构建实时数据同步

简介

Debezium Server Databend 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。

使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。

这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。

假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。

接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:

准备阶段

准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。

准备教程所需要的组件

接下来的教程将以 docker-compose 的方式准备所需要的组件。

debezium-MySQL

docker-compose.yaml

version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

Debezium Server Databend

  • Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git

  • 从项目根目录开始:

    • 构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
    • 构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
    • 进入解压后的文件夹: cd databendDist
    • 创建 application.properties 文件并修改: nano conf/application.properties,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。
    • 使用提供的脚本运行服务: bash run.sh
    • Debezium Server with Databend 将会启动

同时我们也提供了相应的 Docker image,可以在容器中一键启动:

version: '2.1'
services:
debezium:
image: ghcr.io/databendcloud/debezium-server-databend:pr-2
ports:
- "8080:8080"
- "8083:8083"
volumes:
- $PWD/conf:/app/conf
- $PWD/data:/app/data

NOTE: 在容器中启动注意所连接数据库的网络。

Debezium Server Databend Application Properties

本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN

准备数据

MySQL 数据库中准备数据

进入 MySQL 容器

docker-compose exec mysql mysql -uroot -p123456

创建数据库 mydb 和表 products,并插入数据:

CREATE DATABASE mydb;
USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");

在 Databend 中创建 Database

NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。

启动 Debezium Server Databend

bash run.sh

首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:

同步 Insert 数据

我们继续往 MySQL 中插入 5 条数据:

INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");

Debezium server databend 日志:

同时在 Databend 中可以查到 5 条数据已经同步过来了:

同步 Update 数据

配置文件中 debezium.sink.databend.upsert=true ,所以我们也可以处理 Update/Delete 的事件。

在 MySQL 中更新 id=10 的数据:

update products set name="from debezium" where id=10;

在 Databend 中可以查到 id 为 10 的数据已经被更新:

同步 Delete 数据

在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:

debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:

  1. 一个包含 "op": "d",其他的行数据以及字段;
  2. 一个tombstones记录,它具有与被删除行相同的键,但值为null。

这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted 字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。

这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true 即可。

关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档

在 MySQL 中删除 id=12 的数据:

delete from products where id=12;

在 Databend 中可以观察到 id=12 的值的 __deleted 字段已经被置为 true

环境清理

操作结束后,在 docker-compose.yml 文件所在的目录下执行如下命令停止所有容器:

docker-compose down

结论

以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。