原文地址: https://debezium.io/blog/2021/10/07/incremental-snapshots/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
Incremental Snapshots in Debezium
October 7, 2021 by Jiri Pechanec
mysql postgres sqlserver oracle db2 snapshots
从第1.6版开始,德贝唑的主要改进之一是支持 递增快照 .在这篇博文中,我们将解释这个功能的动机,我们将深入研究实现细节,我们还将展示它的演示。
为什么是增量快照?
自从Debezum成立以来最大的痛苦之一是对捕获表列表更改的最优化支持。作为用户,您可以创建一个新的连接器,其中包含要捕获的表列表(table.include.list 在稍后的时间,可能有必要调整这个配置,以便捕捉更多的表,而这些表最初不属于疾病预防控制中心。如果只够 流 从这些表中更改,那么问题就很容易解决了。但是如果您还需要捕捉表中现有的内容呢?
在表格中捕捉现有数据的工作传统上是由数据处理系统中的数据处理的。 快照阶段。这个阶段在第一个连接器启动时执行,其目标是在某个时间点捕捉一致的数据(将静止的数据转换为动态的数据)。这可能是一个相当长的操作,根据定义,它必须完全执行或根本不执行–有点像事务语义。这意味着,如果由于连接器的重新启动而没有完成快照,则必须从头开始重新执行快照,而且已经完成的一切都被丢弃。此外,在获取快照时,在数据库中并行执行的任何数据修改在快照完成之前不会进行流化。这可能会导致非常大的快照的数据库资源问题,因为事务日志必须保存,直到开始流。
因此,我们最终要解决三个问题:
如果必须对现有数据进行流化处理,几乎不可能在捕获的表列表中添加额外的表
无法终止或恢复的用于一致的快照的长期过程
更改被阻塞的数据流,直到快照完成
遗留解决方案
这个问题是众所周知的,随着时间的推移,我们制定了解决办法,并设想了可能的改进和新的解决办法。作为一种变通办法,一般性建议是使用多个连接器方法。用户被要求:
停止连接器
创建一个新的表来获取新表的快照(使用initial_only 浏览模式)
完成后,停止新连接器
用添加到列表中的新捕获的表重新配置和启动旧连接器
这个方法做了一些技巧,但非常笨拙,上面提到的所有有关快照一致性的问题仍然适用。
下一步是通过社区参与到mysql的Debezum连接器中 DBZ-175 .它的基础是设置多个二进制日志读取器的概念。一个读取器将捕获最初配置的表,而另一个读取器将快照新表,然后从新表中捕获更改。后者的读者会赶上原来的读者,然后他们就会和解并合并成一个单一的读者。
该代码运行良好,但从未进入孵化阶段,因为过程本身相当复杂,在角角案例中容易出错。最后但并非最不重要的是,它是一种巧妙的方法,但不幸的是,它不能移植到其他连接器上。
基于水标记的快照
2019年下半年,网飞公司的工程团队宣布,他们开发了一个内部变革数据采集框架。他们还提出了一个使用并发快照执行并发快照的创新解决方案。 水印 ,在论文中描述 基于水印的改变数据捕获框架 安德烈亚斯安德烈亚基斯和约安尼斯帕帕帕纳科图。
这一方法的主要思想是,改变数据流是连续的,并与快照同步执行。框架将低水印和高水印插入事务日志(通过编写到源数据库),并在这两点之间读取快照表的一部分。如果相同的记录在窗口期间被快照和修改,该框架将保存在水印之间的数据库更改记录,并将它们与快照值进行协调。
这意味着数据是以块的形式分解的–在连接器启动时没有冗长的过程,而且在连接器发生崩溃或受控制的终止时,可以从最后一个完成的块开始恢复快照。
按照网飞的要求,实现是为mysql和后格列格SQL数据库提供的。
签名台
在转向Debezum实现基于水标记的快照方法之前,需要一个小的绕行。
有时候,从外部控制德贝兹是有用的,这样就可以迫使它执行一些要求的动作。假设有必要重新拍摄一张已经被剪掉的桌子–一个所谓的 临时的 快照。用户将需要发送一个命令到Debezns,以暂停当前操作并执行快照。为此,德贝兹界定了这一概念 信号 ,经由 签名台 .这是一个特殊的表格,专门用于用户和Debezum之间的通信。Debezum捕获了表,当用户要求执行某个操作时,他们只需将记录写入信号表(发送信号)。德贝兹将接收捕获的更改,然后执行所需的操作。
去贝兹中的渐进式快闪
当我们意识到博客的快照方法时,我们决定该方法是一种通用的方法,我们也可以尝试将它应用于德贝齐姆。由于我们在不同的连接器之间共享很多代码条(使用德贝兹连接器框架),我们的目标是在德贝兹核心组件中实现它,以便所有的连接器都能同时从该特性中获益。设计和实施是由 DDD-3 德贝兹设计文件。
在去贝兹的增量快照是以特别快照的形式提供的。用户不将连接器配置为执行快照,而是使用信号传递机制发送快照信号,从而触发一组表的快照。有关的信号叫做execute-snapshot 而信号讯息的格式如下:
{“data-collections”: [“”, “”, “”, …]}
当请求一个表快照时,Debezns将做以下工作:
获取表中最大的主键;这是快照端点,其值存储在连接器偏移中
根据主键的总顺序和表的大小,将表分成若干块。incremental.snapshot.chunk.size 配置选项
查询块时,构建动态SQL语句,选择下一个incremental.snapshot.chunk.size 记录,其主键比来自前一个块(或第一块的第一个主键)的最后一个键大,且小于或等于所记录的最大主键。
默认的块大小为1,024。为了提高效率,您可以增加值(执行的快照查询总数较少),但这应当与缓冲区所需的内存消耗量增加相平衡。建议在你自己的环境中做一些实验,以确定最适合你情况的环境。
读取一个块是一个稍微复杂的过程:
Asnapshot-window-open 已发出信号
执行块查询并将块内容读取到内存中
Asnapshot-window-close 已发出信号
图1事务隔离
Debezr不是访问数据库的唯一进程。我们可以期待同时访问数据库的大量进程,可能访问当前被快照的相同记录。如图所示,对数据的任何更改都将根据提交顺序写入事务日志。由于不可能精确地安排块读取事务的时间来识别潜在冲突,因此添加了打开和关闭窗口事件来划分冲突可能发生的时间。德贝兹的任务是消除这些冲突的重复。
为此目的,Debezum将由块生成的所有事件记录到缓冲区中。当snapshot-window-open 接收信号,然后检查来自事务日志的所有事件是否属于快照表。如果是,则检查缓冲区是否包含主键。如果是,则从缓冲区删除快照事件,因为这是一个潜在的冲突。由于无法正确排序快照和事务日志事件,因此只保留事务日志事件。当snapshot-window-close 接收信号时,缓冲区中剩余的快照事件被发送到下游。
图2行动中的缓冲区
记录K2、K3和K4已经存在于数据库中。在快照窗口打开之前,记录k1被插入,k2被更新,k3被删除。当从日志中读取这些事件时,这些事件被发送到下游。快照窗口打开,其查询将选择K1、K2和K4进入缓冲区。当窗口打开时,从事务日志中检索到K4的删除;从缓冲区中删除K4的快照事件,并向下游发送删除事件。插入从日志中检索到的K5和K6,将发出相应的事件。根据具体的时间,缓冲区中也可能有它们的读取事件(在图片中,K5就是这种情况),这些事件将被删除。当快照窗口关闭时,将从缓冲区发出K1和K2的剩余快照事件。
连接器重新启动
到目前为止,我们已经演示了,使用增量快照的概念,在连接器运行时,如果需要的话,相同的表可以反复快照。我们已经表明,它的执行不会停止从事务日志流。最后一个项目是暂停和继续进程。
当一个增量快照运行时,将增量快照上下文添加到每个消息抵消。背景是三种信息:
第一张是当前的一张是要剪裁的表格列表
表中最大主键
下游发送的增量快照的最后一个事件的主键
这三个项目足以在连接器重新启动后恢复快照,无论是有意的还是在崩溃后。在连接器启动时,负责快照的组件从抵消集读取数据。它初始化其内部状态,并在最后一个处理事件之后恢复快照。请注意,在连接器不运行时插入或更新的任何记录将通过常规流读取进行处理,即。它们不受正在进行的快照的影响。
这种方法确保了流程的健壮性、重新启动和崩溃的复原力,并将重新交付事件的数量降至最低(至少在仍然适用传递语义的情况下)。
局限性
与最初的一致快照相比,增量快照的缺点很少:
快照表必须包含主键
如果在快照过程中从表中删除一个事件,那么其中一种情况可能会发生:
Aread 事件及事件delete 下游消费者收到事件
只有一个delete 收到事件
如果在快照过程中在表中更新了一个事件,那么其中一种情况可能会发生:
Aread 事件及事件update 下游消费者收到事件
一种update 事件和read 收到事件(注:相反顺序)
只有一个update 事件被接收(如果更新发生在将会发出read 事件,导致read 在去重复过程中丢弃的事件)
一般来说,read 事件不应理解为表中记录的初始状态,而应理解为任意时间点记录的状态。相对于Debezum中的传统初始快照,消费者的语义略有变化,虽然可以保证在增量快照完成后,消费者已收到完整的数据集,但不会有任何数据集。read (快照)所有记录的事件,但可能是update 而是事件。同样的情况delete 事件:消费者必须准备好接收此类事件的记录,他们以前没有看到过。
模拟的
在讨论了一般概念之后,让我们在一个例子中更多地探讨一些事情。我们会用我们的标准 指导部署 展示临时的渐进式快照。我们用的是 后记 作为源数据库。对于这个演示,您需要多个终端窗口。
在开始阶段,我们将启动部署,创建信号表,并启动连接器:
export DEBEZIUM_VERSION=1.7
docker-compose -f docker-compose-postgres.yaml up
echo “CREATE TABLE inventory.dbz_signal (id varchar(64), type varchar(32), data varchar(2048))” | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS=“–search_path=inventory” bash -c “psql -U $POSTGRES_USER postgres”
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @- <<EOF
{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.dbname” : “postgres”,
“database.server.name”: “dbserver1”,
“schema.include”: “inventory”,
“table.include.list”: “inventory.customers,inventory.dbz_signal”,
“signal.data.collection”: “inventory.dbz_signal”
}
}
EOF
从日志上我们看到table.include.list 只放一张桌子,customers :
13:38:21-21-09-24(信息);38:21-81(信息);51(事务);51(事务);31(事务);31(事务);(事务);(事务);(事务)。
在下一步,我们将模拟数据库中的连续活动:
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic dbserver1.inventory.customers
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS=“–search_path=inventory” bash -c “i=0; while true; do psql -U $POSTGRES_USER postgres -c “INSERT INTO customers VALUES(default,‘name$i’,‘surname$i’,‘email$i’)”; ((i++)); done”
主题dbserver1.inventory.customers 接收连续的消息流。现在连接器将重新配置,以捕捉orders 表:
#在捕获的订单中添加订单表
curl -i -X PUT -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
{
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.dbname” : “postgres”,
“database.server.name”: “dbserver1”,
“schema.include”: “inventory”,
“table.include.list”: “inventory.customers,inventory.dbz_signal,inventory.orders”,
“signal.data.collection”: “inventory.dbz_signal”
}
平均值
如所料,没有任何讯息orders 表:
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic dbserver1.inventory.orders
现在,让我们通过发送信号开始一个增量的临时快照。提供给orders 表格已交回dbserver1.inventory.orders 专题。给customers 桌子不间断地交付.
echo “INSERT INTO inventory.dbz_signal VALUES (‘signal-1’, ‘execute-snapshot’, ‘{“data-collections”: [“inventory.orders”]}’)” | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS=“–search_path=inventory” bash -c “psql -U $POSTGRES_USER postgres”
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh
–bootstrap-server kafka:9092
–from-beginning
–property print.key=true
–topic dbserver1.inventory.orders
如果你要修改orders 当快照运行时,这将作为一个read 事件或作为update 事件,取决于事情的准确时间和顺序。
作为最后一步,让我们终止部署的系统并关闭所有终端:
docker-compose -f docker-compose-postgres.yaml down
概括的
在这篇博文中,我们讨论了D博客文章所介绍的"渐进快照"概念的动机。我们回顾了过去为实现所描述的功能而使用的方法。然后,我们跳入深海,在德贝齐姆实施了这种新颖的快速拍摄方法,最后,我们尝试使用它现场。