Debezium 用于捕获数据库中的更改,以便应用程序可以查看这些更改并对其做出响应。 Debezium 会记录每个数据库表中所有行级别的更改,并将其作为更改事件流记录下来

主流CDC工具:

  1. canal : 监控binlog,支持mysql,社区已不活跃,非首选
  2. debezium : 支持mysql, postgresql, mongodb等十余种数据库,mysql也支持binlog,成熟稳定
  3. flink cdc: 主要应用于大数据,基于debezium,延迟较低

这里选择debezium的理由:

  1. 不选canal的原因 只支持mysql,并且社区已不活跃,除了老项目使用新的已经不是首选,在阿里也都边缘化了
  2. 不选flink cdc的原因,公司大数据没有flink的项目,增加运维成本,并且需要使用java开发,并不是所有开发都会使用java
  3. 选择debezium的原因,debezium成熟稳定,支持多种数据库,经典的使用方式是使用的是kafka connect部署, 我们有自己的kafka,不会增加运维成本, kafka connect是kafka自带的分布式工具,不用搭建额外的平台管理任务,使用消息队列和固定语言解耦,降低开发成本

当我们想监控数据库数据变动时我可以使用一些CDC工具,这里介绍一款配合Kafka使用的Connect插件Debezium,他支持Mysql, PostgreSQL, MongoDB等

能做什么:

1. 解耦数据修改端和使用端,减少应该通知但是没有通知的数据变更的错误,或者手动修改数据库时没有或忘记通知其他端
2. 异构数据库同步数据,比如搜索相关业务,需要数据实时同步,但是要保证数据一致
3. 缓存数据的生成和过期
4. 一些计算任务(数据统计,聚合)

这里我们以MySQL为例

环境

1. Kafka 3.7

2. MySQL 8.0 

3. Debezium Mysql connect Plugin 2.6.2 Final

Mysql

需要开启row格式binlog

[mysqld]
bind-address = 0.0.0.0
binlog_format = ROW
server_id = 1
log_bin = /var/log/mysql/mysql-bin.log

需要以下权限

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
权限作用
SELECT查询数据,仅在执行快照时使用
RELOAD允许连接器使用 FLUSH 语句来清除或重新加载内部缓存、刷新表或获取锁。仅在执行快照时使用
SHOW DATABASES使连接器能够通过发出 SHOW DATABASE 语句来查看数据库名称。仅在执行快照时使用
REPLICATION SLAVE使连接器能够连接并读取 MySQL 服务器二进制日志
REPLICATION CLIENT允许连接器使用以下语句:SHOW MASTER STATUS,SHOW SLAVE STATUS,SHOW BINARY LOGS
LOCK TABLES执行快照时需要锁表

Debezium

把插件解压到一个目录下,我这里解压到Kafka目录下,创建一个connects目录,解压到这里,解压后我的目录像这样
--kafka
    |--connects
        |--debezium-connector-mysql

在kafka的config文件夹下配置connect-distributed.properties这个文件 修改plugin.path填写connects文件夹路径

这两个值配置为false,json格式不需要这个,否则事件里面key和value会带着冗长的schema信息

key.converter.schemas.enable=false
value.converter.schemas.enable=false

bootstrap.servers这个配置项设置为kafka的地址

其他选项根据使用情况修改

完整配置如下

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#下面这两个只有在json格式的时候才需要配置为false, 其他格式在解析时是需要schema信息的
key.converter.schemas.enable=false  
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
listeners=HTTP://:8083
plugin.path=/xxx/kafka/connects

启动分布式connect,./bin/connect-distributed.sh config/connect-distributed.properties

也可以使用docker镜像方式启动

docker run  -d \
    --restart=always \
    --name connect \
    -p 8084:8083 \
    -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=docker_connect_configs \
    -e OFFSET_STORAGE_TOPIC=docker_connect_offsets \
    -e STATUS_STORAGE_TOPIC=docker_connect_statuses \
    -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \
    -e OFFSET_FLUSH_INTERVAL_MS=10000 \
    -e BOOTSTRAP_SERVERS=xxxx:9092,xxx:9092 \
    -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    quay.io/debezium/connect:2.4

#具体配置项还可以参考容器内docker_entrypoint.sh

debezium mysql connect配置

{
    //连接器名称
    "name": "connector-test",
    "config": {
      //连接器类, mysql connector,需根据版本按文档填写
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      //数据库信息
      "database.hostname": "192.168.20.156",
      "database.port": "3306",
      "database.user": "flink",
      "database.password": "flink2023",
      //数据库服务器ID,不能重复
      "database.server.id": "123123",

      //要监控的数据库和表
      "database.include.list": "web",
      "table.include.list": "web.A_ExpertInfo_1,web.W_ListenRec_(.*)",

      //主题前缀,起命名空间作用
      "topic.prefix": "mx1",
      //快照模式,只获取schema,监听在这之后的数据变化
      "snapshot.mode": "schema_only",

      //配置schema历史相关信息,内部使用,不需消费
      "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
      "schema.history.internal.kafka.topic": "mx.schemahistory.customer",
      //是否包含schema变更事件,在"topic.prefix" 主题里可以看到变更事件
      "include.schema.changes": "true",

      //处理不一致的schema的模式, warn打印warn日志而不是停止任务
      "inconsistent.schema.handling.mode": "warn",
      //不发送墓碑消息
      "tombstones.on.delete": "false",
      
      //定义转换器
      "transforms": "ListenRec,ListenRecPartition",

      //这里配置的是主题路由
      "transforms.ListenRec.type": "io.debezium.transforms.ByLogicalTableRouter",
      "transforms.ListenRec.topic.regex": "(.*)easyweb_new_trans\\.W_ListenRec_(.*)",
      "transforms.ListenRec.topic.replacement": "$1ListenRec",

      //这里配置的是分区路由
      "transforms.ListenRecPartition.type": "io.debezium.transforms.partitions.PartitionRouting",
      "transforms.ListenRecPartition.partition.payload.fields":"change.userid", //这里可以指定before.userid或者after.userid change.userid是自动判断使用before或者after,比如删除后after为空,新建后before为空
      "transforms.ListenRecPartition.partition.topic.num":"12",
      "transforms.ListenRecPartition.predicate":"listen",

      //配置断言,用于指定上面的分区路由,指定如何根据条件将转换应用
      "predicates":"listen",
      "predicates.listen.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.listen.pattern":"(.*)ListenRec",

      //配置自动创建topic, 分区数和副本数, 注意较低的kafka版本不支持,需要手动创建
      //设置为-1则使用kafka 配置的默认值
      "topic.creation.default.partitions": "1",
      "topic.creation.default.replication.factor": "3",

      //配置自动创建topic的条件,类似transforms
      //还可以配置一些其他topic属性,比如cleanup.policy,retention.ms等
      "topic.creation.listen.partitions":"12",
      "topic.creation.listen.replication.factor":"1",
      "topic.creation.listen.include":"listenrec\\.Listen", //还可以配置exclude
      "topic.creation.groups": "listen",
    }
}

配置项解析:

name: 连接器名称

connector.class : 连接器类,固定写法

database.hostname: 数据库地址

database.port: 数据库端口

database.user: 数据库用户名

database.password: 数据库密码

database.server.id: 数据库服务器ID

database.include.list: 要监控的数据库类表,逗号分隔

topic.prefix: 主题前缀,起命名空间作用

table.include.list: 要监控的表名,逗号分隔

schema.history.internal.kafka.topic: 记录历史变更的主题,内部使用,不需消费,同集群多个任务可共享

snapshot.mode:

1. always: 连接器每次启动时都会执行快照。快照包括捕获schema和数据.

2. initial: 仅当未记录逻辑服务器名称的偏移量或者之前的快照失败时,连接器才会运行快照。快照完成后,连接器开始传输事件记录以进行后续数据库更改。

3. initial_only: 仅当逻辑服务器名称没有记录偏移时,连接器才会运行快照。快照完成后,连接器*停止*。它不会转换为流式传输以从二进制日志中读取更改事件。

4. schema_only: 已弃用,请参阅 no_data

5. no_data: 连接器运行仅捕schema的快照,但不捕获任何表数据。也就是说只捕获启动之后的数据变动,重新启动后会继续之前捕获的位置捕获变动.

6. recovery: 设置此选项可恢复丢失或损坏的数据库schema历史主题。重新启动后,连接器运行快照,从源表重建主题, 如果在上次连接器关闭后将schema更改提交到数据库,请勿使用此模式执行快照

7. never: 当连接器启动时,它不会执行快照,而是立即开始流式传输事件记录以进行后续数据库更改。正在考虑将来弃用此选项,转而使用 no_data 选项。

8. when_needed: 连接器启动后,仅在检测到以下情况之一时才执行快照: 1.  它无法检测到任何主题偏移 2. 先前记录的偏移量指定服务器上不可用的二进制日志位置或GTID

9.  configuration_based:  使用此选项,您可以通过一组具有前缀“snapshot.mode.configuration.based”的连接器属性来控制快照行为。

10. custom: 连接器根据 snapshot.mode.custom.name 属性指定的实现执行快照,该属性定义 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现

tombstones.on.delete: 是否在删除事件发生后发送墓碑消息,默认为true,会在删除事件后发送一个value为null的事件

schema.history.*配置schema历史相关信息,用户解析binlog

include.schema.changes: 是否包含schema变更事件

inconsistent.schema.handling.mode: 指定连接器应如何对与内部架构表示中不存在的表相关的二进制日志事件做出反应。即内部表示与数据库不一致

1. fail: 失败抛出一个异常,指示有问题的事件及其二进制日志偏移量,并导致连接器停止, 默认行为

2. warn: warn记录有问题的事件及其二进制日志偏移量并跳过该事件

3. skip: 跳过有问题的事件并且不记录任何内容

transforms: 转换器列表,逗号分隔,可用于把分表的事件转换到一个主题下

使用curl把配置文件发送给connect

curl -X POST -H "Content-Type: application/json" --data @mysql.json http://localhost:8083/connectors

其他接口方法请参考

postman json

这条语句会把mysql.json这个文件POST给connect, connect收到请求后会启动task启动对mysql的监控

剩下要做的是编写consumer来消费数据库变动事件就可以了

注意:

schema.history.internal.kafka.topic:

1. 这个配置项是connector内部使用,不需消费
2. partition必须为1(因为要保证顺序性)
3. `cleanup.policy` 需要设置为delete(默认是这个),不可以设置为compact(因为需要保留所有历史,不能只保留最新的);`retention.ms` 保留时间设置为9223372036854775807(最大值,默认,或者-1),意味着永不删除; `retention.bytes` 保留大小设置为-1(不限制,默认)
4. 最好是每个task单独创建一个topic,不要共用一个topic,否则可能会导致找不到schema而导致数据丢失

如果*较低版本*不支持设置关于topic的配置,或者不能自动创建,可以手动创建topic:
    默认topic: topicPrefix.dbname.table
    如果transform指定了replacement,需要按指定的topic创建