开云体育官方注册网址Debezium连接器Vitess
开云体育官方注册网址Debezium的Vitess连接器捕获Vitess碎片中的行级更改用于.有关与此连接器兼容的Vitess版本的信息,请参见开云体育官方注册网址Debezium发布概述.
连接器暂不支持快照特性。第一次连接到Vitess集群时,它从键空间的当前VGTID位置开始,并持续捕获插入、更新和删除数据库内容以及提交到Vitess键空间的行级更改。开云体育电动老虎机连接器生成数据更改事件记录,并将其传输到Kafka主题。对于每个表,默认行为是连接器将所有生成的事件流到该表的单独Kafka主题中。应用程序和服务使用来自该主题的数据更改事件记录。
概述
维塔斯的VStream特性是在4.0版中引入的。它是一个更改事件订阅服务,从Vitess集群的底层MySQL碎片中提供与MySQL二进制日志等效的信息。用户可以在一个密钥空间中订阅多个分片,这使得它成为向下游CDC进程提供数据的方便工具。
为了读取和处理数据库更改,Vitess连开云体育电动老虎机接器订阅了VTGateVStream gRPC服务。VTGate是一个轻量级的、无状态的gRPC服务器,它是Vitess集群设置的一部分。
连接器使您可以灵活地选择订阅主
节点,或到副本
用于更改事件的节点。
连接器为每个被捕获的行级插入、更新和删除操作生成一个更改事件,并为单独Kafka主题中的每个表发送更改事件记录。客户端应用程序读取与感兴趣的数据库表对应的Kafka主题,并可以对从这些主题接收到的每一个行级事件做出反应。开云体育电动老虎机
这个连接器容错能力强。当连接器读取更改并产生事件时,它会记录每个事件的VGTID位置。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,重新启动时连接器将继续读取它上次停止的WAL。
连接器如何工作
为了优化配置和运行Debezium Vitess连接器,了解连接器开云体育官方注册网址流如何更改事件、确定Kafka主题名称和使用元数据是很有帮助的。
流的变化
Vitess连接器将其所有时间都用于从它所订阅的VTGate的VStream gRPC服务传输更改。客户端从VStream接收更改,因为它们被提交到底层MySQL服务器的binlog的特定位置,这些位置被称为VGTID。
Vitess中的VGTID相当于MySQL中的GTID,它描述了VStream中发生更改事件的位置。通常,一个VGTID有多个分片GTID,每个分片GTID是的元组(Keyspace, Shard, GTID)
,它描述了给定shard的GTID位置。
在订阅VStream服务时,连接器需要提供VGTID和平板电脑类型(如。主
,副本
).VGTID描述了VStream应该开始发送变更事件的位置;Tablet类型描述了我们从每个分片中的哪个底层MySQL实例(master或replica)中读取更改事件。
当连接器第一次连接到Vitess集群时,它从名为Vitess的组件获取当前VGTIDVTCtld并向VStream提供当前的VGTID。
Debe开云体育官方注册网址zium Vitess连接器作为VStream的gRPC客户端。当连接器接收到更改时,它将事件转换为Debezium开云体育官方注册网址创建,更新,或删除事件,包括事件的VGTID。Vitess连接器将这些变更事件以记录的形式转发给Kafka Connect框架,Kafka Connect框架运行在同一个进程中。Kafka Connect进程异步地将更改事件记录按照它们生成的相同顺序写入相应的Kafka主题。
Kafka Connect会定期记录最新的数据抵消在另一个卡夫卡的话题。偏移量表示Debezium包含在每个事件中的特定于源的位置信息。开云体育官方注册网址对于Vitess连接器,每个更改事件中记录的VGTID就是偏移量。
当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件记录刷新到Kafka,并记录从每个连接器接收到的最后偏移量。当Kafka Connect重新启动时,它会读取每个连接器的最后记录偏移量,并在其最后记录偏移量处启动每个连接器。当连接器重新启动时,它向VStream发送一个请求,以发送刚好在该位置之后开始的事件。
主题名称
Vitess连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。默认情况下,Kafka主题名为topicPrefix.keyspaceName.的表地点:
topicPrefix主题前缀是否与
topic.prefix
连接器配置属性。keyspaceName发生操作的密钥空间(也就是数据库)的名称。开云体育电动老虎机
的表发生操作的数据库表的名称。开云体育电动老虎机
例如,假设实现
在Vitess安装中捕获更改的连接器的配置中的逻辑服务器名称是商务
包含四个表的Keyspace:产品
,products_on_hand
,客户
,订单
.不管密钥空间有多少分片,连接器都会将记录流到以下四个Kafka主题:
fulfillment.commerce.products
fulfillment.commerce.products_on_hand
fulfillment.commerce.customers
fulfillment.commerce.orders
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务边界的事件,并丰富数据更改事件消息。
Debezium何时接收事务元开云体育官方注册网址数据的限制
开云体育官方注册网址Debezium仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
开云体育官方注册网址类生成事务边界事件开始
而且结束
每个事务中的分隔符。事务边界事件包含以下字段:
-
状态
-
开始
或结束
. -
id
-
唯一事务标识符的字符串表示形式。
-
ts_ms
-
事务边界事件的时间(
开始
或结束
事件)。如果数据源没有向Debezium提供事件时间,则该字段表示Debeziu开云体育官方注册网址m处理事件的时间。 -
event_count
(结束
事件) -
事务发出的事件总数。
-
data_collections
(结束
事件) -
对的数组
data_collection
而且event_count
元素,该元素指示连接器为源自数据集合的更改发出的事件数。
{“状态”:“开始”,“id”:“[{\“用于\”,\“test_unsharded_keyspace \”,\“碎片\”:\“0 \”,\“gtid \”,\“MySQL56 / e03ece6c - 4 - c04 - 11 - ec - 8 - e20 - 0242 - ac110004:1 - 37 \”}]”、“ts_ms”:1486500577125,“event_count”:空,“data_collections”:零}{“状态”:“结束”,“id”:“[{\“用于\”,\“test_unsharded_keyspace \”,\“碎片\”:\“0 \”,\“gtid \”,\“MySQL56 / e03ece6c - 4 - c04 - 11 - ec - 8 - e20 - 0242 - ac110004:1 - 37 \”}]”、“ts_ms”:1486500577691,“event_count”:1、“data_collections”:[{:“data_collection test_unsharded_keyspace。My_seq ", "event_count": 1}]}
方法覆盖topic.transaction
选项时,连接器将事务事件发出到< topic.prefix >
.transaction
的话题。
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一个消息的例子:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:1637988245467,“交易”:{" id ": "[{\“用于\”,\“test_unsharded_keyspace \”,\“碎片\”:\“0 \”,\“gtid \”,\“MySQL56 / e03ece6c - 4 - c04 - 11 - ec - 8 - e20 - 0242 ac110004:1 - 68 \”}]”、“total_order”:1、“data_collection_order”:1}}
数据变更事件
Debe开云体育官方注册网址zium Vitess连接器为每个行级生成一个数据更改事件插入
,更新
,删除
操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。
下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式
字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:
{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
项 | 字段名 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3. |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同.
从Kafka 0.10开始,Kafka可以选择记录事件键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。 |
Vitess连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名中的每个剩余字符、模式名和表名中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名、模式名或表名包含无效字符,并且唯一区分名称的字符无效,因此用下划线替换,则可能导致意外的冲突。 |
连接器不允许使用 |
更改事件键
对于给定的表,更改事件的键具有一个结构,该结构在创建事件时包含表主键中每一列的字段。
考虑一个客户
表中定义的商务
Keyspace和该表的更改事件键的示例。
CREATE TABLE customers (id INT NOT NULL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
如果topic.prefix
属性的值Vitess_server
,每变事件为客户
表,虽然它有这个定义有相同的键结构,在JSON中看起来像这样:
{"schema": {(1)"type": "struct", "name": "Vitess_server.commerce.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){" name ": " id ", "指数":" 0 ","模式":{“类型”:“INT32”、“可选”:“假的 " } } ] }, " 有效载荷":{(5)id: "1"},}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容 |
2 |
|
定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name.keyspace-name.表名.
|
3. |
|
指示事件键是否必须在其中包含值 |
4 |
|
属性中期望的每个字段 |
5 |
|
包含为其生成此更改事件的行的键。在本例中,键包含一个单键 |
虽然 |
如果表没有主表,则更改事件的键为空。没有主键约束的表中的行不能被唯一标识。 |
更改事件值
change事件中的值比键稍微复杂一些。和键一样,值也有模式
Section和a有效载荷
部分。的模式
类的模式信封
的结构有效载荷
节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。
考虑用于显示更改事件键示例的同一个示例表:
CREATE TABLE customers (id INT NOT NULL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
的触发事件更新
而且删除
操作包含表中所有列的先前值。
创建事件
对象中创建数据的操作所生成的更改事件的值部分,示例如下客户
表:
{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": " vitess_server . business .customers. value ",(2)“字段”:“之前”},{“类型”:“结构”、“字段”:[{“类型”:“int32”、“可选”:假的,“场”:“id”},{“类型”:“弦”、“可选”:假的,“场”:“first_name”},{“类型”:“弦”、“可选”:假的,“场”:“last_name”},{“类型”:“弦”、“可选”:假的,“场”:“电子邮件”}],“可选”:真的,“名字”:“Vitess_server.commerce.customers.Value”、“字段”:“之后”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:“模式”},{“类型”:“弦”、“可选”:假的,“场”:“表”},{“类型”:“int64”、“可选”:真的,“场”:“vgtid”}],“可选的”:false, "name": "io.开云体育官方注册网址debezium.connector. vitss . source ",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "Vitess_server.commerce.customers.Envelope"(4)}, "有效载荷":{(5)“之前”:空,(6)"后":{(7)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{(8)“版本”:“2.0.1。最后”、“连接器”:“维塔斯”、“名称”:“my_sharded_connector”、“ts_ms”:1559033904863,“快照”:真的,“分贝”:“”、“用于”:“商务”、“表”:“客户”、“vgtid”:“[{\“用于\”,\“商务\”,\“碎片\”:\“80 - \”,\“gtid \”,\“MariaDB / 0-54610504-47 \”},{\“用于\”,\“商务\”,\“碎片\”:\“-80 \”,\“gtid \”,\“MariaDB / 0-1592148-45 \"}]" }, " op”:“c”,(9)“ts_ms”:1559033904863(10)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3. |
|
|
4 |
|
|
5 |
|
该值为实际数据。这是变更事件提供的信息。 |
6 |
|
可选字段,指定事件发生之前行的状态。当 |
7 |
|
可选字段,指定事件发生后行的状态。在本例中, |
8 |
|
描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:
|
9 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中,
|
10 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新事件
样例中更新的更改事件的值客户
表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户
表:
{"schema":{…}, "payload": {"before": {(1)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{(2)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“2.0.1。最后”、“连接器”:“维塔斯”、“名称”:“my_sharded_connector”、“ts_ms”:1559033904863,“快照”:空,“分贝”:“”、“用于”:“商务”、“表”:“客户”、“vgtid”:“[{\“用于\”,\“商务\”,\“碎片\”:\“80 - \”,\“gtid \”,\“MariaDB / 0-54610504-47 \”},{\“用于\”,\“商务\”,\“碎片\”:\“-80 \”,\“gtid \”,\“MariaDB / 0-1592148-46 \"}]" }, " op”:“u”,(4)“ts_ms”:1465584025523(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
一个可选字段,包含数据库提交前该行中所有列的所有值。开云体育电动老虎机 |
2 |
|
可选字段,指定事件发生后行的状态。在本例中, |
3. |
|
描述事件的源元数据的必填字段。的
|
4 |
|
返回string,描述操作类型。在一个更新事件值,则 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新行主键的列将更改行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件: |
删除事件
的值删除改变事件有相同之处模式
部分为创建而且更新同一表的事件。的有效载荷
的一部分删除事件。客户
表是这样的:
{"schema":{…}, "payload": {"before": {(1)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "after": null,(2)“源”:{(3)“版本”:“2.0.1。最后”、“连接器”:“维塔斯”、“名称”:“my_sharded_connector”、“ts_ms”:1559033904863,“快照”:空,“分贝”:“”、“用于”:“商务”、“表”:“客户”、“vgtid”:“[{\“用于\”,\“商务\”,\“碎片\”:\“80 - \”,\“gtid \”,\“MariaDB / 0-54610504-47 \”},{\“用于\”,\“商务\”,\“碎片\”:\“-80 \”,\“gtid \”,\“MariaDB / 0-1592148-47 \"}]" }, " op”:“d”,(4)“ts_ms”:1465581902461(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个删除事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。在一个删除事件值,则 |
3. |
|
描述事件的源元数据的必填字段。在一个删除事件值,则
|
4 |
|
返回string,描述操作类型。的 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
一个删除更改事件记录为使用者提供了处理删除该行所需的信息。
Vitess连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为零
.为了实现这一点,Vitess连接器遵循一个删除特别活动墓碑上事件,具有相同的键,但零
价值。
数据类型映射
Vitess连接器用事件表示对行的更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该值在事件中的表示方式取决于列的Vitess数据类型。本节描述这些映射。
如果默认的数据类型转换不能满足您的需求,您可以这样做创建自定义转换器对于连接器。
基本类型
下表描述连接器如何将基本Vitess数据类型映射到文字类型和一个语义类型在事件字段中。
文字类型描述了如何使用Kafka Connect模式类型逐字表示值:
INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
.语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
Vitess数据类型 | 文字类型(模式类型) | 语义类型(模式名)和Notes |
---|---|---|
|
|
N/A |
|
还不支持 |
N/A |
|
还不支持 |
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
|
|
|
|
|
|
|
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
还不支持 |
N/A |
设置Vitess
开云体育官方注册网址Debezium与Vitess一起使用不需要任何特定的配置。安装Vitess的标准说明请参见通过Docker进行本地安装指南,或Kubernetes的Vitess操作符指南。
确保安装了Vitess连接器的机器可以访问VTGate主机及其gRPC端口(默认为15991)
确保安装Vitess连接器的机器可以访问VTCtld主机及其gRPC端口(默认为15999)
部署
与动物园管理员,卡夫卡,卡夫卡连接安装后,部署Debezium Vitess连接器的其余任务是下载开云体育官方注册网址连接器的插件存档,将JAR文件解压缩到Kafka Connect环境中,并将JAR文件的目录添加到卡夫卡连接的plugin.path
.然后需要重新启动Kafka Connect进程来获取新的JAR文件。
如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka和Kafka Connect已经安装并准备运行Vitess连接器。你也可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
连接器配置示例
下面是一个Vitess连接器的配置示例,该连接器连接到Vitess (VTGate的VStream)服务器,端口为192.168.99.100,端口为15991,逻辑名称为fullfillment
.它还连接到端口15999(192.168.99.101)上的VTCtld服务器以获得初始VGTID。通常,配置Debezium Vitess连接器的方法是开云体育官方注册网址. json
文件中使用连接器可用的配置属性。
您可以选择为模式和表的一个子集生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezu .connector.vitess. vitessconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“15991”,(4)“开云体育电动老虎机数据库。user": "vitess",(5)“开云体育电动老虎机数据库。密码”:“vitess_password”,(6)“维塔斯。用于": "commerce",(7)“vitess.tablet。类型”:“大师”,(8)“vitess.vtctld。主机”:“192.168.99.101”,(9)“vitess.vtctld。港”:“15999”,(10)“vitess.vtctld。user": "vitess",(11)“vitess.vtctld。密码”:“vitess_password”,(12)”的话题。前缀”:“fullfillment”,(13)”任务。马克斯:1(14)}}
1 | 在Kafka Connect服务中注册连接器时的名称。 |
2 | 这个Vitess连接器类的名称。 |
3. | Vitess (VTGate的VStream)服务器的地址。 |
4 | Vitess (VTGate的VStream)服务器的端口号。 |
5 | Vitess数据库服务器(VTGate gRPC)的用开云体育电动老虎机户名。 |
6 | Vitess数据库服务器(VTGate gRPC)的密开云体育电动老虎机码。 |
7 | keyspce(也就是数据库)的名称。开云体育电动老虎机因为没有指定分片,所以它从密钥空间中的所有分片读取更改事件。 |
8 | 读取更改事件的MySQL实例的类型(MASTER或REPLICA)。 |
9 | VTCtld服务器地址。 |
10 | VTCtld服务器的端口。 |
11 | VTCtld服务器(VTCtld gRPC)的用户名。 |
12 | VTCtld数据库服务器(VTCtld gRPC)密码开云体育电动老虎机。 |
13 | Vitess集群的主题前缀,它形成了一个名称空间,用于连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及使用Avro转换器时对应的Avro模式的名称空间。 |
14 | 在同一时间内只能执行一个任务。 |
看到Vitess连接器属性的完整列表可以在这些配置中指定。
您可以使用帖子
命令到正在运行的Kafka Connect服务。该服务记录配置并启动连接到Vitess数据库的连接器任务,并将更改事件记录流到Kafka主题。开云体育电动老虎机
每个任务偏移存储模式的连接器配置示例
当您有一个大型Vitess安装,需要多个连接器任务来处理更改日志时,您可以使用每个任务偏移存储特性来启动多个连接器任务,并让每个任务使用Vitess碎片的一个子集。每个任务将在Kafka的偏移主题中在自己的分区空间中持久化其偏移量(它所跟踪的碎片的vgtids)。
下面是连接到Vitess (VTGate的VStream)服务器的Vitess连接器的相同示例,但是有三个额外的参数来调用每任务偏移存储模式。
{"name": "inventory-connector", "config": {"connector.class": "io.de开云体育官方注册网址bezium.connector.vitess.VitessConne开云体育电动老虎机ctor", "数据库。主机名:192.168.99.100,数据库。开云体育电动老虎机端口:“15991”,“数据库”。开云体育电动老虎机用户”:“维塔斯”,“开云体育电动老虎机数据库。Password ": "vitess_password", "vitess. Password "。Keyspace ": "commerce", "vitess.tablet "。type": "MASTER", "vitess.vtctld. "主机”:“192.168.99.101”,“vitess.vtctld。港”:“15999”,“vitess.vtctld。user": "vitess", "vitess.vtctld.password": "vitess_password", "database.server.name": "fullfillment", "vitess.offset.storage.per.task": true,(1)“vitess.offset.storage.task.key。创”:1、(2)“vitess.prev.num。任务”:1、(3)”任务。马克斯:2(4)}}
1 | 指定我们要启用每个任务偏移量存储特性 |
2 | 指定当前任务并行度的生成号为1 |
3. | 指定上一代任务并行度中的任务数为1 |
4 | 指定我们希望为当前任务并行性启动两个任务 |
vitess分片分配的任务基于一个简单的轮询算法。在这个启动两个连接器任务的例子中,假设我们有4个vitess碎片(-40,40-80,80-c0,c0-), task0将在碎片上工作(-40,80-c0), task1将在碎片上工作(40-80,c0-)。
我们需要三个配置参数的原因是确保每个连接器任务保存的偏移量不会相互冲突,并自动处理前一个任务并行性对偏移量的迁移。为了确保Kafka offset topic中的分区键不冲突,我们对每个连接器任务使用这个分区名称方案:taskId_numTasks_gen。因此,对于当前启动两个第一代任务的例子,task0将在Kafka的offset主题分区key: task0_2_1中写入它的偏移量,task1将使用分区key:(task1_2_1)。生成配置参数用于区分不同代生成的分区键(生成对应于任务并行度的每次更改)
当任务并行性发生变化时(例如,您希望启动4个连接器任务而不是2个,以处理来自vitess的更大流量),您将指定任务。max = 4, vitess.offset.storage.task.key。创= 2,vitess.prev.num。当Tasks =2时,此任务并行生成的偏移分区为:task0_4_2, task1_4_2, task2_4_2, task3_4_2。一旦连接器重新启动,连接器将检测到当前4个分区键没有保存以前的偏移量,它将从保存在上一代键:task0_2_1和task1_2_1中的偏移量调用自动偏移量迁移。对于当前的例子,4个vitess碎片(-40,40-80,80-c0,c0-), task0将工作在shard:(-40), task1:(40-80), task2:(80-c0), task3:(c0-)。上一代并行性(使用2个任务,每个任务使用2个分片)的4个分片的偏移量将自动迁移到使用4个任务的这一代(每个任务使用一个分片)。
注意,在Kafka offset-storage-per-task特性启用之前,任务并行度生成数默认为0,在偏移量迁移过程中会有一个特殊的偏移量查找。因此,如果你让vitess连接器运行了一段时间,但没有打开每个任务的偏移存储功能,现在你想打开这个功能,请指定vitess.offset.storage.task.key。创= 1,vitess.prev.num。任务=1,帮助偏移量自动迁移。
注意,vitess. prem .num.tasks需要匹配上一个任务并行生成中启动的实际任务数量。连接器任务的数量通常与任务相同。最大配置参数您指定,但在极少数情况下,任务。最大> vitess shards数量,连接器将只启动_number_of_tasks = the_number_of_vitess_shards。这种罕见的情况可能首先是配置错误造成的。
看到Vitess连接器属性的完整列表可以在这些配置中指定。
您可以使用帖子
命令到正在运行的Kafka Connect服务。该服务记录配置并启动连接到Vitess数据库的连接器任务,并将更改事件记录流到Kafka主题。开云体育电动老虎机
添加连接器配置
要开始运行Vitess连接器,创建一个连接器配置,并将该配置添加到Kafka Connect集群中。
安装了Vitess连接器的机器可以访问VTGate主机及其gRPC端口(默认为15991)
可以从安装Vitess连接器的机器访问VTCtld主机及其gRPC端口(默认为15999)
完成Vitess连接器的安装。
为Vitess连接器创建一个配置。
使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。
当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。
监控
除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect提供的内置JMX指标支持外,Debezium Vitess连接器只提供了一种类型的指标。
流指标当连接器捕获变更和流化变更事件记录时,提供有关连接器操作的信息。
开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。
流指标
的MBean是开云体育官方注册网址debezium.vitess: type = connector-metrics、上下文=流媒体服务器=< vitess.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已由连接器上配置的包含/排除列表筛选规则筛选的事件数。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
队列的最大缓冲区(以字节为单位),用于在streamer和主Kafka Connect循环之间传递事件。 |
|
|
队列的当前缓冲区(以字节为单位),用于在streamer和主Kafka Connect循环之间传递事件。 |
连接器配置属性
Debe开云体育官方注册网址zium Vitess连接器具有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 | ||
---|---|---|---|---|
没有默认的 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。所有Kafka Connect连接器都需要这个属性。 |
|||
没有默认的 |
连接器的Java类的名称。始终使用值 |
|||
|
应该为此连接器创建的最大任务数。如果启用offset.storage.per.task模式,Vitess连接器可以使用多个任务。 |
|||
没有默认的 |
Vitess数据库服务器(VTGate)的IP地址或主机名。开云体育电动老虎机 |
|||
|
Vitess数据库服务器(VTGate)的整型端口号。开云体育电动老虎机 |
|||
用于传输更改的密钥空间的名称。 |
||||
N/A |
用于流更改的碎片的可选名称。如果没有配置,对于非分片密钥空间,连接器流将从唯一的分片更改,对于分片密钥空间,连接器流将从密钥空间中的所有分片更改。我们建议不要为了从密钥空间中的所有分片进行流配置,因为它对重分片操作有更好的支持。如果配置,例如, |
|||
|
一个碎片流的可选GTID位置。这个必须和 |
|||
|
控制Vitess标志stop_on_reshard。 |
|||
N/A |
Vitess数据库服务器(VTGate)的可选用户名。开云体育电动老虎机如果未配置,则使用未认证的VTGate gRPC。 |
|||
N/A |
Vitess数据库服务器(VTGate)的可选密码。开云体育电动老虎机如果未配置,则使用未认证的VTGate gRPC。 |
|||
|
平板电脑的类型(即MySQL),从它传输更改: |
|||
没有默认的 |
主题前缀,为Debezium在其中捕获更改的特定Vitess数据库服务器或集群提供名称空间。开云体育官方注册网址开云体育电动老虎机数据库服务器逻辑名中只能使用字母数字字符、连字符、点和下划线。开云体育电动老虎机该前缀在所有其他连接器中应该是唯一的,因为它被用作从该连接器接收记录的所有Kafka主题的主题名前缀。 +
|
|||
没有默认的 |
一个可选的、以逗号分隔的正则表达式列表,它匹配您想要捕获其更改的表的完全限定表标识符。中未包含的任何表 |
|||
没有默认的 |
一个可选的、以逗号分隔的正则表达式列表,它为您更改的表匹配完全限定的表标识符不想要捕捉。中未包含的任何表 |
|||
没有默认的 |
一个可选的、以逗号分隔的正则表达式列表,它与应该包含在变更事件记录值中的列的完全限定名称相匹配。列的完全限定名的格式为用于.的表.columnName.不也定了吗 |
|||
没有默认的 |
一个可选的、以逗号分隔的正则表达式列表,它匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名的格式为用于.的表.columnName.不也定了吗 |
|||
|
控制是否删除事件之后是一个墓碑事件。 |
|||
|
指定是否启用每任务偏移量存储模式,启动多个连接器任务并持久化按任务划分的偏移量。 |
|||
|
当vitess.offset.storage.per.task开启时,指定任务并行生成号。当您决定更改连接器任务并行度时(启动更多或更少的连接器任务),您应该增加生成数。 |
|||
|
指定在vitess.offset.storage.per.task打开时在上一代任务并行中使用的连接器任务的数量。 |
|||
空字符串 |
使用正则表达式匹配表列名的分号分隔的表列表。连接器将匹配列中的值映射到它发送给Kafka主题的更改事件记录中的关键字段。当一个表没有主键,或者当你想根据一个不是主键的字段来排序Kafka主题中的更改事件记录时,这是很有用的。 |
|||
没有一个 |
指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:
|
以下先进的配置属性具有在大多数情况下都适用的默认值,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 |
---|---|---|
没有默认的 |
属性的符号名称的逗号分隔列表自定义转换器连接器可以使用的实例。例如,
您必须设置 对于为连接器配置的每个转换器,还必须添加
例如, isbn。类型:io.deb开云体育官方注册网址ezium.test.IsbnConverter 如果希望进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。若要将任何其他配置参数与转换器关联起来,请在参数名称前加上转换器的符号名称。例如, isbn.schema.name: io.开云体育官方注册网址debezium.vitess.type.Isbn |
|
|
指定连接器在事件处理过程中应该如何对异常做出反应: |
|
|
正整数值,指定阻塞队列可以容纳的最大记录数。当Debe开云体育官方注册网址zium从数据库读取事件流时,它会在将事件写入Kafka之前开云体育电动老虎机将其放在阻塞队列中。如果连接器接收消息的速度比写入Kafka的速度快,或者Kafka变得不可用,阻塞队列可以为从数据库读取更改事件提供反压力。开云体育电动老虎机当连接器定期记录偏移量时,队列中保存的事件将被忽略。总是设置的值 |
|
|
正整数值,指定连接器处理的每批事件的最大大小。 |
|
|
一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。 |
|
|
正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
指示是否对字段名进行清除以遵守Avro命名要求. |
|
没有默认的 |
以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括: |
|
|
确定连接器是否生成具有事务边界的事件,并使用事务元数据丰富更改事件信封。指定 |
|
|
控制VStream的gPRC保活周期间隔。默认为 |
|
没有默认的 |
指定以逗号分隔的gRPC报头列表。默认为空。格式为: |
|
没有默认的 |
指定通道上允许接收的最大消息大小(以字节为单位)。 |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它匹配列的完全限定名称,这些列的原始类型和长度应该作为参数添加到发出的更改事件记录中的相应字段模式中。这些模式参数:
分别用于传播变宽类型的原始类型名和长度。这对于接收器数据库中相应列的适当大小非常有用。开云体育电动老虎机列的完全限定名的形式如下: keyspaceName.的表.columnName |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与特定于数据库的数据类型名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改事件记录中的相应开云体育电动老虎机字段模式中。这些模式参数:
分别用于传播变宽类型的原始类型名和长度。这对于接收器数据库中相应列的适当大小非常有用。开云体育电动老虎机列的完全限定名的形式如下: keyspaceName.的表.columnName 看到Vitess连接器如何映射数据类型查看vitess特定数据类型名称的列表。 |
|
|
TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称 |
|
|
指定主题名称的分隔符,默认为 |
|
|
用于在有界并发散列映射中保存主题名称的大小。此缓存将帮助确定与给定数据集合对应的主题名称。 |
|
|
控制连接器向其发送事务元数据消息的主题的名称。主题名称有这样的模式: |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。
出现问题时的行为
开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。
如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
在以下情况下,连接器在尝试启动时失败,在日志中报告错误/异常,并停止运行:
连接器的配置无效。
连接器无法通过指定的连接参数连接到Vitess。
在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。修改配置或解决Vitess问题后,重新启动连接器。
Vitess不可用
当连接器正在运行时,它所连接的Vitses服务器(VTGate)可能由于各种原因变得不可用。如果发生这种情况,连接器将失败并报错并停止。当服务器再次可用时,重新启动连接器。
Vitess连接器在外部以Vitess VGTID的形式存储最后处理的偏移量。在连接器重新启动并连接到服务器实例之后,连接器与服务器通信以继续从该特定偏移量进行流处理。
无效的列名错误
这种错误很少发生。如果您接收到带有错误的消息列:x的非法前缀“@”,来自架构:y,表:z
,你的表没有这样的列,它是一个Vitess vstream错误这是由列重命名或列类型更改引起的。这是一个短暂的误差。您可以在一个小的回退之后重新启动连接器,它应该会自动解决。
Kafka Connect进程优雅地停止
假设Kafka Connect正在以分布式模式运行,并且Kafka Connect进程优雅地停止。在关闭该进程之前,Kafka Connect会将该进程的连接器任务迁移到该组中的另一个Kafka Connect进程。新的连接器任务开始处理之前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,任何正在运行的连接器任务都将终止,而不记录最近处理的偏移量。当Kafka Connect以分布式模式运行时,Kafka Connect会重新启动其他进程上的连接器任务。然而,Vitess连接器从上一次偏移量开始恢复记录通过早期的过程。这意味着新的替换任务可能会生成一些在崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。
由于在故障恢复过程中可能会出现重复事件,因此使用者应该始终预测到一些重复事件。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。
在每个更改事件记录中,Debezium连接器插入有关事件起开云体育官方注册网址源的源特定信息,包括Vitess服务器的事件时间、事务更改写入binlog的位置。消费者可以跟踪这些信息,特别是VGTID,以确定事件是否重复。
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。定期地,在Kafka Connect配置中指定的频率,Kafka Connect会记录在这些更改事件中出现的最新偏移量。如果Kafka代理变得不可用,运行连接器的Kafka Connect进程会反复尝试重新连接到Kafka代理。换句话说,连接器任务将暂停,直到重新建立连接,此时连接器将从停止的位置恢复。
连接器停止一段时间
如果连接器正常停止,则可以继续使用数据库。开云体育电动老虎机任何更改都记录在Vitess binlog中。当连接器重新启动时,它将从中断的地方恢复流更改。也就是说,它为连接器停止时所做的所有数据库更改生成更改事件记录。开云体育电动老虎机
一个正确配置的Kafka集群能够处理大量的吞吐量。Kafka Connect是根据Kafka最佳实践编写的,如果有足够的资源,Kafka Connect连接器也可以处理大量的数据库更改事件。开云体育电动老虎机因此,在停止一段时间后,当Debezium连接器重新启动时,它很可能会赶上在停止时所做的数据库更改。开云体育官方注册网址开云体育电动老虎机这种情况发生的速度取决于Kafka的能力和性能,以及Vitess中对数据的修改量。