开云体育官方注册网址Debezium连接器用于SQL Server
想帮助我们进一步磨练和改进它吗?学习如何. |
开云体育官方注册网址Debezium的SQL Server Connector可以监视和记录SQL Server数据库模式中的行级更改。开云体育电动老虎机
当它第一次连接到SQL Server数据库/集群时,它会读取所有模式的一致快照。开云体育电动老虎机当快照完成时,连接器将连续地传输提交给SQL Server的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的Kafka主题中,应用程序和服务可以很容易地使用它们。
概述
连接器的功能是基于变更数据捕获SQL Server Standard提供的功能(自SQL Server 2016 SP1)或企业版。使用这种机制,SQL Server捕获过程监视用户感兴趣的所有数据库和表,并将更改存储到专门创建的数据库中开云体育电动老虎机疾病预防控制中心具有存储过程外观的表。
数据库操开云体育电动老虎机作符必须启用疾病预防控制中心对于连接器应该捕获的表。连接器然后生成一个更改事件控件发布的每个行级插入、更新和删除操作美国疾病控制与预防中心的API,在一个单独的Kafka主题中记录每个表的所有更改事件。客户端应用程序读取对应于它们感兴趣的数据库表的Kafka主题,并对在这些主题中看到的每一个行级事件做出反应。开云体育电动老虎机
数据库操开云体育电动老虎机作符通常启用疾病预防控制中心在数据库的生命周期中期,表。开云体育电动老虎机这意味着连接器不具有对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当SQL Server连接器第一次连接到特定的SQL Server数据库时,它首先执行开云体育电动老虎机一致的快照每个数据库模式的。开云体育电动老虎机在连接器完成快照之后,它继续从创建快照的确切位置开始流化更改。通过这种方式,我们从所有数据的一致视图开始,然后继续读取,而不会丢失快照发生时所做的任何更改。
该连接器还具有故障容忍度。当连接器读取更改并产生事件时,它在数据库日志中记录位置(开云体育电动老虎机LSN /日志序列号),与疾病预防控制中心记录每一个事件。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,在重新启动时,它只是继续读取疾病预防控制中心上次结束的地方。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。
设置SQL Server
在使用SQL Server连接器监视SQL Server上提交的更改之前,首先启用疾病预防控制中心在被监控的数据库上。开云体育电动老虎机请记住这一点疾病预防控制中心属性不能启用主
开云体育电动老虎机数据库。
——====——为CDC模板启用数据开云体育电动老虎机库——==== USE MyDB GO EXEC sys. exesp_cdc_enable_db去
然后启用疾病预防控制中心对于计划监视的每个表。
——====——启用指定文件组选项模板的表——==== USE MyDB GO EXEC sys. xmlsp_cdc_enable_table @source_schema = N'dbo', @source_name = N' mytable ', @role_name = N' myrole ', @filegroup_name = N' mydb_ct ', @supports_net_changes = 0 GO
验证用户是否有权访问疾病预防控制中心表格
——====——验证连接器的用户有访问权限,这个查询不应该有空结果——==== EXEC sys. exesp_cdc_help_change_data_capture去
如果结果为空,则请确保用户具有访问捕获实例和捕获实例的权限疾病预防控制中心表。
SQL Server连接器如何工作
快照
SQL Server CDC不是为存储数据库更改的完整历史而设计的。开云体育电动老虎机因此,Debezium有必要建立当前数据库内容的基线开云体育官方注册网址,并将其传输到Kafka。开云体育电动老虎机这是通过一个称为快照的过程来实现的。
默认为快照模式最初的)连接器将在第一次启动时执行初始化一致的快照数据库(意味着开云体育电动老虎机根据连接器的筛选器配置捕获的任何表中的结构和数据)。
每个快照由以下步骤组成:
确定要捕获的表
获得每个被监视表上的锁,以确保任何表都不会发生结构更改。锁的级别由
snapshot.isolation.mode
配置选项。读取服务器事务日志中的最大LSN(“日志序列号”)位置。
捕获所有相关表的结构。
可选地释放在步骤2中获得的锁,即锁通常只持有很短一段时间。
扫描步骤3中读取的LSN位置上有效的所有相关数据库开云体育电动老虎机表和模式,并生成一个
读
事件,并将该事件写入相应的特定于表的Kafka主题。在连接器偏移中记录快照的成功完成。
读取更改数据表
在第一次启动时,连接器获取捕获的表的结构快照,并将此信息保存在其内部数据库历史主题中。开云体育电动老虎机然后连接器为每个源表标识一个更改表,并执行主循环
对于每个更改表,读取从上次存储的最大LSN到当前最大LSN之间创建的所有更改
根据提交LSN和更改LSN的顺序递增读取变化。这可以确保Debezium按照对数据库所做的更改的顺序重播这些更改。开云体育官方注册网址开云体育电动老虎机
将提交和更改LSNs作为偏移量传递给Kafka Connect。
存储最大LSN并重复循环。
重新启动后,连接器将从之前中断的偏移量(提交和更改LSNs)恢复。
连接器能够检测CDC是否为白名单源表启用或禁用,并调整其行为。
主题名称
SQL Server连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。卡夫卡主题的名称总是采用这种形式serverName.schemaName.的表,在那里serverName连接器的逻辑名称是否与开云体育电动老虎机database.server.name
配置属性,schemaName发生操作的模式名称和的表发生操作的数据库表的名称。开云体育电动老虎机
例如,考虑一个SQL Server安装库存
开云体育电动老虎机包含四个表的数据库:产品
,products_on_hand
,客户
,订单
在模式dbo
.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.dbo.products
fulfillment.dbo.products_on_hand
fulfillment.dbo.customers
fulfillment.dbo.orders
架构更改主题
对于启用了CDC的表,Debezium SQL Server连接器将模式更改的历史记开云体育官方注册网址录存储在数据库历史主题中。开云体育电动老虎机这个主题反映了一个内部连接器状态,您不应该使用它。如果应用程序需要跟踪模式更改,则有一个公共模式更改主题。模式更改主题的名称与连接器配置中指定的逻辑服务器名称相同。
连接器向其模式更改主题发出的消息格式处于酝酿状态,可以在不通知的情况下更改。 |
开云体育官方注册网址Debezium在以下情况下向模式更改主题发出一条消息:
为表启用CDC。
为一个表禁用CDC。
方法更改已启用CDC的表的结构模式演化过程.
模式更改主题的消息包含表模式的逻辑表示,例如:
{"schema":{…}, "payload": {"source": {"version": "1.2.5. "Final", "connector": "sqlserver", "name": "server1", "ts_ms": 1588252618953, "snapshot": "true", "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": null, "commit_lsn": "00000025:00000d98:00a2", "event_serial_no": null}, "d开云体育电动老虎机atabaseName": "testDB",(1)"schemaName": "dbo", "ddl": null,(2)“tableChanges”:((3){"type": "CREATE",(4)“id”:\“\”,\“testDB dbo \“\”,\“客户,(5)"表":{(6)"defaultCharsetName": null, "primaryKeyColumnNames": [(7)"id"], "columns": [(8){"name": "id", "jdbcType": 4, "nativeType": null, "typeName": "int身份","typeExpression": "int身份","charsetName": null, "length": 10, "scale": 0, "position": 1, "optional": false, "autoIncremented": false, "generated": false}, {"name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false}, {"name": "int身份","typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": false}, {"name":"last_name", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 3, "optional": false, "autoIncremented": false, "generated": false}, {"name": "email", "jdbcType": 12, "nativeType": null, "typeName": "varchar", "typeExpression": "varchar", "charsetName": null, "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false}]}}}}}}}}}}}}}}}}}}}}}}}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
标识包含更改的数据库和模式。开云体育电动老虎机 |
2 |
|
总是 |
3. |
|
包含DDL命令生成的模式更改的一个或多个项的数组。 |
4 |
|
描述变化的类型。取值为:
|
5 |
|
创建、修改或删除的表的完整标识符。 |
6 |
|
表示应用更改后的表元数据。 |
7 |
|
组成表主键的列列表。 |
8 |
|
已更改表中每列的元数据。 |
在发给模式更改主题的消息中,键是包含模式更改的数据库的名称。开云体育电动老虎机在下面的示例中,使用有效载荷
字段包含键:
{"schema": {"type": "struct", "fields": [{"type": "string", "optional": false, "field": "data开云体育电动老虎机baseName"}], "optional": false, "name": "io. debez开云体育官方注册网址um .connector.sqlserver. schemachangekey "}, "payload": {"databaseName": "testDB"}}
更改数据事件
Debe开云体育官方注册网址zium SQL Server连接器为每个行级生成一个数据更改事件插入
,更新
,删除
操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。
下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式
字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器,并配置它来生成所有四个基本的变更事件部分,则变更事件具有以下结构:
{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
项 | 字段名 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3. |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同。看到主题名称.
SQL Server连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名中的每个剩余字符以及数据库名和表名中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名、数据库名或表名包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机 |
更改事件键
更改事件的键包含已更改表的键和已更改行的实际键的模式。在连接器创建事件时,模式及其对应的有效负载都为已更改表的主键(或唯一键约束)中的每一列包含一个字段。
考虑以下几点客户
表,后面是此表的更改事件键的示例。
CREATE TABLE customers (id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE);
对象的更改的每个更改事件客户
表具有相同的事件键模式。只要客户
表具有前面的定义,则捕获对的更改的每个更改事件客户
table有如下的键结构,在JSON中是这样的:
{"schema": {(1)"type": "struct", "fields": [(2){“类型”:“int32”、“可选”:假的,“场”:“id”}],“可选”:假的,(3)“名称”:“server1.dbo.customers.Key”(4)}, "有效载荷":{(5)id: 1004}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容 |
2 |
|
属性中期望的每个字段 |
3. |
|
指示事件键是否必须在其中包含值 |
4 |
|
定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name.开云体育电动老虎机数据库模式名.表名.
|
5 |
|
包含为其生成此更改事件的行的键。在本例中,键包含一个单键 |
虽然 |
如果表没有主键或唯一键,则更改事件的键为空。这是有意义的,因为没有主键或唯一键约束的表中的行不能被唯一标识。 |
更改事件值
change事件中的值比键稍微复杂一些。和键一样,值也有模式
Section和a有效载荷
部分。的模式
类的模式信封
的结构有效载荷
节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。
考虑用于显示更改事件键示例的同一个示例表:
CREATE TABLE customers (id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE);
该表的更改的更改事件的值部分将针对每种事件类型进行描述。
创建事件
对象中创建数据的操作所生成的更改事件的值部分,示例如下客户
表:
{"schema": {(1)“类型”:“结构”、“字段”:[{“类型”:“结构”、“字段”:[{“类型”:“int32”、“可选”:假的,“场”:“id”},{“类型”:“弦”、“可选”:假的,“场”:“first_name”},{“类型”:“弦”、“可选”:假的,“场”:“last_name”},{“类型”:“弦”、“可选”:假的,“场”:“电子邮件”}],“可选”:真的,“名字”:“server1.dbo.customers.Value”,(2)“字段”:“之前”},{“类型”:“结构”、“字段”:[{“类型”:“int32”、“可选”:假的,“场”:“id”},{“类型”:“弦”、“可选”:假的,“场”:“first_name”},{“类型”:“弦”、“可选”:假的,“场”:“last_name”},{“类型”:“弦”、“可选”:假的,“场”:“电子邮件”}],“可选”:真的,“名字”:“server1.dbo.customers。价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "string", "optional": true, "field": "change_lsn" }, { "type": "string", "optional": true, "field": "commit_lsn" }, { "type": "int64", "optional": true, "field": "event_serial_no" } ], "optional": false, "name": "io.debezium.connector.sqlserver.Source",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "server1.dbo.customers.Envelope"(4)}, "有效载荷":{(5)“之前”:空,(6)"后":{(7)“id”:1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“john.doe@example.org”},“源”:{(8)“版本”:“1.2.5。Final", "connector": sqlserver, "name": server1, "ts_ms": 1559729468470, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000758:0003", "commit_lsn": "00000027:00000758:0005", "event_serial_no": "1"}, "op": "c",(9)“ts_ms”:1559729471739(10)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3. |
|
|
4 |
|
|
5 |
|
该值为实际数据。这是变更事件提供的信息。 |
6 |
|
可选字段,指定事件发生之前行的状态。当 |
7 |
|
可选字段,指定事件发生后行的状态。在本例中, |
8 |
|
描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:
|
9 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中,
|
10 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新事件
样例中更新的更改事件的值客户
表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户
表:
{"schema":{…}, "payload": {"before": {(1)“id”:1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“john.doe@example.org”},“后”:{(2)“id”:1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“noreply@example.org”},“源”:{(3)“版本”:“1.2.5。Final", "connector": sqlserver", "name": server1", "ts_ms": 1559729995937, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000ac0:0002", "commit_lsn": "00000027:00000ac0:0007", "event_serial_no": "2"}, "op": "u",(4)“ts_ms”:1559729998706(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个更新事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。你可以比较 |
3. |
|
描述事件的源元数据的必填字段。的
的
|
4 |
|
返回string,描述操作类型。在一个更新事件值,则 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新一行的主键/唯一键的列将更改该行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件:删除事件和墓碑上的事件用旧键表示行,后面跟着创建事件,使用该行的新键。 |
删除事件
的值删除改变事件有相同之处模式
部分为创建而且更新同一表的事件。的有效载荷
的一部分删除事件。客户
表是这样的:
{"schema":{…}},“有效载荷”:{“前”:{< >“id”:1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“noreply@example.org”},“后”:空,(2)“源”:{(3)“版本”:“1.2.5。Final", "connector": sqlserver", "name": server1", "ts_ms": 1559730445243, "snapshot": false, "db": "testDB", "schema": "dbo", "table": "customers", "change_lsn": "00000027:00000db0:0005", "commit_lsn": "00000027:00000db0:0007", "event_serial_no": "1"}, "op": "d",(4)“ts_ms”:1559730450205(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个删除事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。在一个删除事件值,则 |
3. |
|
描述事件的源元数据的必填字段。在一个删除事件值,则
|
4 |
|
返回string,描述操作类型。的 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
SQL Server连接器事件设计用于处理Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为零
.为了实现这一点,在Debezium的SQL Server连开云体育官方注册网址接器发出一个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有零
价值。
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务元数据边界和丰富数据消息的事件。
事务边界
开云体育官方注册网址Debezium为每个事务生成事件开始
而且结束
.每个事件包含
状态
-开始
或结束
id
唯一事务标识符的字符串表示event_count
(结束
事件)——事务发出的事件总数data_collections
(结束
事件)-对的数组data_collection
而且event_count
它提供了由来自给定数据收集的更改所发出的事件数
下面是一条消息的示例:
{"status": "BEGIN", "id": "00000025:00000d08:0025", "event_count": null, "data_collections": null} {"status": "END", "id": "00000025:00000d08:0025", "event_count": 2, "data_collections": [{"data_collection": "testDB.dbo. dbo. dbo. dbo. dbo. dbo. dbo. dbo. dbo。tablea", "event_count": 1}, {"data_collection": "testDB.dbo. data_collection"。Tableb ", "event_count": 1}]}
事务事件被写入指定的主题<开云体育电动老虎机 database.server.name > .transaction
.
数据事件丰富
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一条消息的示例:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:“1580390884335”,“交易”:{" id ":“00000025:00000d08:0025”、“total_order”:“1”,“data_collection_order”:" 1 "}}
开云体育电动老虎机数据库模式演变
开云体育官方注册网址Debezium能够随时间捕获模式更改。由于CDC在SQL Server中的实现方式,有必要与数据库操作员合作,以确保连接器在模式更新时继续产生数据更改事件。开云体育电动老虎机
如前所述,Debezium使用SQL Server的变更数据捕获功开云体育官方注册网址能。这意味着SQL Server创建一个捕获表,其中包含在源表上执行的所有更改。不幸的是,捕获表是静态的,当源表结构发生变化时需要更新。此更新不是由连接器本身完成的,而是必须由具有提升权限的操作符执行。
执行模式更改通常有两个过程:
cold -当Debezium停止时执行开云体育官方注册网址
热执行,而Debezium正在运行开云体育官方注册网址
两种方法都有各自的优点和缺点。
在这两种情况下,在对同一个源表进行新的模式更新之前完全执行过程是非常重要的。因此,建议在一个批处理中执行所有ddl,这样该过程只执行一次。 |
当对源表启用CDC时,并不支持所有模式更改。其中一个被识别的异常是重命名列或更改其类型,SQL Server将不允许执行该操作。 |
虽然SQL Server的CDC机制本身并不需要,但是在更改列时必须创建一个新的捕获实例 |
冷模式更新
这是最安全的过程,但对于高可用性需求的应用程序可能不可行。操作符应该遵循这个步骤序列
挂起生成数据库记录的应用程序开云体育电动老虎机
等待Debeziu开云体育官方注册网址m流化所有未流化的更改
停止连接器
将所有更改应用到源表模式
为更新源表创建一个新的捕获表
sys.sp_cdc_enable_table
过程,其参数值为唯一值@capture_instance
恢复申请
启动连接器
当Debe开云体育官方注册网址zium从新的捕获表开始流时,可以删除旧的捕获表
sys.sp_cdc_disable_table
带参数的存储过程@capture_instance
设置为旧的捕获实例名称
热模式更新
热模式更新不需要应用程序和数据处理中的任何停机时间。这个过程本身也比冷模式更新简单得多
将所有更改应用到源表模式
为更新源表创建一个新的捕获表
sys.sp_cdc_enable_table
过程,其参数值为唯一值@capture_instance
当Debe开云体育官方注册网址zium从新的捕获表开始流时,可以删除旧的捕获表
sys.sp_cdc_disable_table
带参数的存储过程@capture_instance
设置为旧的捕获实例名称
热模式更新有一个缺点。在数据库模式更新和创建新的捕获实例之间有一段时间。开云体育电动老虎机在此期间将到达的所有更改都由具有旧结构的旧实例捕获。例如,这意味着对于新添加的列,在此期间产生的任何更改事件都不会包含该新列的字段。如果您的应用程序不允许这样的过渡期,我们建议您进行冷模式更新。
例子
让我们部署基于SQL Server的开云体育官方注册网址Debezium教程演示热模式更新。
在本例中,是列phone_number
添加到客户
表格
#启动数据库shell 开云体育电动老虎机docker-compose -f docker-compose-sqlserver。yaml exec sqlserver bash -c '/opt/mssql-tools/bin/ sqlmd -U sa -P $SA_PASSWORD -d testDB'
——修改源表模式ALTER table customers ADD phone_number VARCHAR(32);——创建新的捕获实例EXEC sys。sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';Insert INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456');去
Kafka连接日志将包含以下消息:
connect_1 | 2019-01-17 10:11:14,924信息| |多个捕获实例呈现相同的表:“dbo_customers”[sourceTableId = testDB.dbo捕获实例。客户,changeTableId = testDB.cdc。dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
最终,在写入Kafka主题的消息的模式和值中会有一个新的字段。
...{“类型”:“弦”、“可选”:真的,“场”:“phone_number”}…“后”:{" id ": 1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“john.doe@example.com”,“phone_number”:“+ 1-555-123456”},
——删除旧的捕获实例EXEC sys。Sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers';去
数据类型
如上所述,SQL Server连接器用事件表示行更改,事件的结构类似于行所在的表。事件包含每个列值的字段,该值在事件中的表示方式取决于列的SQL数据类型。本节描述此映射。
下表描述了连接器如何将每个SQL Server数据类型映射到文字类型而且语义类型在事件字段中。在这里,文字类型描述了如何使用Kafka Connect模式类型逐字表示值,即INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
.的语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
SQL Server数据类型 |
文字类型(模式类型) |
语义类型(模式名) |
笔记 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
包含XML文档的字符串表示形式 |
|
|
|
带有时区信息的时间戳的字符串表示形式,其中时区为GMT |
其他数据类型映射将在下面几节中描述。
如果存在,列的默认值将被传播到相应字段的Kafka Connect模式。更改消息将包含字段的默认值(除非给出了显式的列值),因此很少需要从模式获取默认值。传递默认值有助于满足兼容性规则使用Avro与Confluent模式注册中心一起作为序列化格式。
时间值
除了SQL Server之外DATETIMEOFFSET
类型的值(包含时区信息),其他时态类型取决于time.precision.mode
配置属性。当time.precision.mode
配置属性设置为自适应
(默认值),那么连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值:开云体育电动老虎机
SQL Server数据类型 |
文字类型(模式类型) |
语义类型(模式名) |
笔记 |
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示午夜过后的毫秒数,不包括时区信息。 |
|
|
|
表示午夜过后的微秒数,不包括时区信息。 |
|
|
|
表示午夜过后的纳秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
|
|
|
表示过去纪元的纳秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为连接
,那么连接器将使用预定义的Kafka Connect逻辑类型。当用户只知道内置的Kafka Connect逻辑类型而无法处理可变精度的时间值时,这可能很有用。另一方面,由于SQL Server支持十分之一微秒精度,由连接器生成的事件与连接
时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度取值大于3:
SQL Server数据类型 |
文字类型(模式类型) |
语义类型(模式名) |
笔记 |
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示从午夜开始的毫秒数,不包括时区信息。SQL Server允许 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。SQL Server允许 |
十进制值
SQL Server数据类型 |
文字类型(模式类型) |
语义类型(模式名) |
笔记 |
|
|
|
的 |
|
|
|
的 |
|
|
|
的 |
|
|
|
的 |
部署
与动物园管理员,卡夫卡,卡夫卡连接安装后,部署Debezium SQL Server连接器的其余任务是下载开云体育官方注册网址连接器的插件存档,将JAR文件解压缩到Kafka Connect环境中,并将JAR文件的目录添加到卡夫卡连接的plugin.path
.重新启动Kafka Connect进程以获取新的JAR文件。
如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka, SQL Server和Kafka连接已经安装并准备运行的SQL Server连接器。你也可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
示例配置
使用连接器为特定的SQL Server数据库或集群产生更改事件:开云体育电动老虎机
启用SQL Server上的CDC出版疾病预防控制中心数据库中的事件。开云体育电动老虎机
为SQL Server连接器创建一个配置文件。
当连接器启动时,它将获取SQL Server数据库中模式的一致快照,并开始流化更改,为每个插入、更新和删除的行生成事件。开云体育电动老虎机您还可以选择为模式和表的一个子集生成事件。可选地忽略、屏蔽或截断敏感、过大或不需要的列。
下面是一个连接器实例的配置示例,该连接器实例监视位于192.168.99.100上端口1433的SQL Server服务器,我们在逻辑上将其命名为192.168.99.100fullfillment
.通常情况下,您可以在配置Debezium SQL Serv开云体育官方注册网址er连接器时使用. json
文件中使用连接器可用的配置属性。
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.sqlserver. sqlserverconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“1433”,(4)“开云体育电动老虎机数据库。user": "sa",(5)“开云体育电动老虎机数据库。密码”:“密码!”(6)“开云体育电动老虎机数据库。dbname": "testDB",(7):开云体育电动老虎机“database.server.name fullfillment”,(8)”表。白名单”:“dbo.customers”,(9)“开云体育电动老虎机database.history.kafka.bootstrap。服务器”:“卡夫卡:9092”,(10)“开云体育电动老虎机database.history.kafka。来pic": "dbhistory.fullfillment"(11)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | 此SQL Server连接器类的名称。 |
3. | SQL Server实例的地址。 |
4 | SQL Server实例的端口号。 |
5 | SQL Server用户名 |
6 | SQL Server用户的密码 |
7 | 要从中捕获更改的数据库的名称。开云体育电动老虎机 |
8 | SQL Server实例/集群的逻辑名称,它形成了一个名称空间,并用于连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及使用Avro转换器时对应的Avro模式的名称空间中。 |
9 | Debezium应该捕获其更改的所有表的列表。开云体育官方注册网址 |
10 | 这个连接器将使用的Kafka代理列表,用于将DDL语句写入并恢复到数据库历史主题。开云体育电动老虎机 |
11 | 连接器将在其中写入和恢复DDL语开云体育电动老虎机句的数据库历史主题的名称。此主题仅供内部使用,消费者不应使用。 |
看到连接器属性的完整列表可以在这些配置中指定。
该配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到SQL Server数据库,读取事务日志,并记录Kafka主题的事件。开云体育电动老虎机
添加连接器配置
要运行Debezi开云体育官方注册网址um SQL Server连接器,请创建连接器配置并将该配置添加到Kafka Connect集群。
SQL Server被设置为运行Debezium连接器。开云体育官方注册网址
安装开云体育官方注册网址Debezium SQL Server连接器。
为SQL Server连接器创建一个配置。
使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。
当连接器启动时,它将执行一致性快照连接器为其配置的SQL Server开云体育电动老虎机数据库。然后连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。
监控
除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect所支持的内置JMX指标外,Debezium SQL Server连接器还有三种指标类型。
详情请参阅监控文档了解如何通过JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.sql_server: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
流指标
的MBean是开云体育官方注册网址debezium.sql_server: type = connector-metrics、上下文=流媒体服务器=<开云体育电动老虎机 database.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
架构历史指标
的MBean是开云体育官方注册网址debezium.sql_server: type = connector-metrics、上下文= schema-history服务器=<开云体育电动老虎机 database.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
|
之一 |
|
|
恢复开始的时间(以epoch秒为单位)。 |
|
|
在恢复阶段读取的更改数。 |
|
|
在恢复和运行时应用的模式更改的总数。 |
|
|
自上次更改从历史存储区恢复以来所经过的毫秒数。 |
|
|
自应用最后一次更改以来所经过的毫秒数。 |
|
|
从历史存储中恢复的最后一次更改的字符串表示形式。 |
|
|
最后应用的更改的字符串表示形式。 |
连接器属性
以下配置属性为要求除非有默认值可用。
财产 |
默认的 |
描述 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
||
连接器的Java类的名称。始终使用值 |
||
|
应该为此连接器创建的最大任务数。SQL Server连接器总是使用单个任务,因此不使用此值,因此默认值总是可以接受的。 |
|
SQL Server数据库服务器的IP地址或主机名。开云体育电动老虎机 |
||
|
SQL Server数据库服务器的整数端口号。开云体育电动老虎机 |
|
连接SQL Server数据库服务器时使用的用户名。开云体育电动老虎机 |
||
连接SQL Server数据库服务器时使用的密码。开云体育电动老虎机 |
||
用于传输更改的SQL Server数据库的名称开云体育电动老虎机 |
||
逻辑名称,用于标识并提供被监视的特定SQL Server数据库服务器的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作从该连接器发出的所有Kafka主题名的前缀。只能使用字母数字字符和下划线。 |
||
Kafka主题的全称,连接器将在其中存储数据库模式历史。开云体育电动老虎机 |
||
一个主机/端口对列表,连接器将使用它建立到Kafka集群的初始连接。此连接用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个开云体育电动老虎机DDL语句。这应该指向Kafka Connect进程使用的相同的Kafka集群。 |
||
一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表将被排除在监控范围之外。每个标识符都是这样的schemaName.的表.默认情况下,连接器将监视每个被监视模式中的每个非系统表。不得与 |
||
一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的schemaName.的表.不得与 |
||
空字符串 |
一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为schemaName.的表.columnName.请注意,主键列总是包含在事件的键中,如果从值中列入黑名单也一样。 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它匹配基于字符的列的完全限定名称,这些列的值应该是更改事件消息值中的假名,字段值由使用该算法的散列值组成 可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数或零。列的完全限定名的格式为schemaName.的表.columnName. 例子: column.mask.hash.sha with.salt——256.。CzQMA0cB5K = dbo.orders。customerName, dbo.shipment.customerName 在哪里 注:取决于 |
|
|
时间、日期和时间戳可以用不同的精度表示,包括: |
|
|
布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器ID同名的Kafka主题。开云体育电动老虎机每个模式更改都用一个包含数据库名称的键和一个描述模式更新的JSON结构值来记录。开云体育电动老虎机这与连接器内部记录数据库历史的方式无关。开云体育电动老虎机默认为 |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为schemaName.的表.columnName. |
|
N/A |
一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号( |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与特定于数据库的数据类型名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模开云体育电动老虎机式中。模式参数 |
|
空字符串 |
与完全限定的表和列匹配以映射主键的正则表达式的分号列表。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 |
默认的 |
描述 |
最初的 |
一种对所捕获表的结构和可选数据进行初始快照的模式。快照完成后,连接器将继续从数据库的重做日志中读取更改事件。开云体育电动老虎机 |
|
repeatable_read |
模式,以控制使用哪种事务隔离级别以及连接器锁定受监控表的时间。有五个可能的值: 这是值得记录的 另一个方面是数据一致性。只有 |
|
提交 |
表示源记录中附加时间戳标准的字符串(ts_ms)。 |
|
|
指定连接器在事件处理期间应如何对异常作出反应。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为CDC表阅读器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
控制心跳消息的发送频率。 |
|
|
控制要向其发送心跳消息的主题的命名。 |
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
||
|
指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。默认为2000。 |
|
|
整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,快照将失败(另请参阅快照). |
|
控制快照中包含表中的哪些行。 |
||
v2 |
的架构版本 |
|
|
字段名是否被净化以符合Avro命名要求。看到Avro命名欲知详情。 |
|
服务器时区。 这用于定义从服务器(实际上没有分区)检索的事务时间戳(ts_ms)的时区。默认值为未设置。只有在SQL Server 2014或更老版本上运行,并为数据库服务器和运行Debezium连接器的JVM使用不同的时区时才应该指定。开云体育官方注册网址开云体育电动老虎机 |
||
|
当设置为 看到事务的元数据更多细节。 |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。属性开头的所有连接器配置属性开云体育电动老虎机database.history.producer。
在创建写入数据库历史的Kafka生产者时使用前缀(不带前缀),以及所有以前缀开头的生产者开云体育电动老虎机开云体育电动老虎机database.history.consumer。
在创建Kafka消费者时使用(没有前缀),在连接器启动时读取数据库历史。开云体育电动老虎机
例如,可以使用以下连接器配置属性安全连接到Kafka代理:
除了直通到Kafka生产者和消费者,属性从开云体育电动老虎机数据库。
,如。开云体育电动老虎机database.applicationName 开云体育官方注册网址= debezium
传递给JDBC URL。
开云体育电动老虎机database.history.producer.security。协议SSL databas开云体育电动老虎机e.history.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.producer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.producer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.key。密码= test1234 datab开云体育电动老虎机ase.history.consumer.security。协议SSL databas开云体育电动老虎机e.history.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.consumer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.consumer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.key.password = test1234