开云体育官方注册网址Debezium MySQL连接器
MySQL有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。开云体育电动老虎机这包括对表模式的更改以及对表中数据的更改。MySQL使用binlog进行复制和恢复。
Debe开云体育官方注册网址zium MySQL连接器读取binlog,生成行级的更改事件插入
,更新
,删除
操作,并向Kafka主题发出更改事件。客户端应用程序读取这些Kafka主题。
由于MySQL通常设置为在指定的一段时间后清除二进制日志,MySQL连接器执行初始化一致的快照您的每个数据库。开云体育电动老虎机MySQL连接器从创建快照的位置读取binlog。
有关与此连接器兼容的MySQL数据库版本的信息,请参见开云体育电动老虎机开云体育官方注册网址Debezium发布概述.
连接器如何工作
对连接器支持的MySQL拓扑的概述对于规划应用程序非常有用。为了优化配置和运行Debezium MySQL连接器,了解连接器如开云体育官方注册网址何跟踪表的结构、暴露模式更改、执行快照和确定Kafka主题名称是很有帮助的。
Debe开云体育官方注册网址zium MySQL连接器还没有在MariaDB中进行测试,但是来自社区的多个报告表明该连接器已成功地用于该数据库。开云体育电动老虎机MariaDB的官方支持计划在未来的Debezium版本中实现。开云体育官方注册网址 |
支持的MySQL拓扑
Debe开云体育官方注册网址zium MySQL连接器支持以下MySQL拓扑:
- 独立的
-
当使用单个MySQL服务器时,服务器必须启用binlog (并可选地启用gtid),这样Debez开云体育官方注册网址ium MySQL连接器就可以监视服务器。这通常是可以接受的,因为二进制日志也可以用作增量日志备份.在这种情况下,MySQL连接器总是连接并跟随这个独立的MySQL服务器实例。
- 主和副本
-
Debe开云体育官方注册网址zium MySQL连接器可以跟随一个主服务器或一个副本(如果该副本启用了binlog),但是连接器只在该服务器可见的集群中看到更改。通常,除了多主拓扑,这不是问题。
连接器记录其在服务器binlog中的位置,该位置在集群中的每个服务器上都是不同的。因此,连接器必须只跟随一个MySQL服务器实例。如果该服务器发生故障,则必须重新启动或恢复该服务器,然后连接器才能继续。
- 高可用集群
-
各种各样的高可用性MySQL的解决方案是存在的,它们使它更容易容忍和几乎立即从问题和失败中恢复。大多数HA MySQL集群使用gtid,以便副本能够跟踪任何主服务器上的所有更改。
- Multi-primary
-
NDB (Net开云体育电动老虎机work Database)集群复制使用一个或多个MySQL复制节点,每个复制节点从多个主服务器复制。这是聚合多个MySQL集群复制的强大方法。这种拓扑要求使用gtid。
De开云体育官方注册网址bezium MySQL连接器可以使用这些多主MySQL副本作为源,并且只要新副本赶上旧副本,就可以故障转移到不同的多主MySQL副本。也就是说,新的副本拥有在第一个副本上看到的所有事务。即使连接器只使用数据库和/或表的一个子集,这也可以工作,因为当试图重新连接到一个新的多主MySQL副本并在binlog开云体育电动老虎机中找到正确的位置时,连接器可以配置为包括或排除特定的GTID源。
- 主持
-
Debezium MySQL连接器支持使用托管选项开云体育官方注册网址,如Amazon RDS和Amazon Aurora。
因为这些托管选项不允许全局读锁,所以使用表级锁创建全局读锁一致的快照.
架构历史记录主题
当数据库客户端开云体育电动老虎机查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以在任何开云体育电动老虎机时候更改,这意味着连接器必须能够识别记录每个插入、更新或删除操作时的模式。此外,连接器不能只使用当前模式,因为连接器可能正在处理相对较旧的事件,这些事件是在表的模式更改之前记录的。
为了确保正确处理模式更改后发生的更改,MySQL在binlog中不仅包括对数据的行级更改,还包括应用到数据库的DDL语句。开云体育电动老虎机当连接器读取binlog并遇到这些DDL语句时,它会解析它们并更新每个表的模式的内存表示。连接器使用这种模式表示在每次插入、更新或删除操作时识别表的结构,并产生适当的更改事件。在一个单独的数据库模式历史K开云体育电动老虎机afka主题中,连接器记录了所有DDL语句以及每个DDL语句在binlog中的出现位置。
当连接器在崩溃或正常停止后重新启动时,连接器开始从特定位置(即从特定时间点)读取binlog。连接器通过读取数据库模式历史Kafka主题并解析所有DDL语句,直到连接器开始的binlog点,从而重新构建此时存在的表结构。开云体育电动老虎机
此数据库模开云体育电动老虎机式历史记录主题仅供连接器使用。连接器可以选择将模式更改事件发送到用于使用者应用程序的不同主题.
当MySQL连接器捕获表中的更改时,模式更改工具(如gh-ost
或pt-online-schema-change
时,将在迁移过程中创建辅助表。需要配置连接器以捕获对这些帮助表的更改。如果使用者不需要为辅助表生成的记录,则可以应用单个消息转换将其过滤掉。
看到主题的默认名称接收Debezium事件记开云体育官方注册网址录。
架构更改主题
您可以配置Debezium MySQL连开云体育官方注册网址接器,以生成描述应用于数据库中捕获的表的模式更改的模式更改事件。开云体育电动老虎机连接器将模式更改事件写入名为< topicPrefix >
,在那里topicPrefix
名称空间是否在topic.prefix
连接器配置属性。连接器发送到模式更改主题的消息包含有效负载,并且还可选地包含更改事件消息的模式。
架构更改事件消息的有效负载包括以下元素:
-
ddl
-
提供SQL
创建
,改变
,或下降
语句,该语句将导致架构更改。 -
开云体育电动老虎机数据库名
-
DDL语句应用到的数据库的名称。开云体育电动老虎机的价值
开云体育电动老虎机数据库名
作为消息键。 -
pos
-
语句在binlog中的出现位置。
-
tableChanges
-
模式更改后的整个表模式的结构化表示。的
tableChanges
字段包含一个数组,其中包含表中每列的条目。由于结构化表示以JSON或Avro格式表示数据,消费者可以轻松读取消息,而无需首先通过DDL解析器处理它们。
对于处于捕获模式的表,连接器不仅将模式更改的历史存储在模式更改主题中,还存储在内部数据库模式历史主题中。开云体育电动老虎机内部数据库模式历史记录主题开云体育电动老虎机仅供连接器使用,不打算由消费应用程序直接使用。确保需要模式更改通知的应用程序只使用来自模式更改主题的信息。 |
不要对数据库模式历史主题进行分区。开云体育电动老虎机为了使数据库模式开云体育电动老虎机历史主题正确地工作,它必须维护连接器发出给它的事件记录的一致的全局顺序。 为了确保topic不被分区分割,可以通过以下方法设置topic的分区数:
|
连接器向其模式更改主题发出的消息的格式处于酝酿状态,可以在不另行通知的情况下进行更改。 |
下面的示例显示了JSON格式的典型模式更改消息。该消息包含表模式的逻辑表示。
{"schema": {}, "payload": {"source": {(1)“版本”:“2.1.2。最后,“connector”:“mysql”,“name”:“mysql”,“ts_ms”:1651535750218,(2)"snapshot": "false", "db": "inventory", "sequence": null, "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 570, "row": 0, "thread": null, "query": null}, "d开云体育电动老虎机atabaseName": "inventory",(3)“schemaName”:null,“ddl”:“ALTER TABLE customers ADD middle_name varchar(255) AFTER first_name”,(4)“tableChanges”:((5){"type": "ALTER",(6)“id”:“\”客户库存\”,\“\”,(7)"表":{(8)"defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [(9)"id"], "columns": [(10){"name": "id", "jdbcType": 4, "nativeType": null, "typeName": "INT", "typeExpression": "INT", "charsetName": null, "length": null, "scale": null, "position": 1, "optional": false, "autoIncremented": true, "generated": true}, {"name": "first_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 2, "optional": false, "autoIncremented": false, "generated": false}, {"name": "name": "first_name", "jdbcType": 12, "charsetName": "utf8mb4", "length": 255, "scale": null, "position":"middle_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 3, "optional": true, "autoIncremented": false, "generated": false}, {"name": "last_name", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false}, {"name": "length": "length": 255, "scale": null, "position": 4, "optional": false, "autoIncremented": false, "generated": false}, {"name": "length": ""email", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": "VARCHAR", "charsetName": "utf8mb4", "length": 255, "scale": null, "position": 5, "optional": false, "autoIncremented": false, "generated": false}], "attributes": [(11){"customAttribute": "attributeValue"}]}}}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
的 |
2 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 在源对象中,ts_ms表示在数据库中进行更改的时间。开云体育电动老虎机通过比较payload.source的值。Ts_ms和payload的值。ts_ms,you can determine the lag between the source database update and Debezium. |
3. |
|
标识包含更改的数据库和模式。开云体育电动老虎机的值 |
4 |
|
该字段包含负责模式更改的DDL。的 |
5 |
|
包含DDL命令生成的模式更改的一个或多个项的数组。 |
6 |
|
描述变化的类型。取值为:
|
7 |
|
创建、修改或删除的表的完整标识符。对于表重命名,此标识符是的连接 |
8 |
|
表示应用更改后的表元数据。 |
9 |
|
组成表主键的列列表。 |
10 |
|
已更改表中每列的元数据。 |
11 |
|
每个表更改的自定义属性元数据。 |
参见:架构历史记录主题.
快照
当Debezi开云体育官方注册网址um MySQL连接器第一次启动时,它执行初始化一致的快照你的数据库。开云体育电动老虎机下面的流程描述连接器如何创建此快照。此流程用于默认快照模式,即最初的
.有关其他快照模式的信息,请参见MySQL连接器snapshot.mode
配置属性.
一步 | 行动 |
---|---|
1 |
获取一个阻塞的全局读锁写通过其他数据库客户开云体育电动老虎机端。 |
2 |
使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照. |
3. |
读取当前binlog的位置。 |
4 |
读取为其配置连接器以捕获更改的数据库和表的模式。开云体育电动老虎机 |
5 |
释放全局读锁。其他数据库客开云体育电动老虎机户端现在可以写入数据库。 |
6 |
如果适用,将DDL更改写入模式更改主题,包括所有必要的更改 |
7 |
扫描数据库表。开云体育电动老虎机对于每一行,连接器都会触发 |
8 |
提交事务。 |
9 |
在连接器偏移量中记录完成的快照。 |
- 连接器重启
-
操作时,如果连接器失效、停止或重新平衡初始快照,然后在连接器重新启动后,它执行一个新的快照。在那之后开始快照完成后,Debezium MySQL开云体育官方注册网址连接器将从binlog中的相同位置重新启动,因此它不会错过任何更新。
如果连接器停止的时间足够长,MySQL就会清除旧的binlog文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复到初始快照它的起始位置。有关Debezium MySQL连接器故障排除的更多技巧,请参见开云体育官方注册网址出现问题时的行为.
- 不允许全局读锁
-
有些环境不允许全局读锁。如果Debez开云体育官方注册网址ium MySQL连接器检测到不允许全局读锁,则连接器使用表级锁,并使用此方法执行快照。这需要Debezium连接器的数据库开云体育电动老虎机用户开云体育官方注册网址
锁表
特权。表3。使用表级锁执行初始快照的工作流 一步 行动 1
获取表级锁。
2
使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照.
3.
读取并过滤数据库和表的名称。开云体育电动老虎机
4
读取当前binlog的位置。
5
读取为其配置连接器以捕获更改的数据库和表的模式。开云体育电动老虎机
6
如果适用,将DDL更改写入模式更改主题,包括所有必要的更改
滴……
而且创建…
DDL语句。7
扫描数据库表。开云体育电动老虎机对于每一行,连接器都会触发
创建
事件到相关的特定于表的Kafka主题。8
提交事务。
9
释放表级锁。
10
在连接器偏移量中记录完成的快照。
Ad hoc快照
默认情况下,连接器仅在第一次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据都只通过流处理传入。
但是,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium包含了一个执行临时快照的选项。开云体育官方注册网址数据库中的以下更改可能会导致执行临时快照:开云体育电动老虎机
连接器配置被修改以捕获一组不同的表。
Kafka主题被删除,必须重新构建。
数据损坏是由于配置错误或其他问题造成的。
您可以对先前捕获快照的表重新运行快照,方法是启动所谓的特别的快照.临时快照需要使用信号表.通过向Debezium信令表发送信号请求,可以启动一个临时快照。开云体育官方注册网址
当您初始化现有表的临时快照时,连接器将内容追加到已存在的表主题。如果先前存在的主题被删除,Debezium可以自动创建一个主题开云体育官方注册网址自动创建主题启用。
临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,也可以仅捕获数据库中表的一个子集。开云体育电动老虎机此外,快照可以捕获数据库中表内容的一个子集。开云体育电动老虎机
指定要捕获的表execute-snapshot
发送到信令表的消息。属性的类型execute-snapshot
信号增量
,并提供要包含在快照中的表的名称,如下表所示:
场 | 默认的 | 价值 |
---|---|---|
|
|
指定要运行的快照类型。 |
|
N/A |
包含与要快照的表的完全限定名称匹配的正则表达式的数组。 |
|
N/A |
一个可选字符串,它根据表的列指定一个条件,以捕获表内容的一个子集。 |
属性添加项来初始化临时快照execute-snapshot
发送到信令表的信号类型。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的起点和终点。根据表中条目的数量和配置的块大小,Debezium将表划分为块,然后依次对每个块进行快照,每次快照一个。开云体育官方注册网址
增量快照
为了提供管理快照的灵活性,Debezium包含了一个补充的快照机制,称为开云体育官方注册网址增量快照.增量快照依赖于Debezium机制开云体育官方注册网址发送信号到Debezium连接器开云体育官方注册网址.增量快照基于DDD-3设计文档。
在增量快照中,Debezium不像在初始快照中那样一次性捕获数据库的全部状态,而是在一系列可配置的块中分阶段捕获每个表。开云体育官方注册网址开云体育电动老虎机可以指定希望快照捕获的表和每个块的大小.块大小决定了快照在数据库上的每次获取操作期间收集的行数。开云体育电动老虎机增量快照的默认chunk大小为1kb。
随着增量快照的进行,Debezium使用水印来跟踪进度,维护它捕获的每个表开云体育官方注册网址行的记录。与标准的初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:
您可以在流数据捕获的同时并行运行增量快照,而不是将流数据延迟到快照完成。在整个快照过程中,连接器继续从更改日志中捕获接近实时的事件,并且两个操作都不会阻塞另一个操作。
当增量快照进程中断时,可以恢复增量快照,不会丢失数据。进程恢复后,快照将从它停止的位置开始,而不是从开始重新捕获表。
您可以在任何时候按需运行增量快照,并根据需要重复该过程以适应数据库更新。开云体育电动老虎机例如,在修改连接器配置以向其添加表之后,可以重新运行快照
table.include.list
财产。
运行增量快照时,Debezium根据主键对每个表进行排序,然后根据主键将表分开云体育官方注册网址成块配置的块大小.它逐个块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个读
事件。该事件表示块快照开始时行的值。
随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。开云体育电动老虎机为了反映这些变化,插入
,更新
,或删除
操作像往常一样提交到事务日志中。类似地,正在进行的Debezium流处理继续检开云体育官方注册网址测这些更改事件,并向Kafka发出相应的更改事件记录。
在某些情况下,更新
或删除
流流程发出的事件按顺序接收。也就是说,流处理过程可能在快照捕获包含表行的块之前发出修改表行的事件读
事件。当快照最终发出相应的读
事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium采用了一种缓冲方案来解决冲突。开云体育官方注册网址只有在快照事件和流事件之间的冲突被解决之后,Debezium才会向Kafka发送事件记录。开云体育官方注册网址
协助解决晚到车辆之间的碰撞读
事件和流事件修改同一表行,Debezium采用了所谓的开云体育官方注册网址快照窗口.快照窗口定义了增量快照为指定表块捕获数据的时间间隔。在块的快照窗口打开之前,Debezium遵循其通常的行为,从事务日志中直接向下游的目标Kaf开云体育官方注册网址ka主题发送事件。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。开云体育官方注册网址
对于每个数据收集,Debezium会发出两种类型的事件,并将开云体育官方注册网址它们的记录存储在一个目的地Kafka主题中。它直接从表捕获的快照记录发出为读
操作。同时,随着用户继续更新数据收集中的记录,事务日志被更新以反映每次提交,Debezium就会发出开云体育官方注册网址更新
或删除
每个更改的操作。
当快照窗口打开时,Debezium开始处理快照块,它将快照记录传递到开云体育官方注册网址内存缓冲区。的主键在快照窗口期间读
缓冲区中的事件与传入流事件的主键进行比较。如果没有匹配到,流事件记录将直接发送到Kafka。如果D开云体育官方注册网址ebezium检测到匹配,它将丢弃缓冲读
事件,并将流记录写入目标主题,因为流事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含读
不存在相关事务日志事件的事件。开云体育官方注册网址Debezium释放这些剩余的读
事件到表的Kafka主题。
连接器对每个快照块重复该过程。
触发增量快照
目前,创建增量快照的唯一方式是发送自组织快照信号到源数据库上的信令表。开云体育电动老虎机将一个信号作为SQL提交给信令表插入
查询。
在Debez开云体育官方注册网址ium检测到信号表中的变化后,它读取信号,并运行所请求的快照操作。
提交的查询指定要包含在快照中的表,还可以指定快照操作的类型。目前,快照操作的唯一有效选项是默认值,增量
.
若要指定要包含在快照中的表,请提供数据收集
列出表的数组或用于匹配表的正则表达式数组,例如{“数据收集”:["。MyFirstTable”、“公众。MySecondTable ")}
的数据收集
增量快照信号阵列没有默认值。如果数据收集
数组为空时,Debezium检测开云体育官方注册网址到不需要任何操作,因此不执行快照。
如果要包含在快照中的表的名称包含点( |
启用信令.
源数据库中存在信令数据集合。开云体育电动老虎机
信令数据采集在
signal.data.collection
财产。
发送一个SQL查询,将临时增量快照请求添加到信令表:
插入< signalTable >(id, type, data)' < id > ',' < snapshotType >”, '{"data-collections": ["<表>”、“<表>“,”类型”:“< snapshotType >”、“附加条件”:“<附加条件>“}”);
例如,
INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data)(1)值(“ad-hoc-1”,(2)“execute-snapshot”,(3)”{代码基于schema1中“数据收集”:["。表1”、“schema2.table2”),(4)“类型”:“增量”},(5)“附加条件”:“颜色=蓝色“}”);(6)
的值
id
,类型
,数据
命令中的参数对应信令表字段.样例中参数说明如下:
表5所示。向信令表发送增量快照信号的SQL命令字段说明 项 价值 描述 1
myschema.开云体育官方注册网址debezium_signal
指定源数据库上信令表的全限定名。开云体育电动老虎机
2
ad-hoc-1
的
id
参数指定分配为id
信号请求的标识符。
使用此字符串标识信令表中条目的日志消息。开云体育官方注册网址Debezium不使用这个字符串。相反,在快照期间,Debezium生成自己的快照开云体育官方注册网址id
字符串作为水印信号。3.
execute-snapshot
指定
类型
参数指定信号要触发的操作。4
数据收集
的必需组件
数据
信号的字段,该字段指定表名或正则表达式的数组,以匹配要包含在快照中的表名。
对象中指定连接器的信令表的名称时使用的格式相同,该数组列出了根据表的全限定名匹配表的正则表达式signal.data.collection
配置属性。5
增量
一个可选的
类型
组成部分数据
信号的字段,指定要运行的快照操作的类型。
目前,唯一有效的选项是默认值,增量
.
如果不指定值,连接器将运行增量快照。6
附加条件
一个可选字符串,它根据表的列指定一个条件,以捕获表内容的子集。有关的更多信息
附加条件
参数,看到的临时增量快照附加条件
.
附加条件
如果希望快照仅包含表中内容的子集,则可以通过添加附加条件
参数表示快照信号。
典型快照的SQL查询形式如下:
Select * from<表>....
通过添加附加条件
参数,则追加一个在哪里
输入SQL查询的条件,示例如下:
Select * from<表>在哪里<附加条件>....
下面的示例显示了一个SQL查询,该查询向信令表发送一个附加条件的临时增量快照请求:
插入< signalTable >(id, type, data)' < id > ',' < snapshotType >”, '{"data-collections": ["<表>”、“<表>“,”类型”:“< snapshotType >”、“附加条件”:“<附加条件>“}”);
例如,假设你有一个产品
表,包含以下列:
id
(主键)颜色
数量
的增量快照产品
表中仅包含其中的数据项颜色=蓝色
,可以使用下面的SQL语句触发快照:
INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1. data ", ' "data-collections": ["schema1. data ")产品”),“类型”:“增量”、“附加条件”:“颜色=蓝色"});
的附加条件
参数还允许您传递基于多个列的条件。例如,使用产品
表中,您可以提交一个查询,该查询将触发一个增量快照,该快照仅包含那些项目的数据颜色=蓝色
而且量> 10
:
INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1. data ", ' "data-collections": ["schema1. data ")product "],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');
下面的示例显示了连接器捕获的增量快照事件的JSON。
{“前”:零,“后”:{“pk”:“1”,“价值”:“新数据”},“源”:{…“快照”:“增量”(1)},“人事处”:“r”,(2)"ts_ms":"1620393591654", "transaction":null}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
指定要运行的快照操作的类型。 |
2 |
|
事件类型。 |
停止增量快照
您还可以通过向源数据库上的表发送信号来停止增量快照。开云体育电动老虎机通过发送SQL向表提交停止快照信号插入
查询在Debez开云体育官方注册网址ium检测到信号表中的变化后,它将读取信号,如果增量快照操作正在进行,则停止增量快照操作。
提交的查询指定的快照操作增量
,以及要删除的当前运行快照的表(可选)。
启用信令.
源数据库中存在信令数据集合。开云体育电动老虎机
信令数据采集在
signal.data.collection
财产。
向信令表发送一个SQL查询来停止临时增量快照:
插入< signalTable >(id, type, data)' < id > ', 'stop-snapshot', '{"data-collections": ["<表>”、“<表>”,“类型”:“增量”}’);
例如,
INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data)(1)值(“ad-hoc-1”,(2)“stop-snapshot”,(3)”{代码基于schema1中“数据收集”:["。表1”、“schema2.table2”),(4)“类型”:“增量”}’);(5)
的值
id
,类型
,数据
signal命令中的参数对应信令表字段.样例中参数说明如下:
表6所示。向信令表发送停止增量快照信号的SQL命令字段说明 项 价值 描述 1
myschema.开云体育官方注册网址debezium_signal
指定源数据库上信令表的全限定名。开云体育电动老虎机
2
ad-hoc-1
的
id
参数指定分配为id
信号请求的标识符。
使用此字符串标识信令表中条目的日志消息。开云体育官方注册网址Debezium不使用这个字符串。3.
stop-snapshot
指定
类型
参数指定信号要触发的操作。4
数据收集
的可选组件
数据
信号的字段,该字段指定表名或正则表达式数组,以匹配要从快照中删除的表名。
对象中指定连接器的信令表的名称时使用的格式相同,该数组列出了根据表的全限定名匹配表的正则表达式signal.data.collection
配置属性。如果这个分量数据
字段,则信号停止正在进行的整个增量快照。5
增量
的必需组件
数据
信号的字段,该字段指定要停止的快照操作的类型。
目前,唯一有效的选项是增量
.
如果没有指定类型
值时,信号停止增量快照失败。
只读增量快照
MySQL连接器允许使用到数据库的只读连接运行增量快照。开云体育电动老虎机要运行具有只读访问的增量快照,连接器使用设置为高水位和低水位的已执行全局事务id (GTID)。通过将二进制日志(binlog)事件的gtid或服务器的心跳与低水位和高水位进行比较,可以更新块窗口的状态。
属性的值可切换到只读实现read.only
财产真正的
.
如果连接器从一个多线程副本(即
replica_parallel_workers
大于0
),你必须设置以下其中一个选项:replica_preserve_commit_order =对
slave_preserve_commit_order =对
Ad hoc只读增量快照
当MySQL连接是只读的时,将信号表机制也可以通过向Kafka主题发送消息来运行快照signal.kafka.topic财产。
的值必须匹配Kafka消息的键值topic.prefix
连接器配置选项。
值为JSON对象类型
而且数据
字段。
信号类型为execute-snapshot
和数据
字段必须包含以下字段:
场 | 默认的 | 价值 |
---|---|---|
|
|
需要执行的快照类型。目前只 |
|
N/A |
一个用逗号分隔的正则表达式数组,它匹配要快照的表的完全限定名称。 |
|
N/A |
一个可选字符串,它根据表的列指定一个条件,以捕获表内容的一个子集。 |
Kafka的execute-snapshot消息示例:
Key = ' test_connector ' Value = ' {"type":"execute-snapshot","data": {"data-collections": ["schema1. schema1. "表1”、“代码基于schema1中。表格2"], "type": "INCREMENTAL"}}`
附加条件的临时只读增量快照
附加条件
用于选择表内容的子集。打个比方
附加条件
使用:对于快照,后台执行的SQL查询是这样的:
Select * from<表>....
的快照
附加条件
,附加条件
被追加到SQL查询,类似于:Select * from<表>在哪里<附加条件>....
假设有一个
产品
列表id
(主键),颜色
而且品牌
.的内容进行快照
产品
表在哪里颜色=蓝色
Key = ' test_connector ' Value = ' {"type":"execute-snapshot","data": {"data-collections": ["schema1. schema1. "product "], "type": "INCREMENTAL", "additional-condition":"color=blue"}} '
附加条件
可用于基于多个列传递条件。使用相同的产品
表中,要快照内容的产品
表在哪里颜色=蓝色
而且品牌= foo
Key = ' test_connector ' Value = ' {"type":"execute-snapshot","data": {"data-collections": ["schema1. schema1. "product "], "type": "INCREMENTAL", " extra -condition":"color=blue AND brand=foo"}} '
停止Ad hoc只读增量快照
当MySQL连接是只读的时,将信号表机制也可以通过向Kafka主题发送消息来停止快照signal.kafka.topic财产。
的值必须匹配Kafka消息的键值topic.prefix
连接器配置选项。
值为JSON对象类型
而且数据
字段。
信号类型为stop-snapshot
和数据
字段必须包含以下字段:
场 | 默认的 | 价值 |
---|---|---|
|
|
需要执行的快照类型。目前只 |
|
N/A |
逗号分隔的正则表达式的可选数组,它匹配要快照的表的完全限定名称。 |
Kafka的stop-snapshot消息示例:
Key = ' test_connector ' Value = ' {"type":"stop-snapshot","data": {"data-collections": ["schema1. schema1. "表1”、“代码基于schema1中。表格2"], "type": "INCREMENTAL"}}`
快照事件的操作类型
MySQL连接器发出快照事件为读
操作("op": "r")
.如果希望连接器发出快照事件为创建
(c
)事件,配置Debezium开云体育官方注册网址ReadToInsertEvent
单个消息转换(SMT)来修改事件类型。
配置SMT的示例如下:
ReadToInsertEvent
SMT来更改快照事件的类型变换= snapshotasinsert,…transforms.snapshotasinsert.type = i开云体育官方注册网址o.debezium.connector.mysql.transforms.ReadToInsertEvent
主题名称
默认情况下,MySQL连接器为所有的插入
,更新
,删除
对特定于该表的单个Apache Kafka主题的表中发生的操作。
连接器使用以下约定来命名更改事件主题:
topicPrefix.开云体育电动老虎机databaseName.tableName
假设实现
是主题前缀,库存
数据库名称,以开云体育电动老虎机及数据库包含的表名称订单
,客户
,产品
.Debe开云体育官方注册网址zium MySQL连接器向三个Kafka主题发出事件,分别对应数据库中的每个表:开云体育电动老虎机
履行。库存。订单履行。库存。客户履行。库存。产品
下面的列表提供了默认名称组件的定义:
- topicPrefix
-
属性指定的主题前缀
topic.prefix
连接器配置属性。 - schemaName
-
发生操作的模式的名称。
- 的表
-
发生操作的表的名称。
如果默认的主题名称不满足您的需求,您可以配置自定义的主题名称。要配置自定义主题名称,需要在逻辑主题路由SMT中指定正则表达式。有关使用逻辑主题路由SMT自定义主题命名的详细信息,请参见主题的路由.
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务边界的事件,并丰富数据更改事件消息。
Debezium何时接收事务元开云体育官方注册网址数据的限制
开云体育官方注册网址Debezium仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
开云体育官方注册网址类生成事务边界事件开始
而且结束
每个事务中的分隔符。事务边界事件包含以下字段:
-
状态
-
开始
或结束
. -
id
-
唯一事务标识符的字符串表示形式。
-
ts_ms
-
事务边界事件的时间(
开始
或结束
事件)。如果数据源没有向Debezium提供事件时间,则该字段表示Debeziu开云体育官方注册网址m处理事件的时间。 -
event_count
(结束
事件) -
事务发出的事件总数。
-
data_collections
(结束
事件) -
对的数组
data_collection
而且event_count
元素,该元素指示连接器为源自数据集合的更改发出的事件数。
{"status": "BEGIN", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "ts_ms": " 1486500577125 ", "event_count": null, "data_collections": null} {"status": "END", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "ts_ms": 1486500577691, "event_count": 2, "data_collections": [{"data_collection": "s1。A ", "event_count": 1}, {"data_collection": "s2. "A ", "event_count": 1}]}
方法覆盖topic.transaction
选项时,连接器将事务事件发出到< topic.prefix >
.transaction
的话题。
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
-
id
-
唯一事务标识符的字符串表示形式。
-
total_order
-
事件在事务生成的所有事件中的绝对位置。
-
data_collection_order
-
事件在事务发出的所有事件中的每数据收集位置。
下面是一个消息的例子:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…}, "op": "c", "ts_ms": "1580390884335", "transaction": {"id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "total_order": "1", "data_collection_order": "1"}}
对于没有启用GTID的系统,事务标识符是使用binlog文件名和binlog位置的组合构造的。例如,如果binlog文件名和对应于事务BEGIN事件的位置是mysql-bin。分别为000002和1913,则Debezium构造的事务标识符将为开云体育官方注册网址文件= mysql-bin.000002 pos = 1913
.
数据变更事件
Debe开云体育官方注册网址zium MySQL连接器为每个行级生成一个数据更改事件插入
,更新
,删除
操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。
下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式
字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:
{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
项 | 字段名 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3. |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同。看到主题名称.
MySQL连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名、数据库名和表名中的每个字符必须是拉丁字母、数字或下划线(a-z、a-z、0-9或_)。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名、数据库名或表名包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机 |
更改事件键
更改事件的键包含已更改表的键和已更改行的实际键的模式。模式及其对应的有效负载都为更改后的表的每一列包含一个字段主键
(或唯一约束)在连接器创建事件时。
考虑以下几点客户
表,后面是此表的更改事件键的示例。
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
对象的更改的每个更改事件客户
表具有相同的事件键模式。只要客户
表具有前面的定义,则捕获对的更改的每个更改事件客户
表的键结构如下。在JSON中,它看起来是这样的:
{"schema": {(1)"type": "struct", "name": "mysql-server-1.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“int32”,“可选”:假}},“有效载荷”:{(5)"id": 1001}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容 |
2 |
|
定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name.开云体育电动老虎机数据库名称.表名.
|
3. |
|
指示事件键是否必须在其中包含值 |
4 |
|
属性中期望的每个字段 |
5 |
|
包含为其生成此更改事件的行的键。在本例中,键包含一个单键 |
更改事件值
change事件中的值比键稍微复杂一些。和键一样,值也有模式
Section和a有效载荷
部分。的模式
类的模式信封
的结构有效载荷
节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。
考虑用于显示更改事件键示例的同一个示例表:
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
更改此表的更改事件的值部分描述如下:
创建事件
对象中创建数据的操作所生成的更改事件的值部分,示例如下客户
表:
{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers.Value",(2)"field": "before"}, {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers "。价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "mysql-server-1.inventory. clients . envelope "(4)}, "有效载荷":{(5)“人事处”:“c”,(6)“ts_ms”:1465491411815,(7)“之前”:空,(8)"后":{(9)"id": 1004, "first_name": Anne", "last_name": Kretchmar", "email": "annek@noanswer.org"}, "source": {(10)“版本”:“2.1.2。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin "。000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3. |
|
|
4 |
|
|
5 |
|
该值为实际数据。这是变更事件提供的信息。 |
6 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中,
|
7 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
8 |
|
可选字段,指定事件发生之前行的状态。当 |
9 |
|
可选字段,指定事件发生后行的状态。在本例中, |
10 |
|
描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:
如果 |
更新事件
样例中更新的更改事件的值客户
表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户
表:
{"schema":{…}, "payload": {"before": {(1)“id”:1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{(2)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“2.1.2。Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"}, "op": "u",(4)“ts_ms”:1465581029523(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个更新事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。你可以比较 |
3. |
|
描述事件的源元数据的必填字段。的
如果 |
4 |
|
返回string,描述操作类型。在一个更新事件值,则 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新一行的主键/唯一键的列将更改该行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件: |
主键更新
一个更新
更改行的主键字段的操作称为主键更改。对于主键更改,替换为更新
事件记录时,连接器发出删除
事件记录的旧键和创建
新的(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
的
删除
事件记录有__开云体育官方注册网址debezium.newkey
作为消息头。此标头的值是更新行的新主键。的
创建
事件记录有__开云体育官方注册网址debezium.oldkey
作为消息头。此标头的值是已更新行的前一个(旧的)主键。
删除事件
的值删除改变事件有相同之处模式
部分为创建而且更新同一表的事件。的有效载荷
的一部分删除事件。客户
表是这样的:
{"schema":{…}, "payload": {"before": {(1)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "after": null,(2)“源”:{(3)“版本”:“2.1.2。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581902300, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004"}, "op": "d",(4)“ts_ms”:1465581902461(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个删除事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。在一个删除事件值,则 |
3. |
|
描述事件的源元数据的必填字段。在一个删除事件值,则
如果 |
4 |
|
返回string,描述操作类型。的 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
一个删除更改事件记录为使用者提供了处理删除该行所需的信息。包含旧值是因为一些使用者可能需要它们来正确处理删除。
MySQL连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
墓碑上的事件
删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为零
.为了实现这一点,在Debezium的MySQL连接器发出一开云体育官方注册网址个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有零
价值。
截断事件
一个截断更改事件表示表已被截断。消息键是零
在这种情况下,消息值看起来像这样:
{"schema":{…}, "payload": {"source": {(1)“版本”:“2.1.2。Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"}, "op": "t",(2)“ts_ms”:1465581029523(3)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
描述事件的源元数据的必填字段。在一个截断事件值,则
|
2 |
|
返回string,描述操作类型。的 |
3. |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 +在 |
如果一个截断
语句应用于多个表截断将发出每个截断表的更改事件记录。
请注意,由于截断事件表示对整个表所做的更改,并且没有消息键,除非您使用单个分区的主题,否则与表相关的更改事件没有顺序保证(创建,更新等)和截断该表的事件。例如,消费者可能会收到一个更新事件发生后,截断事件,当从不同分区读取这些事件时。
数据类型映射
Debe开云体育官方注册网址zium MySQL连接器用事件表示行更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该列的MySQL数据类型决定了Debezium如何表示事件中的值。开云体育官方注册网址
存储字符串的列在MySQL中使用字符集和排序规则定义。MySQL连接器在binlog事件中读取列值的二进制表示时使用列的字符集。
连接器可以将MySQL数据类型映射到两者文字而且语义类型。
文字类型:如何使用Kafka Connect模式类型来表示值。
语义类型: Kafka Connect模式如何捕获字段的含义(模式名)。
如果默认的数据类型转换不能满足您的需求,您可以这样做创建自定义转换器对于连接器。
基本类型
下表显示了连接器如何映射基本的MySQL数据类型。
MySQL类型 | 文字类型 | 语义类型 |
---|---|---|
|
|
N/A |
|
|
N/A |
|
|
|
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
精度仅用于确定存储大小。精密 |
|
|
从MySQL 8.0.17开始,非标准的FLOAT(M,D)和DOUBLE(M,D)语法已弃用,在MySQL的未来版本中应该会移除对它的支持 |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
时间类型
不包括时间戳
的值,MySQL的时态类型取决于time.precision.mode
连接器配置属性。为时间戳
列的默认值指定为CURRENT_TIMESTAMP
或现在
,值1970-01-01就是
是Kafka Connect模式的默认值。
MySQL允许为零值日期
,DATETIME
,时间戳
列,因为零值有时比空值更可取。当列定义允许空值时,MySQL连接器将零值表示为空值,当列不允许空值时,则将零值表示为历元日。
的DATETIME
Type表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,这里没有时区信息。这些列将使用UTC根据列的精度转换为epoch毫秒或微秒。的时间戳
类型表示不包含时区信息的时间戳。写入时,MySQL将从服务器(或会话)当前时区转换为UTC,读取时将从UTC转换为服务器(或会话)当前时区。例如:
DATETIME
值为2018-06-20 06:37:03
就变成了1529476623000
.时间戳
值为2018-06-20 06:37:03
就变成了2018 - 06 - 20 - t13:37:03z
.
这些列被转换为等效的列io.开云体育官方注册网址debezium.time.ZonedTimestamp
基于服务器(或会话)当前时区的UTC格式。默认从服务器上查询时区。如果失败,则必须由数据库显式指定开云体育电动老虎机connectionTimeZone
MySQL配置选项。例如,如果数据库的时区(全局的或为连接器开云体育电动老虎机配置的)connectionTimeZone
选项)为“America/Los_Angeles”,则TIMESTAMP值“2018-06-20 06:37:03”由ZonedTimestamp
值为“2018-06-20T13:37:03Z”。
运行Kafka Connect和Debezium的JVM的时区不会影响这些转换。开云体育官方注册网址
有关与时间值相关的属性的更多详细信息,请参阅MySQL连接器配置属性.
- time.precision.mode = adaptive_time_microseconds(默认)
-
MySQL连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确地表示数据库中的值。开云体育电动老虎机所有时间字段都以微秒为单位。只有积极的
时间
范围内的字段值00:00:00.000000
来23:59:59.999999
可以正确捕获。表16所示。映射时 time.precision.mode = adaptive_time_microseconds
MySQL类型 文字类型 语义类型 日期
INT32
io.开云体育官方注册网址debezium.time.Date
表示自纪元以来的天数。((M))
INT64
io.开云体育官方注册网址debezium.time.MicroTime
表示以微秒为单位的时间值,不包括时区信息。MySQL允许米
在…的范围内0 - 6
.Datetime, Datetime (0), Datetime (1), Datetime (2), Datetime (3)
INT64
io.开云体育官方注册网址debezium.time.Timestamp
表示经过纪元的毫秒数,不包括时区信息。Datetime (4), Datetime (5), Datetime (6)
INT64
io.开云体育官方注册网址debezium.time.MicroTimestamp
表示经过纪元的微秒数,不包括时区信息。 - time.precision.mode =连接
-
MySQL连接器使用定义好的Kafka Connect逻辑类型。此方法不如默认方法精确,如果数据库列具有开云体育电动老虎机分数秒精度值大于
3.
.值的范围00:00:00.000
来23:59:59.999
可以处理。集time.precision.mode =连接
只有你能保证时间
表中的值永远不会超过支持的范围。的连接
设置有望在Debezium的未来版本中被删除。开云体育官方注册网址表17所示。映射时 time.precision.mode =连接
MySQL类型 文字类型 语义类型 日期
INT32
org.apache.kafka.connect.data.Date
表示自纪元以来的天数。((M))
INT64
org.apache.kafka.connect.data.Time
表示自午夜以来的时间值(以微秒为单位),不包括时区信息。DATETIME ((M))
INT64
org.apache.kafka.connect.data.Timestamp
表示自纪元以来的毫秒数,不包括时区信息。
十进制类型
开云体育官方注册网址的设置来处理小数decimal.handling.mode
连接器配置属性.
- decimal.handling.mode =精确
-
表18。映射时 decimal.handling.mode =精确
MySQL类型 文字类型 语义类型 数字((M [D]))
字节
org.apache.kafka.connect.data.Decimal
的规模
模式参数包含一个整数,表示小数点移位了多少位。小数((M [D]))
字节
org.apache.kafka.connect.data.Decimal
的规模
模式参数包含一个整数,表示小数点移位了多少位。 - decimal.handling.mode =双
-
表19。映射时 decimal.handling.mode =双
MySQL类型 文字类型 语义类型 数字((M [D]))
FLOAT64
N/A
小数((M [D]))
FLOAT64
N/A
- decimal.handling.mode =字符串
-
表20。映射时 decimal.handling.mode =字符串
MySQL类型 文字类型 语义类型 数字((M [D]))
字符串
N/A
小数((M [D]))
字符串
N/A
布尔值
MySQL处理布尔
以特定的方式进行内部价值评估。的布尔
列在内部映射到非常小的整数(1)
数据类型。当在流期间创建表时,它使用proper布尔
当Debezium接收开云体育官方注册网址到原始DDL时映射。在快照期间,Debezium执行开云体育官方注册网址显示创建表
获取返回的表定义非常小的整数(1)
对于这两个布尔
而且非常小的整数(1)
列。开云体育官方注册网址Debezium没有办法获得原始类型映射,因此映射到非常小的整数(1)
.
为了使您能够将源列转换为布尔数据类型,Debezium提供了一个开云体育官方注册网址TinyIntOneToBooleanConverter
自定义转换器你可以用以下方式之一来使用:
映射所有
非常小的整数(1)
或非常小的整数(1)无符号
列布尔
类型。使用逗号分隔的正则表达式列表枚举列的子集。
要使用这种类型的转换,必须设置转换器
配置属性的选择器
参数,示例如下:converters=boolean boolean.type=io.开云体育官方注册网址debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1。*, db1.table2.column1
注意:MySQL8不显示的长度
非常小的整数无符号
快照执行时的类型显示创建表
,这意味着这个转换器不能工作。新选项length.checker
可以解决这个问题,默认值是真正的
.禁用length.checker
并指定需要转换为的列选择器
属性,而不是基于类型转换所有列,如下例所示:converters=boolean boolean.type=io.开云体育官方注册网址debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.length。检查程序= false boolean.selector = db1.table1。*, db1.table2.column1
空间类型
目前,Debezium MyS开云体育官方注册网址QL连接器支持以下空间数据类型。
MySQL类型 | 文字类型 | 语义类型 |
---|---|---|
|
|
|
设置MySQL
在安装和运行Debezium连接器之前,需要完成一些MySQL设置任务。开云体育官方注册网址
创建用户
De开云体育官方注册网址bezium MySQL连接器需要一个MySQL用户帐户。这个MySQL用户必须对Debezium MySQL连接器捕获更改的所有数据库具有适当的权限。开云体育官方注册网址开云体育电动老虎机
MySQL服务器。
基本了解SQL命令。
创建MySQL用户:
mysql> CREATE USER ' USER '@'localhost' IDENTIFIED BY 'password'
授予用户所需的权限:
mysql>授权选择,重新加载,显示数据库,复制从,复制客户端上*。开云体育电动老虎机*以“密码”识别的“用户”;
权限说明如下表所示。
如果使用不允许全局读锁的托管选项(如Amazon RDS或Amazon Aurora),则使用表级锁创建全局读锁一致的快照.在这种情况下,您还需要授予 锁表
您所创建的用户的权限。看到快照欲知详情。确定用户权限:
mysql> FLUSH PRIVILEGES;
关键字 | 描述 |
---|---|
|
允许连接器从数据库中的表中选择行。开云体育电动老虎机仅在执行快照时使用。 |
|
使连接器能够使用 |
|
命令使连接器可以查看数据库名称开云体育电动老虎机 |
|
允许连接器连接和读取MySQL服务器的binlog。 |
|
允许连接器使用以下语句:
连接器总是需要这个。 |
|
标识权限应用到的数据库。开云体育电动老虎机 |
|
指定要授予权限的用户。 |
|
指定用户的MySQL密码。 |
启用binlog
MySQL复制必须启用二进制日志记录。二进制日志记录用于复制工具传播更改的事务更新。
MySQL服务器。
适当的MySQL用户权限。
检查是否
log-bin
选项已开启:// MySqlSELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';// MySqlSELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM performance_schemaWHERE variable_name='log_bin';
如果是的话
从
,用以下属性配置你的MySQL服务器配置文件,如下表所示:server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10
再次检查binlog状态,确认您的更改:
// MySqlSELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';// MySqlSELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM performance_schemaWHERE variable_name='log_bin';
财产 | 描述 |
---|---|
|
的值。 |
|
的价值 |
|
的 |
|
的 |
|
这是自动删除binlog文件的天数。默认为 |
使GTIDs
全局事务标识符(gtid)唯一标识集群内服务器上发生的事务。虽然对于Debezium MySQL连接器来说不是必开云体育官方注册网址需的,但是使用gtid简化了复制,使您能够更容易地确认主服务器和复制服务器是否一致。
gtid在MySQL 5.6.5及更高版本中可用。看到MySQL文档欲知详情。
MySQL服务器。
基本了解SQL命令。
访问MySQL配置文件。
启用
gtid_mode
:mysql > gtid_mode =
启用
enforce_gtid_consistency
:mysql > enforce_gtid_consistency =
确认更改:
mysql>显示全局变量“%GTID%”;
+--------------------------+-------+ | Variable_name |值 | +--------------------------+-------+ | 上enforce_gtid_consistency | | | gtid_mode | | +--------------------------+-------+
选项 | 描述 |
---|---|
|
布尔值,指定MySQL服务器是否启用GTID模式。
|
|
布尔值,指定服务器是否通过允许以事务安全方式登录的语句执行来强制GTID一致性。使用gtid时必须。
|
配置会话超时
当为大型数据库创建初始一致快照时,您所建立的连接可能会在读取表时超时。开云体育电动老虎机可以通过配置来防止这种行为interactive_timeout
而且wait_timeout
在MySQL配置文件中。
MySQL服务器。
基本了解SQL命令。
访问MySQL配置文件。
配置
interactive_timeout
:mysql > interactive_timeout = < duration-in-seconds >
配置
wait_timeout
:mysql > wait_timeout = < duration-in-seconds >
选项 | 描述 |
---|---|
|
服务器在关闭交互连接之前等待交互连接活动的秒数。看到MySQL的文档欲知详情。 |
|
服务器在关闭非交互式连接之前等待活动的秒数。看到MySQL的文档欲知详情。 |
启用查询日志事件
你可能想看看原版SQL
语句用于每个binlog事件。使binlog_rows_query_log_events
MySQL配置文件中的选项允许你这样做。
该选项在MySQL 5.6及更高版本中可用。
MySQL服务器。
基本了解SQL命令。
访问MySQL配置文件。
启用
binlog_rows_query_log_events
:mysql > binlog_rows_query_log_events =
binlog_rows_query_log_events
是否设置为启用/禁用对包含原始文件的支持的值SQL
语句。在
=启用从
=禁用
验证binlog行值选项
检查binlog_row_value_options
变量,并确保该值为不设置为PARTIAL_JSON
,因为在这种情况下连接器可能无法消费更新事件。
MySQL服务器。
基本了解SQL命令。
访问MySQL配置文件。
检查当前变量值
Mysql >显示全局变量where variable_name = 'binlog_row_value_options';
结果
+--------------------------+-------+ | Variable_name |值 | +--------------------------+-------+ | binlog_row_value_options | | +--------------------------+-------+
case值为
PARTIAL_JSON
,通过:Mysql > set @@global。binlog_row_value_options = " ";
部署
要部署Debezium 开云体育官方注册网址MySQL连接器,您需要安装Debezium MySQL连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。
MySQL Server已经安装完成与Debezium连接器一起工作开云体育官方注册网址.
下载Debezium开云体育官方注册网址MySQL连接器插件.
将文件解压缩到Kafka Connect环境中。
将包含JAR文件的目录添加到卡夫卡连接的
plugin.path
.重新启动Kafka Connect进程以获取新的JAR文件。
如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Apache Zookeeper, Apache Kafka, MySQL和Kafka连接MySQL连接器已经安装并准备运行。
MySQL连接器配置示例
下面是一个连接器实例的配置示例,该实例从端口为192.168.99.100的3306上的MySQL服务器捕获数据,逻辑上我们将其命名为192.168.99.100fullfillment
.通常,通过设置连接器可用的配置属性,可以在JSON文件中配开云体育官方注册网址置Debezium MySQL连接器。
您可以选择为数据库中的模式和表的子集生成事件。开云体育电动老虎机可选地,您可以忽略、屏蔽或截断包含敏感数据的列、大于指定大小的列或不需要的列。
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.mysql. mysqlconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“3306”,(4)“开云体育电动老虎机数据库。user": "debezium-user",(5)“开云体育电动老虎机数据库。密码”:“debeziu开云体育官方注册网址m-user-pw”,(6)“开云体育电动老虎机database.server。id”:“184054”,(7)”的话题。前缀”:“fullfillment”,(8)“开云体育电动老虎机database.include。列表”:“库存”,(9)“schema.history.internal.kafka.bootstrap.servers”:“卡夫卡:9092”,(10):“schema.history.internal.kafka.topic schemahistory.fullfillment”,(11)“include.schema。变化”:“真正的”(12)}}
1 | 在Kafka Connect服务中注册连接器的名称。 |
2 | 连接器的类名。 |
3. | MySQL服务器地址。 |
4 | MySQL服务器端口号。 |
5 | 具有适当权限的MySQL用户。 |
6 | MySQL用户密码。 |
7 | 连接器的唯一ID。 |
8 | MySQL服务器或集群的主题前缀。 |
9 | 指定服务器托管的开云体育电动老虎机数据库列表。 |
10 | 连接器用来将DDL语句写入并恢复到数据库模式历史主题的Kafka代理列表。开云体育电动老虎机 |
11 | 数据库模式历史记录主题的开云体育电动老虎机名称。此主题仅供内部使用,消费者不应使用。 |
12 | 标记,该标记指定连接器是否应为DDL更改生成事件并将事件发送到实现 架构更改主题供使用者使用。 |
有关可以为Debezium MySQL连接器设置的配置属性的完整列表,请参见开云体育官方注册网址MySQL连接器配置属性.
您可以使用帖子
命令到正在运行的Kafka Connect服务。该服务记录配置并启动一个执行以下操作的连接器任务:
连接MySQL数据库。开云体育电动老虎机
在捕获模式下读取更改数据表。
流将事件记录更改为Kafka主题。
添加连接器配置
要开始运行MySQL连接器,请配置一个连接器配置,并将该配置添加到Kafka Connect集群中。
已安装D开云体育官方注册网址ebezium MySQL连接器。
为MySQL连接器创建一个配置。
使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。
连接器启动后,它将执行一致性快照配置连接器的MySQL数据开云体育电动老虎机库。然后连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。
连接器属性
Debe开云体育官方注册网址zium MySQL连接器有许多配置属性,您可以使用这些属性为您的应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:
开云体育电动老虎机数据库模式历史连接器配置属性控制Debezium如何处理从数据开云体育官方注册网址库模式历史主题中读取的事件。开云体育电动老虎机
传递数据库驱动程序属性开云体育电动老虎机控制数据库驱动程序的行为。开云体育电动老虎机
以下配置属性为要求除非有默认值可用。
所需的Debezi开云体育官方注册网址um MySQL连接器配置属性
财产 | 默认的 | 描述 | ||
---|---|---|---|---|
没有默认的 |
连接器的唯一名称。试图用相同的名称再次注册失败。所有Kafka Connect连接器都需要这个属性。 |
|||
没有默认的 |
连接器的Java类的名称。总是指定 |
|||
|
应该为此连接器创建的最大任务数。MySQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。 |
|||
没有默认的 |
MySQL数据库服务器的IP地址或主机名。开云体育电动老虎机 |
|||
|
MySQL数据库服务器的整型端口号。开云体育电动老虎机 |
|||
没有默认的 |
连接MySQL数据库服务器时使用的MySQL用户名。开云体育电动老虎机 |
|||
没有默认的 |
连接MySQL数据库服务器时使用的密码。开云体育电动老虎机 |
|||
没有默认的 |
主题前缀,为Debezium在其中捕获更改的特定MySQL数据库服务器/集群提供名称空间。开云体育官方注册网址开云体育电动老虎机主题前缀在所有其他连接器中应该是唯一的,因为它被用作接收该连接器发出的事件的所有Kafka主题名称的前缀。数据库服务器逻辑名中只能使用字母数字字符、连字符、点和下划线。开云体育电动老虎机
|
|||
没有默认的 |
这个数据库客户端的数字ID,在MySQL集开云体育电动老虎机群中所有当前运行的数据库进程中必须是唯一的。这个连接器作为另一个服务器(使用这个唯一的ID)加入MySQL开云体育电动老虎机数据库集群,因此它可以读取binlog。 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它与要捕获更改的数据库的名称相匹配。开云体育电动老虎机连接器不会捕获名称不在中的任何数据库中的更改开云体育电动老虎机 为了匹配数据库的名称,Debezium应用您指开云体育电动老虎机定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;开云体育电动老虎机它不匹配可能出现在数据库名称中的子字符串。开云体育电动老虎机 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它与您不想捕获更改的数据库的名称相匹配。开云体育电动老虎机连接器捕获名称不在数据库中的任何数据库中的更改开云体育电动老虎机 为了匹配数据库的名称,Debezium应用您指开云体育电动老虎机定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;开云体育电动老虎机它不匹配可能出现在数据库名称中的子字符串。开云体育电动老虎机 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它匹配您想要捕获其更改的表的完全限定表标识符。连接器不会捕获未包含在中的任何表中的更改 为了匹配表的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名中可能存在的子字符串。 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它匹配不希望捕获其更改的表的完全限定表标识符。连接器捕获未包含的任何表中的更改 为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名中可能存在的子字符串。 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它匹配要从更改事件记录值中排除的列的完全限定名称。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName. 为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;它不匹配可能出现在列名中的子字符串。如果在配置中包含此属性,则不要同时设置 |
|||
空字符串 |
一个可选的、以逗号分隔的正则表达式列表,它匹配要包含在变更事件记录值中的列的完全限定名称。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName. 为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;它不匹配可能出现在列名中的子字符串。 |
|||
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。类指定的字符数时,如果要截断一组列中的数据,请设置此属性长度属性名。集 列的完全限定名遵循以下格式:开云体育电动老虎机数据库名.的表.columnName.为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。 您可以在单个配置中指定具有不同长度的多个属性。 |
|||
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。如果希望连接器屏蔽一组列的值,请设置此属性,例如,如果这些列包含敏感数据。集 列的完全限定名遵循以下格式:开云体育电动老虎机数据库名.的表.columnName.为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。 您可以在单个配置中指定具有不同长度的多个属性。 |
|||
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为 假名由应用指定的参数产生的散列值组成hashAlgorithm而且盐.根据所使用的散列函数,将维护引用完整性,而列值将被假名替换。中描述了支持的哈希函数MessageDigest节Java密码体系结构标准算法名称文档。 column.mask.hash.sha with.salt——256.。CzQMA0cB5K =库存。订单。customerName, inventory.shipment.customerName 必要时,笔名会自动缩短为列的长度。连接器配置可以包括多个属性,这些属性指定不同的哈希算法和盐。 |
|||
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称匹配。设置此属性后,连接器将向事件记录模式添加以下字段:
这些参数分别传播列的原始类型名称和长度(对于变宽类型)。 列的完全限定名遵循以下格式之一:开云体育电动老虎机数据库名.的表.columnName,或开云体育电动老虎机数据库名.schemaName.的表.columnName. |
|||
N/A |
一个可选的、以逗号分隔的正则表达式列表,用于指定为数据库中的列定义的数据类型的完全限定名称。开云体育电动老虎机当设置此属性时,对于具有匹配数据类型的列,连接器将发出事件记录,其中在其模式中包含以下额外字段:
这些参数分别传播列的原始类型名称和长度(对于变宽类型)。 列的完全限定名遵循以下格式之一:开云体育电动老虎机数据库名.的表.typeName,或开云体育电动老虎机数据库名.schemaName.的表.typeName. 有关mysql特定的数据类型名称的列表,请参见MySQL数据类型映射. |
|||
|
时间、日期和时间戳可以用不同的精度表示,包括: |
|||
|
指定连接器应如何处理的值 |
|||
|
指定如何在更改事件中表示BIGINT UNSIGNED列。可能的设置有: |
|||
|
布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器ID同名的Kafka主题。开云体育电动老虎机通过使用一个包含数据库名称且值包含DDL语句的键来记录每个模式更改。开云体育电动老虎机这与连接器内部记录数据库模式历史的方式无关。开云体育电动老虎机 |
|||
|
布尔值,指定连接器是否应该解析和发布元数据对象上的表和列注释。启用此选项将对内存使用带来影响。逻辑模式对象的数量和大小在很大程度上影响Debezium连接器所消耗的内存,并且向每个连接器添加可能很大的字符串数据可能会非常昂贵。开云体育官方注册网址 |
|||
|
布尔值,该值指定连接器是否应包括生成更改事件的原始SQL查询。 |
|||
|
指定连接器在反序列化binlog事件期间应对异常的方式。 |
|||
|
指定连接器应如何对与不在内部模式表示中出现的表相关的binlog事件作出反应。也就是说,内部表示与数据库不一致。开云体育电动老虎机 |
|||
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|||
|
正整数值,指定阻塞队列可以容纳的最大记录数。当Debe开云体育官方注册网址zium从数据库读取事件流时,它会在将事件写入Kafka之前开云体育电动老虎机将其放在阻塞队列中。如果连接器接收消息的速度比写入Kafka的速度快,或者Kafka变得不可用,阻塞队列可以为从数据库读取更改事件提供反压力。开云体育电动老虎机当连接器定期记录偏移量时,队列中保存的事件将被忽略。总是设置的值 |
|||
|
一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。 |
|||
|
正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|||
|
一个正整数值,指定连接器在尝试连接到MySQL数据库服务器后超时前应该等待的最大时间(以毫秒为单位)。开云体育电动老虎机默认为30秒。 |
|||
没有默认的 |
一个用逗号分隔的正则表达式列表,它与GTID集中的源uuid匹配,连接器使用它来查找MySQL服务器上的binlog位置。设置此属性时,连接器仅使用GTID范围,该范围的源uuid与指定的uuid之一匹配 为了匹配GTID的值,Debezium应用您指定为的正则表达开云体育官方注册网址式锚定正则表达式。也就是说,指定的表达式与整个UUID字符串匹配;它不匹配UUID中可能存在的子字符串。 |
|||
没有默认的 |
一个用逗号分隔的正则表达式列表,它与GTID集中的源uuid匹配,连接器使用它来查找MySQL服务器上的binlog位置。设置此属性时,连接器仅使用源uuid不匹配任何指定uuid的GTID范围 为了匹配GTID的值,Debezium应用您指定为的正则表达开云体育官方注册网址式锚定正则表达式。也就是说,指定的表达式与整个UUID字符串匹配;它不匹配UUID中可能存在的子字符串。 |
|||
|
控制是否删除事件之后是一个墓碑事件。 |
|||
N/A |
一个表达式列表,用于指定连接器用来为发布到指定表的Kafka主题的更改事件记录形成自定义消息键的列。 默认情况下,Debezi开云体育官方注册网址um使用表的主键列作为它发出的记录的消息键。为了代替默认值,或者为缺少主键的表指定一个键,您可以基于一个或多个列配置自定义消息键。 每个全限定表名都是一个正则表达式,格式如下: 用于创建自定义消息键的列的数量没有限制。但是,最好使用指定唯一键所需的最小数目。 |
|||
字节 |
指定二进制列,例如, |
|||
没有一个 |
指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:
|
高级MySQL连接器配置属性
下表描述了高级MySQL连接器属性.这些属性的默认值很少需要更改。因此,您不需要在连接器配置中指定它们。
财产 | 默认的 | 描述 |
---|---|---|
|
一个布尔值,指定是否应该使用一个单独的线程来确保与MySQL服务器/集群的连接保持活跃。 |
|
没有默认的 |
属性的符号名称的逗号分隔列表自定义转换器连接器可以使用的实例。 对于为连接器配置的每个转换器,还必须添加
例如, 布尔。类型:io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter 如果希望进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。若要将这些附加配置参数与转换器关联起来,请将参数名称与转换器的符号名称作为前缀。 boolean.selector = db1.table1。*, db1.table2.column1 |
|
|
一个布尔值,指定是否应忽略内置系统表。无论表包含列表和排除列表如何,这都适用。默认情况下,系统表的更改不会被捕获,并且当对任何系统表进行更改时,不会生成任何事件。 |
|
|
指定是否使用加密连接。可能的设置有: |
|
0 |
binlog读取器使用的预读缓冲区的大小。的默认设置 |
|
|
指定在连接器启动时运行快照的条件。可能的设置有: |
|
|
控制连接器是否持有全局MySQL读锁以及持有多长时间,在连接器执行快照时,可以防止对数据库的任何更新。开云体育电动老虎机可能的设置有: |
|
中指定的所有表 |
匹配完全限定名称的可选、逗号分隔的正则表达式列表( 为了匹配表的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名中可能存在的子字符串。 |
|
没有默认的 |
指定要包含在快照中的表行。如果希望快照仅包含表中行的子集,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。 属性包含表单中以逗号分隔的全限定表名列表 从一个 “snapshot.select.statement。覆盖”:“客户。订单", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC" 在生成的快照中,连接器仅包括其记录 |
|
|
在快照期间,连接器查询每个被配置为捕获更改的表。连接器使用每个查询结果生成一个读取事件,其中包含该表中所有行的数据。这个属性决定MySQL连接器是将一个表的结果放入内存中(这是快速的,但需要大量内存),还是将结果流放入内存中(这可能较慢,但适用于非常大的表)。此属性的设置指定在连接器传输结果之前表必须包含的最小行数。 |
|
|
控制连接器向Kafka主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。 |
|
没有默认的 |
指定当连接器发送心跳消息时连接器在源数据库上执行的查询。开云体育电动老虎机 |
|
没有默认的 |
当建立到数据库的JDBC连接(而不是读取事务日志的连接)时,将执行一个以分号分隔的SQL语句列表。开云体育电动老虎机要将分号指定为SQL语句中的字符而不是分隔符,请使用两个分号( |
|
没有默认的 |
在连接器启动时执行快照之前,连接器应等待的毫秒间隔。如果在集群中启动多个连接器,此属性有助于避免快照中断,因为快照中断可能导致连接器的重新平衡。 |
|
没有默认的 |
在快照期间,连接器以行为单位读取表内容。此属性指定批处理中的最大行数。 |
|
|
正整数,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。看到MySQL连接器如何执行数据库快照开云体育电动老虎机. |
|
|
布尔值,指示连接器是否将2位年份规范转换为4位。设置为 |
|
|
的架构版本 |
|
|
指示是否对字段名进行清除以遵守Avro命名要求. |
|
|
以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括: |
|
无默认值 |
用于发送的数据集合的完全限定名称信号到连接器。 |
|
|
允许在增量快照期间更改模式。启用后,连接器将在增量快照期间检测模式更改,并重新选择当前块以避免锁定ddl。 |
|
|
在增量快照块期间连接器获取并读入内存的最大行数。增加块大小可以提供更高的效率,因为快照运行更少的大大小快照查询。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。 |
|
|
切换到可选的增量快照水印实现,以避免写入信号数据收集 |
|
|
确定连接器是否生成具有事务边界的事件,并使用事务元数据丰富更改事件信封。指定 |
|
|
TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称 |
|
|
指定主题名称的分隔符,默认为 |
|
|
用于在有界并发散列映射中保存主题名称的大小。此缓存将帮助确定与给定数据集合对应的主题名称。 |
|
|
控制连接器向其发送心跳消息的主题的名称。主题名称有这样的模式: |
|
|
控制连接器向其发送事务元数据消息的主题的名称。主题名称有这样的模式: |
开云体育官方注册网址Debezium连开云体育电动老虎机接器数据库模式历史配置属性
开云体育官方注册网址Debezium提供了一组schema.history.internal。*
属性,这些属性控制连接器如何与模式历史主题交互。
下表描述了schema.history.internal
属性用于配置Debezium连接器。开云体育官方注册网址
财产 | 默认的 | 描述 |
---|---|---|
没有默认的 |
连接器存储数据库模式历史的Kafka主题的全称。开云体育电动老虎机 |
|
没有默认的 |
连接器用来建立到Kafka集群的初始连接的主机/端口对列表。此连接用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个DDL语开云体育电动老虎机句。每一对都应该指向Kafka Connect进程使用的相同的Kafka集群。 |
|
|
一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值是100ms。 |
|
|
一个整数值,指定连接器在使用Kafka管理客户端获取集群信息时应该等待的最大毫秒数。 |
|
|
一个整数值,指定连接器在使用kafka管理客户端创建kafka历史主题时应该等待的最大毫秒数。 |
|
|
在连接器恢复失败并发生错误之前,连接器应尝试读取持久历史数据的最大次数。接收不到数据后等待的最大时间为 |
|
|
一个布尔值,指定连接器是否应该忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。开云体育电动老虎机安全默认值为 |
|
|
一个布尔值,指定连接器是否应该记录所有DDL语句
安全默认值为 |
开云体育官方注册网址Debezium依赖Kafka生成器将模式更改写入数据库模式历史主题。开云体育电动老虎机类似地,当连接器启动时,它依赖于Kafka消费者从数据库模式历史主题中读取。开云体育电动老虎机为Kafka生产者和消费者客户端定义配置,方法是将值赋给一组以schema.history.internal.producer。*
而且schema.history.internal.consumer。*
前缀。直通生产者和消费者数据库模式历史属性控制一系列行为,比如这些客户端如何保护与K开云体育电动老虎机afka代理的连接,如下例所示:
SSL schema.history.internal.producer.ssl.keystore.location schema.history.internal.producer.security.protocol = = / var /私人/ SSL / kafka.server.keystore.jks schema.history.internal.producer.ssl.keystore.password = test1234 schema.history.internal.producer.ssl.truststore.location = / var /私人/ SSL / kafka.server.truststore.jks schema.history.internal.producer.ssl.truststore.password = test1234 schema.history.internal.producer.ssl.key.password = test1234 schema.history.internal.consumer.security.protocol = SSLSchema.history.internal.consumer.ssl.keystore.location =/var/private/ssl/kafka.server.keystore.jks Schema.history.internal.consumer.ssl.keystore.location =test1234 schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks schema.history.internal.consumer.ssl.truststore.password=test1234 schema.history.internal.consumer.ssl.keystore.password=test1234
开云体育官方注册网址Debezium在将属性传递给Kafka客户端之前,会从属性名中去掉前缀。
更多细节请参阅Kafka文档Kafka生产者配置属性而且Kafka消费者配置属性.
开云体育官方注册网址Debezium连接器Kafka信号配置属性
当MySQL连接器被配置为只读时,信令表的替代方案是signals Kafka主题。
开云体育官方注册网址Debezium提供了一组信号。*
控制连接器如何与Kafka信号主题交互的属性。
下表描述了信号
属性。
财产 | 默认的 | 描述 |
---|---|---|
没有默认的 |
连接器监视临时信号的Kafka主题的名称。 |
|
没有默认的 |
连接器用来建立到Kafka集群的初始连接的主机/端口对列表。每一对都应该指向Kafka Connect进程使用的相同的Kafka集群。 |
|
|
整数值,指定连接器轮询信号时应等待的最大毫秒数。默认值是100ms。 |
开云体育官方注册网址Debezium连接器直通信号Kafka消费者客户端配置属性
Debe开云体育官方注册网址zium连接器提供了Kafka消费者信号的直通配置。直通信号属性以前缀开头signals.consumer。*
.例如,连接器传递的属性为signal.consumer.security.protocol = SSL
卡夫卡式的消费者。
的情况也是如此数据库模式历史客户端的传递属性开云体育电动老虎机, 开云体育官方注册网址Debezium从属性中剥离前缀,然后将它们传递给Kafka信号消费者。
开云体育官方注册网址Debezium连接器直通数据库驱动程序配置属性开云体育电动老虎机
Debe开云体育官方注册网址zium连接器提供数据库驱动程序的直通配置。开云体育电动老虎机传递数据库属性以前缀开始开云体育电动老虎机司机。*
.例如,连接器传递的属性为driver.foobar = false
到JDBC URL。
的情况也是如此数据库模式历史客户端的传递属性开云体育电动老虎机, 开云体育官方注册网址Debezium在将前缀传递给数据库驱动程序之前将它们从属性中剥离。开云体育电动老虎机
监控
除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect提供的内置JMX指标支持外,Debezium MySQL连接器还提供了三种类型的指标。
开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.mysql: type = connector-metrics上下文=快照,server =< mysql.server.name >
.
快照度量不会公开,除非快照操作是活动的,或者自上次连接器启动以来发生了快照。
下表列出了可用的快照指标。
属性 | 类型 | 描述 |
---|---|---|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已由连接器上配置的包含/排除列表筛选规则筛选的事件数。 |
|
|
连接器捕获的表列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否暂停。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。还包括快照暂停的时间。 |
|
|
快照暂停的总秒数。如果快照被暂停了多次,那么暂停的时间就会累积起来。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
|
|
队列的最大缓冲区,以字节为单位。该指标是可用的,如果 |
|
|
队列中记录的当前卷,以字节为单位。 |
在执行增量快照时,连接器还提供了以下额外的快照度量:
属性 | 类型 | 描述 |
---|---|---|
|
当前快照块的标识符。 |
|
|
定义当前块的主键集的下界。 |
|
|
定义当前块的主键集的上界。 |
|
|
当前快照表的主键集的下界。 |
|
|
当前快照表的主键集的上限。 |
Debe开云体育官方注册网址zium MySQL连接器还提供HoldingGlobalLock
自定义快照度量。此指标被设置为一个布尔值,该值指示连接器当前持有的是全局锁还是表写锁。
流指标
只有启用binlog事件缓冲时,与事务相关的属性才可用。看到binlog.buffer.size
在高级连接器配置属性中获取更多详细信息。
的MBean是开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文=流媒体服务器=< mysql.server.name >
.
下表列出了可用的流媒体指标。
属性 | 类型 | 描述 |
---|---|---|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或度量重置以来,此连接器所看到的事件总数。 |
|
|
自上次启动或指标重置以来,此连接器所看到的创建事件的总数。 |
|
|
自上次启动或指标重置以来,此连接器所看到的更新事件总数。 |
|
|
自上次启动或指标重置以来,此连接器所看到的删除事件的总数。 |
|
|
已由连接器上配置的包含/排除列表筛选规则筛选的事件数。 |
|
|
连接器捕获的表列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
|
|
队列的最大缓冲区,以字节为单位。该指标是可用的,如果 |
|
|
队列中记录的当前卷,以字节为单位。 |
Debe开云体育官方注册网址zium MySQL连接器还提供了以下额外的流指标:
属性 | 类型 | 描述 |
---|---|---|
|
连接器最近读取的binlog文件的名称。 |
|
|
连接器已读取的binlog中的最近位置(以字节为单位)。 |
|
|
标志,表示连接器当前是否跟踪来自MySQL服务器的gtid。 |
|
|
连接器在读取binlog时处理的最新GTID集的字符串表示形式。 |
|
|
MySQL连接器跳过的事件数。通常情况下,由于MySQL binlog中的错误或无法解析的事件,事件会被跳过。 |
|
|
MySQL连接器断开连接的次数。 |
|
|
回滚且未流化的已处理事务的数量。 |
|
|
未符合预期协议的事务数 |
|
|
未放入前瞻性缓冲区的事务数。为了获得最佳性能,这个值应该显著小于 |
架构历史度量
的MBean是开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= schema-history服务器=< mysql.server.name >
.
下表列出了可用的模式历史指标。
属性 | 类型 | 描述 |
---|---|---|
|
之一 |
|
|
恢复开始的时间(以epoch秒为单位)。 |
|
|
在恢复阶段读取的更改数。 |
|
|
在恢复和运行时应用的模式更改的总数。 |
|
|
自上次更改从历史存储区恢复以来所经过的毫秒数。 |
|
|
自应用最后一次更改以来所经过的毫秒数。 |
|
|
从历史存储中恢复的最后一次更改的字符串表示形式。 |
|
|
最后应用的更改的字符串表示形式。 |
出现问题时的行为
开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。
如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
在以下情况下,连接器在尝试启动时失败,在日志中报告错误或异常,并停止运行:
连接器的配置无效。
连接器无法通过指定的连接参数成功连接到MySQL服务器。
连接器试图在binlog中MySQL不再有可用历史的位置重新启动。
在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。在纠正配置或解决MySQL问题后,重新启动连接器。
MySQL不可用
如果您的MySQL服务器变得不可用,Debezium MySQL连接器将失败并报错,连接器将开云体育官方注册网址停止。当服务器再次可用时,重新启动连接器。
但是,如果为高可用性MySQL集群启用了gtid,则可以立即重新启动连接器。它将连接到集群中的另一个MySQL服务器,在服务器的binlog中找到代表最后一个事务的位置,并开始从该特定位置读取新服务器的binlog。
如果未启用gtid,则连接器只记录其所连接的MySQL服务器的binlog位置。要从正确的binlog位置重新启动,必须重新连接到特定的服务器。
Kafka Connect优雅地停止
当Kafka Connect优雅地停止时,当Debezium MySQL连接器任务停止并在新的Kafka Connect进程上重新启动时,会开云体育官方注册网址有一个短暂的延迟。
Kafka连接进程崩溃
如果Kafka Connect崩溃,进程将停止,任何Debezium MySQL连接器任务将终止,而不会开云体育官方注册网址记录它们最近处理的偏移量。在分布式模式下,Kafka Connect重启其他进程上的连接器任务。但是,MySQL连接器从前面进程记录的最后一个偏移量开始恢复。这意味着替换任务可能会生成一些在崩溃之前处理过的相同事件,从而创建重复的事件。
每个更改事件消息都包含特定于源的信息,您可以使用这些信息来识别重复的事件,例如:
事件的起源
MySQL服务器的事件时间
binlog文件名称和位置
gtid(如果使用)
Kafka不可用
Kafka Connect框架通过使用Kafka生产者API来记录Kaf开云体育官方注册网址ka中的Debezium变更事件。如果Kafka代理变得不可用,Debezium MySQL连接器将暂停,直到重新建立连接,开云体育官方注册网址并从断开的地方恢复。
MySQL清除binlog文件
如果Debez开云体育官方注册网址ium MySQL连接器停止太长时间,MySQL服务器会清除旧的binlog文件,连接器的最后一个位置可能会丢失。当连接器重新启动时,MySQL服务器不再拥有起点,连接器执行另一个初始快照。如果快照被禁用,连接器将失败并报错。
看到快照有关MySQL连接器如何执行初始快照的详细信息。