Debezium
1. Debezium
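Debezium is a mature Change Data Capture (CDC) tool for databases. It turns the contents of an existing database into an event stream, so applications can detect and immediately respond to row-level changes, which matters for scenarios that must react to data changes quickly and in real time. Debezium supports a wide range of databases, so adopting it as a CDC solution does not require changing the database engine. Its architecture is event-based and was designed with idempotency in mind for fault tolerance, and it offers several deployment modes to suit applications of different scales and environments.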
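To make the idempotency point concrete, here is a minimal Python sketch of a consumer that applies row-level change events as upserts keyed by primary key. The `op`, `before` and `after` fields follow the Debezium event envelope; the in-memory table and the `id` key are illustrative assumptions, not part of debezium-server-rain.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_event(table: Dict[int, Row], event: Dict[str, Any]) -> None:
    """Apply one row-level change event to an in-memory table keyed by primary key."""
    op = event["op"]  # "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    if op in ("c", "u", "r"):
        row = event["after"]
        table[row["id"]] = row  # upsert: replaying the event leaves the same row
    elif op == "d":
        table.pop(event["before"]["id"], None)  # deleting twice is harmless
    else:
        raise ValueError(f"unknown op: {op!r}")


def replay(events: List[Dict[str, Any]]) -> Dict[int, Row]:
    """Build the table state by applying events in order."""
    table: Dict[int, Row] = {}
    for event in events:
        apply_event(table, event)
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "John Doe"}},
    {"op": "u", "before": {"id": 1, "name": "John Doe"}, "after": {"id": 1, "name": "John D."}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Jane Smith"}},
    {"op": "d", "before": {"id": 2, "name": "Jane Smith"}, "after": None},
]

once = replay(events)
# Simulate redelivery after a restart: the last two events arrive a second time.
twice = replay(events + events[2:])
print(once == twice)
```

Because every event is applied as an upsert or a tolerant delete, redelivering events after a failure does not change the final state.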
2. Debezium-server-rain
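Debezium-server-rain is a lightweight CDC tool developed by Suanchang Technology (算场科技) on top of Debezium Server. It monitors and captures data changes in relational databases and streams the change events into target tables in Suanchang (算场).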
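3. Installing debezium-server-rain
3.1 Installing from source
3.1.1 Prerequisites
Build environment: JDK 11 + Maven 3.x
Runtime environment: JDK 11
3.1.2 Getting the source (TODO: publish the project to GitHub)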
git clone git@code.suan-chang.com:rain-connect/debezium-server-rain.git
3.1.3 Building and packaging
cd debezium-server-rain
mvn -Passembly -Dmaven.test.skip package
3.1.4 Unpacking the distribution
unzip debezium-server-rain-dist/target/debezium-server-rain-dist*.zip -d rainDist
cd rainDist/debezium-server-rain
3.1.5 Editing the configuration file
Create application.properties in the conf directory:
vim conf/application.properties
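Before starting the service, it can help to sanity-check the properties file. The Python sketch below parses simple `key=value` lines and reports missing entries. The list of required keys is an assumption drawn from the examples in this document, and the parser deliberately ignores advanced .properties features such as line continuations.

```python
from pathlib import Path
from typing import Dict, List

# Keys that the examples in this document always set; treating them as mandatory is an assumption.
REQUIRED_KEYS = [
    "debezium.sink.type",
    "debezium.sink.rain.accessKey",
    "debezium.sink.rain.secretKey",
    "debezium.sink.rain.metaEndpoint",
    "debezium.source.connector.class",
    "debezium.source.database.hostname",
]


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: Dict[str, str]) -> List[str]:
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


def check_file(path: str = "conf/application.properties") -> List[str]:
    """Read the file at path and return its missing required keys."""
    return missing_keys(parse_properties(Path(path).read_text(encoding="utf-8")))


sample = """
debezium.sink.type=rain
debezium.sink.rain.accessKey=yourAccessKey
# debezium.sink.rain.secretKey is intentionally left out
debezium.sink.rain.metaEndpoint=127.0.0.1:4002
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=localhost
"""
print(missing_keys(parse_properties(sample)))
```

Calling `check_file()` from the unpacked distribution directory would run the same check against the real file.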
3.1.6 Starting the service
chmod +x run.sh
bash run.sh
3.2 Installing with Docker (TODO: publish the image to Docker Hub)
3.2.1 Prerequisites
Docker, Docker Compose
3.2.2 Creating the configuration
Create application.properties in the conf directory:
mkdir debeziumRain
cd debeziumRain
mkdir conf
mkdir data
vim conf/application.properties
3.2.3 Creating the Docker Compose file
vim docker-compose.yml
Contents of docker-compose.yml:
version: '2.1'
services:
  debezium:
    image: registry.cn-beijing.aliyuncs.com/suanchang/debeziumrain
    ports:
      - "8080:8080"
      - "8083:8083"
    volumes:
      - $PWD/conf:/debezium-server-rain/conf
      - $PWD/data:/debezium-server-rain/data
3.2.4 Starting the Docker container
TODO: an internal Docker image is used for now, pending the release of a public image.
docker-compose up -d
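4. Examples
All the examples prepare their data in a database started in Docker. If you do not have a Docker environment, use a local database and change the database address in the configuration accordingly.
4.1 Capturing MySQL data into Suanchang
This section shows how to capture MySQL data into Suanchang.
Environment
Obtain your accessKey, secretKey, and metaEndpoint from the Suanchang web page.
Preparing the MySQL data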
docker run --name my-debezium-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -d debezium/example-mysql:2.1.2.Final
docker exec -it my-debezium-mysql mysql -uroot -pdebezium
CREATE database debezium;
USE debezium;
CREATE TABLE employee (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
age INT,
email VARCHAR(255),
phone VARCHAR(20),
address VARCHAR(255),
status TINYINT
);
INSERT INTO debezium.employee (name, age, email, phone, address, status) VALUES
('John Doe', 30, 'john.doe@example.com', '555-1234', '123 Main St', 1),
('Jane Smith', 28, 'jane.smith@example.com', '555-5678', '456 Oak St', 1),
('Alice Johnson', 35, 'alice.johnson@example.com', '555-8765', '789 Pine St', 1),
('Bob Brown', 40, 'bob.brown@example.com', '555-4321', '321 Maple St', 1),
('Charlie Davis', 25, 'charlie.davis@example.com', '555-2345', '654 Birch St', 1),
('Diana Evans', 32, 'diana.evans@example.com', '555-6789', '987 Cedar St', 1),
('Frank Green', 29, 'frank.green@example.com', '555-3456', '159 Elm St', 1),
('Grace Hall', 27, 'grace.hall@example.com', '555-7890', '753 Spruce St', 1),
('Henry Adams', 33, 'henry.adams@example.com', '555-4567', '951 Walnut St', 1),
('Ivy Baker', 31, 'ivy.baker@example.com', '555-8901', '357 Poplar St', 1),
('Jack Carter', 26, 'jack.carter@example.com', '555-5678', '852 Cherry St', 1),
('Karen Lee', 34, 'karen.lee@example.com', '555-6789', '258 Chestnut St', 1),
('Larry Martin', 36, 'larry.martin@example.com', '555-7890', '654 Redwood St', 1),
('Mona Nelson', 30, 'mona.nelson@example.com', '555-8901', '951 Cypress St', 1),
('Nate Owens', 28, 'nate.owens@example.com', '555-4567', '753 Fir St', 1),
('Olivia Parker', 35, 'olivia.parker@example.com', '555-5678', '258 Palm St', 1),
('Paul Quinn', 40, 'paul.quinn@example.com', '555-6789', '654 Ash St', 1),
('Quincy Roberts', 25, 'quincy.roberts@example.com', '555-7890', '357 Willow St', 1),
('Rachel Scott', 32, 'rachel.scott@example.com', '555-8901', '951 Magnolia St', 1),
('Steve Turner', 29, 'steve.turner@example.com', '555-4567', '753 Juniper St', 1);
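Configuring application.properties
Set up application.properties and start the service; see section 3 for the startup procedure.
If the service runs in Docker, change debezium.source.database.hostname to the IP address of the database.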
debezium.sink.type=rain
debezium.sink.rain.accessKey=yourAccessKey
debezium.sink.rain.secretKey=yourSecretKey
debezium.sink.rain.metaEndpoint=127.0.0.1:4002
debezium.sink.rain.clientId=suanchang-test
debezium.sink.rain.databaseName=debezium
debezium.sink.rain.destination-regexp=\\.
debezium.sink.rain.destination-regexp-replace=_
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source, related docs: https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage=io.debezium.server.rain.offset.RainOffsetBackingStore
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schemahistory_internal.dat
debezium.source.schema.history.file.filename=data/schemahistory.dat
debezium.source.offset.flush.interval.ms=1000
debezium.source.include.schema.changes=true
debezium.source.topic.prefix=rain
#conf for mysql
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=debezium
debezium.source.database.dbname=debezium
debezium.source.table.include.list=debezium.employee
debezium.source.database.server.name=serverName
debezium.source.database.server.id=1
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
debezium.transforms.unwrap.operation.header=true
# ############ SET LOG LEVELS ############
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=warn
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=info
quarkus.log.file.enable=true
quarkus.http.port=8080
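Checking the sync result
After startup, the service creates the database debezium and the table employee in Suanchang; run the query below to see the synced data. Keep the service running and perform CRUD operations on the employee table in MySQL, and the data in Suanchang changes accordingly.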
select * from employee order by id;
4.2 Capturing PostgreSQL data into Suanchang
Preparing the database
docker run --name my-debezium-postgres -e POSTGRES_PASSWORD=suanchang -p 5432:5432 -d debezium/example-postgres:2.6.0.Final
docker exec -it my-debezium-postgres psql -U postgres
create database debezium;
-- Create the schema debezium
CREATE SCHEMA debezium;
-- Create the enum type user_role
CREATE TYPE user_role AS ENUM ('Admin', 'User', 'Guest');
-- Create the table debezium.types
CREATE TABLE debezium.types (
    -- Integer type
    int_column INT PRIMARY KEY,
    -- Decimal type
    decimal_column DECIMAL(10, 2),
    -- String type
    varchar_column VARCHAR(255),
    -- Date and time type
    timestamp_column TIMESTAMP,
    -- Boolean type
    boolean_column BOOLEAN,
    -- Enum type
    enum_column user_role,
    -- Array type
    integer_array_column INT[],
    -- JSON data type
    json_column JSON,
    -- UUID type
    uuid_column UUID,
    -- Byte type
    bytea_column BYTEA,
    -- Network address type
    inet_column INET,
    -- Binary data type
    binary_column BYTEA
);
-- Insert data
INSERT INTO debezium.types (int_column, decimal_column, varchar_column, timestamp_column, boolean_column, enum_column, integer_array_column, json_column, uuid_column, bytea_column, inet_column, binary_column)
VALUES
(1, 12.34, 'John Doe', '2023-09-12 10:00:00', true, 'Admin', '{1, 2, 3}', '{"key": "value"}', '550e8400-e29b-41d4-a716-446655440000'::uuid, E'\\x0123456789ABCDEF', '192.168.1.1'::inet, E'\\x0123456789ABCDEF'),
(2, 45.67, 'Jane Smith', '2023-09-12 11:00:00', false, 'Admin', '{4, 5, 6}', '{"key": "value2"}', '551e8400-e29b-41d4-a716-446655440001'::uuid, E'\\x0123456789ABCDEF', '192.168.1.2'::inet, E'\\x0123456789ABCDEF'),
(3, 78.90, 'Alice Johnson', '2023-09-12 12:00:00', true, 'Admin', '{7, 8, 9}', '{"key": "value3"}', '552e8400-e29b-41d4-a716-446655440002'::uuid, E'\\x0123456789ABCDEF', '192.168.1.3'::inet, E'\\x0123456789ABCDEF'),
(4, 23.45, 'Charlie Clark', '2023-09-12 13:00:00', false, 'User', '{10, 11, 12}', '{"key": "value4"}', '553e8400-e29b-41d4-a716-446655440003'::uuid, E'\\x0123456789ABCDEF', '192.168.1.4'::inet, E'\\x0123456789ABCDEF'),
(5, 56.78, 'Eve White', '2023-09-12 14:00:00', true, 'User', '{13, 14, 15}', '{"key": "value5"}', '554e8400-e29b-41d4-a716-446655440004'::uuid, E'\\x0123456789ABCDEF', '192.168.1.5'::inet, E'\\x0123456789ABCDEF'),
(6, 89.01, 'Frank Black', '2023-09-12 15:00:00', false, 'User', '{16, 17, 18}', '{"key": "value6"}', '555e8400-e29b-41d4-a716-446655440005'::uuid, E'\\x0123456789ABCDEF', '192.168.1.6'::inet, E'\\x0123456789ABCDEF'),
(7, 34.56, 'Grace Green', '2023-09-12 16:00:00', true, 'Guest', '{19, 20, 21}', '{"key": "value7"}', '556e8400-e29b-41d4-a716-446655440006'::uuid, E'\\x0123456789ABCDEF', '192.168.1.7'::inet, E'\\x0123456789ABCDEF'),
(8, 67.89, 'Henry Blue', '2023-09-12 17:00:00', false, 'Guest', '{22, 23, 24}', '{"key": "value8"}', '557e8400-e29b-41d4-a716-446655440007'::uuid, E'\\x0123456789ABCDEF', '192.168.1.8'::inet, E'\\x0123456789ABCDEF'),
(9, 90.12, 'Ivy Red', '2023-09-12 18:00:00', true, 'Guest', '{25, 26, 27}', '{"key": "value9"}', '558e8400-e29b-41d4-a716-446655440008'::uuid, E'\\x0123456789ABCDEF', '192.168.1.9'::inet, E'\\x0123456789ABCDEF'),
(10, 123.45, 'Bob Brown', '2023-09-12 19:00:00', false, 'Admin', '{28, 29, 30}', '{"key": "value10"}', '55ae8400-e29b-41d4-a716-446655440009'::uuid, E'\\x0123456789ABCDEF', '192.168.1.10'::inet, E'\\x0123456789ABCDEF');
Configuration
If the service runs in Docker, change debezium.source.database.hostname to the IP address of the database.
debezium.sink.type=rain
debezium.sink.rain.accessKey=yourAccessKey
debezium.sink.rain.secretKey=yourSecretKey
debezium.sink.rain.metaEndpoint=127.0.0.1:4002
debezium.sink.rain.clientId=suanchang-test
debezium.sink.rain.databaseName=debezium
debezium.sink.rain.destination-regexp=\\.
debezium.sink.rain.destination-regexp-replace=_
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# pg source
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=io.debezium.server.rain.offset.RainOffsetBackingStore
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schemahistory_internal.dat
debezium.source.schema.history.file.filename=data/schemahistory.dat
debezium.source.offset.flush.interval.ms=1000
debezium.source.include.schema.changes=true
debezium.source.topic.prefix=rain
#conf for pg
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=suanchang
debezium.source.database.dbname=postgres
debezium.source.schema.include.list=debezium
debezium.source.table.include.list=debezium.types
debezium.source.signal.data.collection=debezium.types
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
debezium.transforms.unwrap.operation.header=true
# ############ SET LOG LEVELS ############
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=warn
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=info
quarkus.log.file.enable=true
quarkus.http.port=8080
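Checking the sync result
Run the SQL query in a worksheet to view the result. Perform CRUD operations in PostgreSQL at the same time and check the synced result in Suanchang.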
select * from debezium.debezium_types;
Appendix
1. Sample configuration
debezium.sink.type=rain
debezium.sink.rain.accessKey=yourAccessKey
debezium.sink.rain.secretKey=yourSecretKey
debezium.sink.rain.metaEndpoint=127.0.0.1:4002
debezium.sink.rain.clientId=suanchang-test
debezium.sink.rain.databaseName=debezium
debezium.sink.rain.destination-regexp=\\.
debezium.sink.rain.destination-regexp-replace=_
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source, related docs: https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage=io.debezium.server.rain.offset.RainOffsetBackingStore
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schemahistory_internal.dat
debezium.source.schema.history.file.filename=data/schemahistory.dat
debezium.source.offset.flush.interval.ms=1000
debezium.source.include.schema.changes=true
debezium.source.topic.prefix=rain
#conf for mysql
debezium.source.database.hostname=192.168.123.150
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=debezium
debezium.source.table.include.list=debezium.test2
debezium.source.database.server.name=serverName
debezium.source.database.server.id=1
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
debezium.transforms.unwrap.operation.header=true
# ############ SET LOG LEVELS ############
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=warn
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=warn
quarkus.log.file.enable=true
quarkus.http.port=8080
2. Configuration reference
1. Fixed settings
debezium.sink.type=rain
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
debezium.transforms.unwrap.operation.header=true
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
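The unwrap settings above configure Debezium's ExtractNewRecordState transform, which flattens each change event envelope into a plain row with a few metadata fields. The Python sketch below only simulates that behavior for illustration. The `__` field prefix and the `__deleted` marker follow the Debezium documentation; representing `__deleted` as a string and the sample event values are assumptions.

```python
from typing import Any, Dict, Optional


def flatten(envelope: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Approximate ExtractNewRecordState with add.fields=op,table,source.ts_ms,db
    and delete.handling.mode=rewrite. A simulation, not Debezium code."""
    op = envelope["op"]
    source = envelope["source"]
    is_delete = op == "d"
    # In rewrite mode a delete keeps the last known row state instead of being dropped.
    state = envelope["before"] if is_delete else envelope["after"]
    if state is None:
        return None
    flat = dict(state)
    flat["__op"] = op
    flat["__table"] = source["table"]
    flat["__source_ts_ms"] = source["ts_ms"]
    flat["__db"] = source["db"]
    # Assumption: the deletion marker is modeled here as the strings "true"/"false".
    flat["__deleted"] = "true" if is_delete else "false"
    return flat


delete_event = {
    "op": "d",
    "before": {"id": 3, "name": "Alice Johnson"},
    "after": None,
    "source": {"db": "debezium", "table": "employee", "ts_ms": 1700000000000},
}
flat = flatten(delete_event)
print(flat)
```

With rewrite mode, a deleted row therefore still reaches the sink as a record, marked as deleted rather than silently disappearing.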
2. User-defined settings
debezium.sink.rain.accessKey=yourAccessKey
debezium.sink.rain.secretKey=yourSecretKey
debezium.sink.rain.metaEndpoint=127.0.0.1:4002
debezium.sink.rain.clientId=suanchang-test
debezium.sink.rain.databaseName=testDB
debezium.sink.rain.destination-regexp=\\.
debezium.sink.rain.destination-regexp-replace=_
debezium.source.offset.flush.interval.ms=1000
debezium.source.include.schema.changes=true
debezium.source.topic.prefix=rain
debezium.source.schema.history.internal.file.filename=data/schemahistory_internal.dat
debezium.source.schema.history.file.filename=data/schemahistory.dat
# ############ SET LOG LEVELS ############
quarkus.log.console.json=false
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=INFO
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=INFO
quarkus.log.file.enable=true
quarkus.http.port=8080
Config | Default | Description |
---|---|---|
debezium.sink.rain.accessKey | "" | The user's accessKey |
debezium.sink.rain.secretKey | "" | The user's secretKey |
debezium.sink.rain.metaEndpoint | localhost:4002 | The metadata endpoint |
debezium.sink.rain.databaseName | debeziumRain | The name of the target database |
debezium.sink.rain.destination-regexp | "" | Matching rules for source table names |
debezium.sink.rain.destination-regexp-replace | "" | Replace matching characters in table name |
debezium.sink.rain.clientId | "" | The name of the debezium client |
debezium.source.offset.flush.interval.ms | 60000 | The interval at which offsets are flushed, in milliseconds |
debezium.source.max.batch.size | 2048 | The maximum size of each batch of records that is read from the source database |
debezium.source.include.schema.changes | true | Whether to include schema changes |
debezium.source.topic.prefix | rain | The prefix for the topic names |
debezium.sink.batch.batch-size-wait | NoBatchSizeWait | The wait time for the batch size |
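As a rough illustration of how destination-regexp and destination-regexp-replace could combine, the Python sketch below applies the pattern from the sample configuration to a source table name. It assumes the sink replaces every match in the destination name, and that the name being rewritten is `debezium.types`. Both are assumptions chosen to match the PostgreSQL example, not confirmed behavior of the sink.

```python
import re


def unescape_properties_value(raw: str) -> str:
    """Undo only the double-backslash escape of .properties files (a deliberate simplification)."""
    return raw.replace("\\\\", "\\")


def target_table_name(destination: str, regexp: str, replacement: str) -> str:
    """Replace every match of regexp in the destination name with replacement."""
    # A function replacement keeps the replacement text literal.
    return re.sub(regexp, lambda _match: replacement, destination)


# In application.properties the value is written as \\. which is read as the regex \.
pattern = unescape_properties_value(r"\\.")
print(target_table_name("debezium.types", pattern, "_"))
```

The doubled backslash in the properties file is needed because a single backslash is itself an escape character in that format.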
3. Monitoring the server status
http://<server IP or domain name>:8080/q/health
If the service is running normally, it returns:
{
  "status": "UP",
  "checks": [
    {
      "name": "debezium",
      "status": "UP"
    }
  ]
}
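A monitoring script could poll this endpoint and interpret the response. The following Python sketch uses only the standard library. The function names and the rule that every check must report UP are illustrative choices, and the base URL is whatever host and port the server is exposed on.

```python
import json
import urllib.error
import urllib.request
from typing import Any, Dict


def is_healthy(payload: Dict[str, Any]) -> bool:
    """True when the overall status and every individual check report UP."""
    if payload.get("status") != "UP":
        return False
    return all(check.get("status") == "UP" for check in payload.get("checks", []))


def fetch_health(base_url: str, timeout: float = 5.0) -> Dict[str, Any]:
    """GET <base_url>/q/health and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/q/health", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def server_is_up(base_url: str) -> bool:
    """Treat connection failures and non-2xx responses as unhealthy."""
    try:
        return is_healthy(fetch_health(base_url))
    except (urllib.error.URLError, ValueError):
        return False


sample = json.loads('{"status": "UP", "checks": [{"name": "debezium", "status": "UP"}]}')
print(is_healthy(sample))
```

For example, `server_is_up("http://localhost:8080")` could be called from a cron job or a container health probe.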
4. References
For additional configuration options, see the official Debezium documentation: debezium-server-2.4.0-Final Documentation