开云体育官方注册网址用于Db2的Debezium连接器
该连接器目前处于孵化状态,即准确的语义,配置选项等可能会根据我们收到的反馈在未来的修订中改变。如果您遇到任何问题,请告诉我们。 |
开云体育官方注册网址Debezium的Db2 Connector可以监视和记录Db2数据库表中的行级更改。开云体育电动老虎机这个连接器受到SQL Server的Debezium实现的强烈启发,使用基于SQL的轮开云体育官方注册网址询模型将表放入“捕获模式”。它使用作为Db2 LUW标准部分的ASN库,可以添加到Db2 zOS中。
为了使用ASN并因此使用该连接器,您需要拥有IBM InfoSphere Data Replication (IIDR)产品的许可证。不需要安装IIDR本身。
当它第一次连接到Db2数据库时,它读取白名单(或未列入黑名单,具体取决于开云体育电动老虎机操作模式)中所有表的一致快照。注意,默认情况下,数据库中的所有表都被快照,而不仅仅是那些处于捕获模式的表。开云体育电动老虎机
当快照完成时,连接器将以捕获模式连续传输提交给Db2数据库的所有白名单表的更改。开云体育电动老虎机这将生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的Kafka主题中,应用程序和服务可以很容易地使用它们。
概述
连接器的功能基于ASN捕获/应用代理在DB2中启用SQL Replication。
捕获代理为CDC模式下的所有表生成ASN表,这些ASN表可以通过SQL接口查询,以便读取更改事件。使用这种机制,Db2 ASN捕获代理监视用户感兴趣的所有数据库和表,并将更改存储到专门创建的ASN表中。开云体育电动老虎机连接器已经在Db2/Linux 11.5.0.0中进行了测试,但我们希望该模型也适用于Windows、AIX和zOS。
数据库管开云体育电动老虎机理员必须将要监视的表设置为捕获模式。为了方便和自动化测试,我们用C语言编写了一个用户定义函数(UDF),它可以被编译,然后用于控制ASN代理,创建ASN模式/表以及添加/删除要捕获的表。中描述了这些实用程序开云体育官方注册网址debezium-connector-db2 / src /测试/码头工人/ db2-cdc-docker /
但这些只是为了方便和手动执行相同的命令使用db2控制命令也会有同样的效果。
连接器然后生成一个更改事件控件发布的每个行级插入、更新和删除操作ASN SQL表,在一个单独的Kafka主题中记录每个表的所有更改事件。客户端应用程序读取对应于它们感兴趣的数据库表的Kafka主题,并对在这些主题中看到的每一个行级事件做出反应。开云体育电动老虎机
数据库管开云体育电动老虎机理员通常启用疾病预防控制中心在一张桌子的生命中期。这意味着连接器没有对表所做的所有更改的完整历史。因此,当Db2连接器第一次连接到特定的Db2数据库时,它首先执行开云体育电动老虎机一致的快照每一个白名单表。在连接器完成快照之后,它继续从捕获模式下为表创建快照的确切位置对更改进行流式处理。通过这种方式,我们从所有数据的一致视图开始,然后继续读取,而不会丢失快照发生时所做的任何更改。
该连接器还具有故障容忍度。当连接器读取更改并产生事件时,它在数据库日志中记录位置(开云体育电动老虎机LSN /日志序列号)的疾病预防控制中心记录每个事件。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,在重新启动时,它只是继续读取疾病预防控制中心上次结束的地方。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。在同一个主题中混合多个快照时需要小心,因为可能会错过删除操作。
设置Db2
在使用Db2连接器监视提交到Db2数据库上的更改之前,首先启用开云体育电动老虎机疾病预防控制中心在被监控的数据库上。开云体育电动老虎机
我们建议使用我们提供的UDF函数。然而,这些只是为了方便和手动执行相同的命令使用db2控制命令也会有同样的效果。
在这里,我们假设debeiz -connector- Db2 /src/开云体育官方注册网址test/docker/ Db2 -cdc-docker的内容在运行Db2的机器$HOME/asncdctools/src中是可用的,并且用户已经以db2inst1用户登录。
首先在Db2服务器上编译这个函数,使用bldrtnc编译器
$ cd $HOME/asncdctools/src
$ ./bldrtn asncdc
如果数据库尚未运行,开云体育电动老虎机则启动数据库:
$ db2 start db < db NAME> .使用实例
我们需要确保元数据目录可以通过JDBC读取
$ cd $HOME/sqllib/bnd . sh
$ db2 bind db2schemaBND正在阻塞所有授予公共sqlerror继续
数据库需开云体育电动老虎机要最近备份过,以便ASN代理有一个最近的起始点来读取。如果不是这样,请执行以下操作。注意:这将修剪数据,以便只有最新的版本可用:
$ db2 backup db < db NAME>到< backup LOCATION>
如果不需要保留旧版本的数据,可以使用/dev/null作为位置。
$ db2 restart db < db NAME> .使用实例
安装UDF:
$ db2连接到
我们假设db2工具安装在db2inst1用户上
$ cp $HOME/asncdctools/src/asncdc $HOME/sqllib/function
$HOME/sqllib/函数
启用允许启动/停止ASN捕获代理的UDF
$ db2 -tvmf $HOME/asncdctools/src/asncdc_UDF.sql
创建ASN Control表
$ db2 -tvmf $HOME/asncdctools/src/asncdctables.sql
启用允许我们添加/删除要捕获的表的UDF
$ db2 -tvmf $HOME/asncdctools/src/asncdcaddremove.sql
完成上述工作后,我们可以使用udf通过SQL命令来控制ASN。有些udf需要返回值,在这种情况下我们使用SQL价值语句来调用它们,而其他的则是“即发即忘”,在这种情况下我们使用SQL调用声明。
首先需要启动ASN代理:
值ASNCDC.ASNCDCSERVICES(“开始”、“asncdc”);
如果需要停止代理:
值ASNCDC.ASNCDCSERVICES(“停止”、“asncdc”);
可随时查看的座席状态:
值ASNCDC.ASNCDCSERVICES(“状态”,“asncdc”);
MYSCHEMA中的表MYTABLE可以通过以下操作进入捕获模式:
ASNCDC打电话。ADDTABLE(“MYSCHEMA”、“MYTABLE”);
在MYSCHEMA中的表MYTABLE中,可以执行以下操作从捕获模式中删除:
ASNCDC打电话。REMOVETABLE(“MYSCHEMA”、“MYTABLE”);
在添加或删除表之后,用户必须重新初始化ASN服务:
值ASNCDC.ASNCDCSERVICES(“reinit”、“asncdc”);
Db2连接器是如何工作的
快照
Db2 ASN不是为存储数据库更改的完整历史而设计的。开云体育电动老虎机因此,Debezium有必要建立当前表内容的基线,并开云体育官方注册网址将其传输到Kafka。这是通过一个称为快照的过程来实现的。
默认为快照模式最初的)连接器将在第一次启动时执行初始化一致的快照桌子的。
每个快照由以下步骤组成:
从白名单/黑名单中确定需要快照的表
获得每个被监视表上的锁,以确保任何表都不会发生结构更改。锁的级别由
snapshot.isolation.mode
配置选项。读取服务器事务日志中的最大LSN(“日志序列号”)位置。
捕获所有相关表的结构。
可选地释放在步骤2中获得的锁,即锁通常只持有很短一段时间。
扫描步骤3中读取的LSN位置上有效的所有相关数据库开云体育电动老虎机表和模式,并生成一个
读
事件。然后将该事件写入相应的特定于表的Kafka主题,并使用最大LSN,即所有快照插入操作(即opcode='r'的操作)具有相同的LSN。在连接器偏移中记录快照的成功完成。
读取更改数据表
在第一次启动时,连接器获取所请求表结构的结构快照,并将此信息保存在其内部数据库历史主题中。开云体育电动老虎机然后连接器为每个源表标识一个更改表,并执行主循环
对于每个更改表,读取从上次存储的最大LSN到当前最大LSN之间创建的所有更改
根据提交LSN和更改LSN的顺序递增读取变化。这可以确保Debezium按照对数据库所做的更改的顺序重播更改。开云体育官方注册网址开云体育电动老虎机
将提交和更改LSNs作为偏移量传递给Kafka Connect。
存储最大LSN并重复循环。
重新启动后,连接器将从之前中断的偏移量(提交和更改LSNs)恢复。
连接器能够在运行时检测白名单源表的CDC是启用还是禁用,并修改其行为。
主题名称
Db2连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。卡夫卡主题的名称总是采用这种形式开云体育电动老虎机数据库名。schemaName。的表,在那里开云体育电动老虎机数据库名连接器的逻辑名称是否与开云体育电动老虎机database.server.name
配置属性,schemaName发生操作的模式名称和的表发生操作的数据库表的名称。开云体育电动老虎机
与SQL Server不同的是,连接器只能连接到来自一个Db2数据库的更改流。开云体育电动老虎机
例如,考虑一个带有my开云体育电动老虎机database
开云体育电动老虎机包含四个表的数据库:产品
,PRODUCTS_ON_HAND
,客户
,订单
在模式MYSCHEMA
那么连接器将在以下四个Kafka主题上产生事件:
my开云体育电动老虎机database.MYSCHEMA.PRODUCTS
my开云体育电动老虎机database.MYSCHEMA.PRODUCTS_ON_HAND
my开云体育电动老虎机database.MYSCHEMA.CUSTOMERS
my开云体育电动老虎机database.MYSCHEMA.ORDERS
事件
Db2连接器产生的所有数据更改事件都有一个键和一个值,尽管键和值的结构依赖于产生更改事件的表(参见主题名称).
Debe开云体育官方注册网址zium Db2连接器确保所有Kafka Connect模式名是有效的Avro模式名。这意味着逻辑服务器名必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名中的其余字符以及模式和表名中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。如果不是,则所有无效字符将自动替换为下划线字符。 当数据库名、模式名和表名包含其他字符时,这可能导致意外的冲突,并且表全名之间唯一的区分字符无效开云体育电动老虎机,因此被下划线取代。此外,请注意,数据库、模式和表在Db2中是区分开云体育电动老虎机大小写的,这意味着不同的表可能映射到相同的Kafka主题。 |
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,这些事件的结构可能会随着时间而改变。对于消费者来说,这可能很难处理,所以Kafka Connect让每个事件都是自包含的。每个消息键和值都有两部分模式和有效载荷。模式描述有效负载的结构,而有效负载包含实际数据。
更改事件键
对于给定的表,更改事件的键将具有一个结构,该结构在创建事件时包含表的主键(或唯一键约束)中的每一列的字段。
考虑一个客户
表中定义的my开云体育电动老虎机database
开云体育电动老虎机数据库的模式MYSCHEMA
:
CREATE TABLE customers (ID INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, FIRST_NAME VARCHAR(255) NOT NULL, LAST_NAME VARCHAR(255) NOT NULL, EMAIL VARCHAR(255) NOT NULL UNIQUE);
的开云体育电动老虎机database.server.name
配置属性为thenmy开云体育电动老虎机database
。的每一个变化事件客户
表,当它有这个定义时,将具有相同的键结构,在JSON中是这样的:
{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "ID"}], "optional": false, "name": "mydataba开云体育电动老虎机se.MYSCHEMA.CUSTOMERS. "Key"}, "payload": {"ID": 1004}}
的模式
密钥部分包含Kafka Connect模式,描述密钥部分的内容。在这种情况下,它意味着有效载荷
值不是可选的,是由名为my开云体育电动老虎机database.MYSCHEMA.CUSTOMERS.Key
,并且有一个名为id
类型的int32
。如果你看键的值有效载荷
字段,你可以看到它确实是一个结构(在JSON中只是一个对象)id
字段,值为1004
。
因此,可以将此键解释为描述MYSCHEMA。客户
表(从连接器读取数据库的输出开云体育电动老虎机my开云体育电动老虎机database
),其id
主键列的值为1004
。
更改事件值
与消息键一样,更改事件消息的值具有模式节和有效载荷部分。Db2连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:
人事处
必选字段,包含描述操作类型的字符串值。Db2连接器的值为c
对于创建(或插入),u
对于更新,d
对于删除,和r
用于读取(在快照的情况下)。之前
是否是可选字段,如果存在则包含行状态之前事件发生了。该结构将由my开云体育电动老虎机database.MYSCHEMA.CUSTOMERS.Value
Kafka连接模式,连接器从中读取my开云体育电动老虎机database
中的所有行MYSCHEMA。客户
表格后
是否是可选字段,如果存在则包含行状态后事件发生了。结构的描述是相同的my开云体育电动老虎机database.MYSCHEMA.CUSTOMERS.Value
中使用的Kafka Connect模式之前
。源
是一个强制性的字段,其中包含一个结构描述元数据的来源,而在Db2中包含这些字段:Debezium版本,连接器名称、事件是否进行的一个快照,提交LSN(不是在快照),的LSN变化,数据库,模式和表发生了改变,和一个时间戳,表示时间点记录时从源数据库中读取的连接器。开云体育官方注册网址开云体育电动老虎机ts_ms
是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。
当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。
创建事件
我们来看看a创建事件值可能看起来像客户
表:
{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "ID"}, {"type": "string", "optional": false, "field": "FIRST_NAME"}, {"type": "string", "optional": false, "field": "LAST_NAME"}, {"type": "string", "optional": false, "field": "EMAIL"}], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS. "开云体育电动老虎机值","field": "before"}, {"type": "int32", "optional": false, "field": "ID"}, {"type": "string", "optional": false, "field": "FIRST_NAME"}, {"type": "string", "optional": false, "field": "LAST_NAME"}, {"type": "string", "optional": false, "field": "EMAIL"}], "optional": true, "name": "mydatabase.MYSCHEMA.CUSTOMERS. "开云体育电动老虎机价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, ], "optional": false, "name": "io.debezium.connector.db2.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mydatabase.MYSCHEMA.CUSTOMERS.Envelope" }, "payload": { "before": null, "after": { "ID": 1005, "FIRST_NAME": "john", "LAST_NAME": "doe", "EMAIL": "john.doe@example.org" }, "source": { "version": "1.1.2.Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729468470, "snapshot": false, "db": "mydatabase", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", }, "op": "c", "ts_ms": 1559729471739 } }
如果我们看模式
事件的一部分价值的模式信封的模式源
结构(特定于Db2连接器并在所有事件中重用),以及表特定的模式之前
和后
字段。
的模式名称 |
如果我们看有效载荷
事件的一部分价值,我们可以看到事件中的信息,即它正在描述创建的行(sinceop = c
),以及后
字段值包含新插入行的'ID
,FIRST_NAME
,LAST_NAME
,电子邮件
列。
事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式和有效载荷部分信息。这是可能的,甚至建议使用Avro转换器以大幅减少写入Kafka主题的实际消息的大小。 |
更新事件
的值更新这个表上的Change事件实际上会有完全相同的结果模式,其有效负载的结构相同,但将持有不同的值。这里有一个例子:
{"schema":{…},“有效载荷”:{“前”:{" ID ": 1005年,“FIRST_NAME”:“约翰”,“LAST_NAME”:“母鹿”、“电子邮件”:“john.doe@example.org”},“后”:{" ID ": 1005年,“FIRST_NAME”:“约翰”,“LAST_NAME”:“母鹿”、“电子邮件”:“noreply@example.org”},“源”:{“版本”:“1.1.2。Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559729995937, "snapshot": false, "db": "my开云体育电动老虎机database", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007",}, "op": "u", "ts_ms": 1559729998706}}
当我们把这个和插入事件中,我们看到了一些不同有效载荷
部分:
的
人事处
字段值现在是u
,表示该行因更新而更改的
之前
字段现在拥有数据库提交前包含值的行状态开云体育电动老虎机的
后
字段现在已经更新了行状态,这里可以看到电子邮件
价值就是现在noreply@example.org
。的
源
字段结构具有与以前相同的字段,但是值不同,因为此事件来自事务日志中的不同位置。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
通过观察,我们可以学到很多东西有效载荷
部分。我们可以比较之前
和后
结构来确定由于提交而在该行中实际更改的内容。的源
结构告诉我们关于此更改的Db2记录的信息(提供可跟踪性),但更重要的是,它提供了我们可以与本主题和其他主题中的其他事件进行比较的信息,以了解此事件是发生在之前、之后,还是作为与其他事件相同的Db2提交的一部分。
当一行的主键/唯一键的列更新时,行键的值也发生了变化,因此Debezium将输出开云体育官方注册网址三个事件: |
删除事件
到目前为止,我们已经看到了样本创建和更新事件。现在,我们来看看a的值删除事件。再一次,模式
的部分值将与创建和更新事件:
{"schema":{…}},“有效载荷”:{“前”:{" ID ": 1005年,“FIRST_NAME”:“约翰”,“LAST_NAME”:“母鹿”、“电子邮件”:“noreply@example.org”},“后”:空,“源”:{“版本”:“1.1.2。Final", "connector": "db2", "name": "myconnector", "ts_ms": 1559730445243, "snapshot": false, "db": "my开云体育电动老虎机database", "schema": "MYSCHEMA", "table": "CUSTOMERS", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007"}, "op": "d", "ts_ms": 1559730450205}}
如果我们看有效载荷
部分,我们看到了一些不同的比较创建或更新事件的有效载荷:
的
人事处
字段值现在是d
,表示该行已被删除的
之前
字段现在拥有在数据库提交时删除的行状态。开云体育电动老虎机的
后
字段为空,表示该行不再存在的
源
字段结构具有许多与以前相同的值,除了ts_ms
,commit_lsn
和change_lsn
领域发生了变化的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
此事件向使用者提供了用于处理删除该行的各种信息。
Db2连接器的事件设计用于处理Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除一些旧消息。这允许Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除上面列出的event值仍然适用于日志压缩,因为Kafka仍然可以删除所有使用相同键的早期消息。但仅当消息值为时零
卡夫卡会知道它可以移除吗所有消息用同样的钥匙。为了实现这一点,Debezium的Db2连接器开云体育官方注册网址总是遵循删除特别活动墓碑上具有相同but键的事件零
价值。
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务元数据边界和丰富数据消息的事件。
事务边界
开云体育官方注册网址Debezium为每个事务生成事件开始
和结束
。每个事件包含
状态
-开始
或结束
id
唯一事务标识符的字符串表示event_count
(结束
事件)——事务发出的事件总数data_collections
(结束
事件)-对的数组data_collection
和event_count
它提供了由来自给定数据收集的更改所发出的事件数
下面是一条消息的示例:
{"status": "BEGIN", "id": "00000025:00000d08:0025", "event_count": null, "data_collections": null} {"status": "END", "id": "00000025:00000d08:0025", "event_count": 2, "data_collections": [{"data_collection": "testDB.dbo. dbo. dbo. dbo. dbo. dbo. dbo. dbo. dbo。tablea", "event_count": 1}, {"data_collection": "testDB.dbo. data_collection"。Tableb ", "event_count": 1}]}
事务事件被写入指定的主题<开云体育电动老虎机 database.server.name > .transaction
。
数据事件丰富
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一条消息的示例:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:“1580390884335”,“交易”:{" id ":“00000025:00000d08:0025”、“total_order”:“1”,“data_collection_order”:" 1 "}}
开云体育电动老虎机数据库模式演变
开云体育官方注册网址Debezium能够随时间捕获模式更改。由于CDC在Db2中的实现方式,有必要与数据库管理员合作,以确保Debezium连接器在模式更新时继续产生数据更改事件。开云体育官方注册网址开云体育电动老虎机
如前所述,Debezium使用Db2的变更数据捕获功能。开云体育官方注册网址这意味着Db2创建一个捕获表,其中包含在源表上执行的所有更改。不幸的是,捕获表是静态的,当源表结构发生变化时需要更新。这个更新不是由Debezium连接器本身完成的,而是必须由具有开云体育官方注册网址提升权限的管理员执行。
执行模式更改通常有两个过程:
cold -当Debezium停止时执行开云体育官方注册网址
热执行,而Debezium正在运行开云体育官方注册网址
两种方法都有各自的优点和缺点。
在这两种情况下,在对同一个源表进行新的模式更新之前完全执行过程是非常重要的。因此,建议在一个批处理中执行所有ddl,这样该过程只执行一次。 |
当对源表启用CDC时,并不支持所有模式更改。我们在这里注意到一些可能的影响:
如果源的结构发生了变化,建议我们:—在ASN注册表中将表标记为非活动的—重新启动ASN捕获服务(参见udf)—更新表的ASN表示(手动任务)—将表标记为活动的—重新启动ASN捕获服务(参见udf) |
冷模式更新
这是最安全的过程,但对于高可用性需求的应用程序可能不可行。管理员应遵循以下步骤:
挂起生成数据库记录的应用程序开云体育电动老虎机
等待Debeziu开云体育官方注册网址m流化所有未流化的更改
停止Deb开云体育官方注册网址ezium连接器
将所有更改应用到源表模式
在ASN注册表上将表标记为INACTIVE并重新控制ASN捕获服务(参见udf)
从ASN中删除旧的结构表
向ASN添加新的结构表
在ASN注册表上将表标记为ACTIVE,并重新控制ASN捕获服务(参见udf)
恢复申请
启动Debe开云体育官方注册网址zium连接器
热模式更新
热模式更新不需要应用程序和数据处理中的任何停机时间。这个过程本身也比冷模式更新简单得多。首先,我们考虑对源代码进行增量更改,例如在末尾添加一个新列:
锁定要更改的源表
在ASN注册表中将表标记为INACTIVE,并重新控制ASN捕获服务(参见udf)34。将所有更改应用到源表模式
将所有更改应用到ASN表模式
在ASN注册表上将表标记为ACTIVE,并重新控制ASN捕获服务(参见udf)
现在我们考虑对源代码的非增量更改,例如在中间添加一个新列:
锁定要更改的源表
在ASN注册表上将表标记为INACTIVE并重新控制ASN捕获服务(参见udf)
导出要更改的源表的数据
截断源表
修改源表
将导出的数据加载到更改后的源表中
导出ASN表的数据进行修改
截断ASN表
修改ASN表
将导出的数据加载到修改后的ASN表中
在ASN注册表上将表标记为INACTIVE并重新控制ASN捕获服务(参见udf)
数据类型
中描述了Db2数据类型的摘要数据类型。
如上所述,Db2连接器用事件表示行更改,事件的结构类似于行所在的表。事件包含每个列值的字段,该值在事件中的表示方式取决于列的SQL数据类型。本节描述此映射。
下表描述了连接器如何将每个Db2数据类型映射到文字类型和语义类型在事件字段中。在这里,文字类型描述了如何使用Kafka Connect模式类型逐字表示值,即INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
。的语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
Db2数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
不含时区信息的时间戳字符串表示形式 |
|
|
|
|
|
|
|
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
没有时区信息的时间的字符串表示形式 |
|
|
|
不含时区信息的时间戳字符串表示形式 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
包含XML文档的字符串表示形式 |
其他数据类型映射将在下面几节中描述。
如果存在,列的默认值将被传播到相应字段的Kafka Connect模式。更改消息将包含字段的默认值(除非给出了显式的列值),因此很少需要从模式获取默认值。传递默认值有助于满足兼容性规则使用Avro与Confluent模式注册中心一起作为序列化格式。
时间值
除了Db2之外DATETIMEOFFSET
类型的值(包含时区信息),其他时态类型取决于time.precision.mode
配置属性。当time.precision.mode
配置属性设置为自适应
(默认值),那么连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值:开云体育电动老虎机
Db2数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示午夜过后的毫秒数,不包括时区信息。 |
|
|
|
表示午夜过后的微秒数,不包括时区信息。 |
|
|
|
表示午夜过后的纳秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
|
|
|
表示过去纪元的纳秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为连接
,那么连接器将使用预定义的Kafka Connect逻辑类型。当用户只知道内置的Kafka Connect逻辑类型而无法处理可变精度的时间值时,这可能很有用。另一方面,由于Db2支持十分之一微秒精度,由连接器生成的事件使用连接
时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度取值大于3:
Db2数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示从午夜开始的毫秒数,不包括时区信息。Db2允许 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。Db2允许 |
十进制值
Db2数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
的 |
|
|
|
的 |
|
|
|
的 |
|
|
|
的 |
部署连接器
如果您已经安装了动物园管理员,卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的Db2连接器就很容易了。首先下载连接器的插件存档,提取它,并将包含的目录添加到Kafka Connect的plugin.path
通过使用plugin.path
配置属性。
另外,由于许可原因需要另行获取Db2的JDBC驱动程序。将JDBC驱动程序JAR添加到带有Debezium Db2连接器JAR的目录中。开云体育官方注册网址重新启动Kafka Connect进程以获取新的连接器。
如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka和Kafka Connect的Db2连接器已经预安装并准备好了。你甚至可以在Ope开云体育官方注册网址nShift上运行Debezium。
要使用连接器为特定的Db2数据库或集群产生更改事件:开云体育电动老虎机
启用Db2上的CDC出版疾病预防控制中心数据库中的事件开云体育电动老虎机
创建一个Db2 Connector的配置文件并使用Kafka连接REST API将该连接器添加到Kafka Connect集群。
当连接器启动时,它将获取Db2数据库中模式的一致快照,并开始流化更改,为每个插入、更新和删除的行生成事件。开云体育电动老虎机您还可以选择为模式和表的一个子集生成事件。可选地忽略、屏蔽或截断敏感、过大或不需要的列。
示例配置
使用Db2连接器非常简单。下面是一个连接器实例的配置示例,该连接器实例监视位于192.168.99.100端口50000的Db2服务器(我们在逻辑上命名为192.168.99.100)fullfillment
:
{"name": "db2-connector",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector. db2connector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“50000”,(4)“开云体育电动老虎机数据库。user": "db2inst1",(5)“开云体育电动老虎机数据库。密码”:“密码!”(6)“开云体育电动老虎机数据库。dbname": "mydatabase",(7):开云体育电动老虎机“database.server.name fullfillment”,(8)”表。白名单”:“MYSCHEMA。客户”,(9)“开云体育电动老虎机database.history.kafka.bootstrap。服务器”:“卡夫卡:9092”,(10)“开云体育电动老虎机database.history.kafka。主题:“dbhistory.fullfillment”(11)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | 这个Db2连接器类的名称。 |
3. | Db2实例的地址。 |
4 | Db2实例的端口号。 |
5 | Db2用户的名称 |
6 | Db2用户的密码 |
7 | 要从中捕获更改的数据库的名称开云体育电动老虎机 |
8 | Db2实例/集群的逻辑名称,它形成了一个名称空间,并且在连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及相应Avro模式的名称空间中使用Avro连接器使用。 |
9 | Debezium应该捕获其更改的所有表的列表开云体育官方注册网址 |
10 | 这个连接器将使用的Kafka代理列表,用于将DDL语句写入并恢复到数据库历史主题。开云体育电动老虎机 |
11 | 连接器将在其中写入和恢复DDL语开云体育电动老虎机句的数据库历史主题的名称。此主题仅供内部使用,消费者不应使用。 |
看到连接器属性的完整列表可以在这些配置中指定。
该配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动连接到Db2数据库的一个连接器任务,读取事务日志,并将事件记录到Kafka主题。开云体育电动老虎机
监控
除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect对JMX指标的内置支持外,Debezium Db2连接器还有三种指标类型。
详情请参阅监控文档了解如何通过JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.db2: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
。
属性名称 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
流指标
的MBean是开云体育官方注册网址debezium.db2: type = connector-metrics、上下文=流媒体服务器=<开云体育电动老虎机 database.server.name >
。
属性名称 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和Debezium连接器的机器上的时钟之间的任何差异。开云体育官方注册网址开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
架构历史指标
的MBean是开云体育官方注册网址debezium.db2: type = connector-metrics、上下文= schema-history服务器=<开云体育电动老虎机 database.server.name >
。
属性名称 | 类型 | 描述 |
---|---|---|
|
|
之一 |
|
|
恢复开始的时间(以epoch秒为单位)。 |
|
|
在恢复阶段读取的更改数。 |
|
|
在恢复和运行时期间应用的模式更改总数。 |
|
|
自上次更改从历史存储区恢复以来所经过的毫秒数。 |
|
|
自应用最后一次更改以来所经过的毫秒数。 |
|
|
从历史存储中恢复的最后一次更改的字符串表示形式。 |
|
|
最后应用的更改的字符串表示形式。 |
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
||
连接器的Java类的名称。始终使用值 |
||
|
应该为此连接器创建的最大任务数。Db2连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。 |
|
Db2数据库服务器的IP地址或主机名。开云体育电动老虎机 |
||
|
Db2数据库服务器的整型端口号。开云体育电动老虎机 |
|
连接到Db2数据库服务器时使用的用户名。开云体育电动老虎机 |
||
连接到Db2数据库服务器时使用的密码。开云体育电动老虎机 |
||
用于传输更改的Db2数据库的名称开云体育电动老虎机 |
||
逻辑名称,用于标识并提供被监视的特定Db2数据库服务器的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作从该连接器发出的所有Kafka主题名的前缀。只能使用字母数字字符和下划线。 |
||
Kafka主题的全称,连接器将在其中存储数据库模式历史。开云体育电动老虎机 |
||
一个主机/端口对列表,连接器将使用它建立到Kafka集群的初始连接。此连接将用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个DDL语开云体育电动老虎机句。这应该指向Kafka Connect进程使用的相同的Kafka集群。 |
||
一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表都将被排除在监控之外。每个标识符都是这样的schemaName。的表。默认情况下,连接器将监视每个被监视模式中的每个非系统表。不得与 |
||
一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的schemaName。的表。不得与 |
||
空字符串 |
一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为schemaName。的表。columnName。请注意,主键列总是包含在事件的键中,如果从值中列入黑名单也一样。 |
|
|
时间、日期和时间戳可以用不同的精度表示,包括: |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为schemaName。的表。columnName。 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号( |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与特定于数据库的数据类型名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模开云体育电动老虎机式中。模式参数 |
|
空字符串 |
与完全限定的表和列匹配以映射主键的正则表达式的分号列表。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 |
---|---|---|
最初的 |
一种对所捕获表的结构和可选数据进行初始快照的模式。支持的值为最初的(将捕获表的结构和数据的快照;如果主题应该用捕获表中的数据的完整表示来填充,则很有用)和schema_only(只对捕获的表的结构进行快照;有用的是,从现在开始发生的变化应该传播到主题)。快照完成后,连接器将继续从数据库的重做日志中读取更改事件。开云体育电动老虎机 |
|
repeatable_read |
模式,以控制使用哪种事务隔离级别以及连接器锁定受监控表的时间。有四个可能的值: 这是值得记录的 另一个方面是数据一致性。只有 |
|
|
指定连接器在事件处理期间应如何对异常作出反应。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为CDC表阅读器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
控制心跳消息的发送频率。 |
|
|
控制要向其发送心跳消息的主题的命名。 |
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
||
|
指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。默认为2000。 |
|
|
整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,快照将失败(另请参阅快照). |
|
控制快照中将包括表中的哪些行。 |
||
|
是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。 |
|
|
当设置为 看到事务的元数据更多细节。 |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。属性开头的所有连接器配置属性开云体育电动老虎机database.history.producer。
在创建写入数据库历史的Kafka生产者时使用前缀(不带前缀),以及所有以前缀开头的生产者开云体育电动老虎机开云体育电动老虎机database.history.consumer。
在创建Kafka消费者时使用(没有前缀),在连接器启动时读取数据库历史。开云体育电动老虎机
例如,可以使用以下连接器配置属性安全连接到Kafka代理:
除了直通到Kafka生产者和消费者,属性从开云体育电动老虎机数据库。
,如。开云体育电动老虎机database.applicationName 开云体育官方注册网址= debezium
传递给JDBC URL。
开云体育电动老虎机database.history.producer.security。协议SSL databas开云体育电动老虎机e.history.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.producer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.producer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.key。密码= test1234 datab开云体育电动老虎机ase.history.consumer.security。协议SSL databas开云体育电动老虎机e.history.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.consumer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.consumer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.key.password = test1234