您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

开云体育官方注册网址Debezium MySQL连接器

从一开始就为Debezium MySQL连接器创建了一个新的捕获实现开云体育官方注册网址1.5.0.Alpha版本(dbz - 1865),基于Debezium中所有其他Kafka Connect连接器所使用的通用连接器框架。开云体育官方注册网址连接器的行为几乎与以前的实现相同,除了实验并行快照功能(dbz - 175),新实现中还没有提供,计划稍后以另一种形式重新引入。

如果您在新的MySQL连接器实现中遇到任何问题,请登录Jira问题;在这种情况下,您可以通过设置internal.implementation =遗留连接器配置选项。

MySQL有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。开云体育电动老虎机这包括对表模式的更改以及对表中数据的更改。MySQL使用binlog进行复制和恢复。

Debe开云体育官方注册网址zium MySQL连接器读取binlog,生成行级的更改事件插入更新,删除操作,并向Kafka主题发出更改事件。客户端应用程序读取这些Kafka主题。

由于MySQL通常设置为在指定的一段时间后清除二进制日志,MySQL连接器执行初始化一致的快照您的每个数据库。开云体育电动老虎机MySQL连接器从创建快照的位置读取binlog。

连接器如何工作

对连接器支持的MySQL拓扑的概述对于规划应用程序非常有用。为了优化配置和运行Debezium MySQL连接器,了解连接器如开云体育官方注册网址何跟踪表的结构、暴露模式更改、执行快照和确定Kafka主题名称是很有帮助的。

支持的MySQL拓扑

Debe开云体育官方注册网址zium MySQL连接器支持以下MySQL拓扑:

独立的

当使用单个MySQL服务器时,服务器必须启用binlog (并可选地启用gtid),这样Debez开云体育官方注册网址ium MySQL连接器就可以监视服务器。这通常是可以接受的,因为二进制日志也可以用作增量日志备份.在这种情况下,MySQL连接器总是连接并跟随这个独立的MySQL服务器实例。

主和副本

Debe开云体育官方注册网址zium MySQL连接器可以跟随一个主服务器或一个副本(如果该副本启用了binlog),但是连接器只在该服务器可见的集群中看到更改。通常,除了多主拓扑,这不是问题。

连接器记录其在服务器binlog中的位置,该位置在集群中的每个服务器上都是不同的。因此,连接器必须只跟随一个MySQL服务器实例。如果该服务器发生故障,则必须重新启动或恢复该服务器,然后连接器才能继续。

高可用集群

各种各样的高可用性MySQL的解决方案是存在的,它们使它更容易容忍和几乎立即从问题和失败中恢复。大多数HA MySQL集群使用gtid,以便副本能够跟踪任何主服务器上的所有更改。

Multi-primary

NDB (Net开云体育电动老虎机work Database)集群复制使用一个或多个MySQL复制节点,每个复制节点从多个主服务器复制。这是聚合多个MySQL集群复制的强大方法。这种拓扑要求使用gtid。

De开云体育官方注册网址bezium MySQL连接器可以使用这些多主MySQL副本作为源,并且只要新副本赶上旧副本,就可以故障转移到不同的多主MySQL副本。也就是说,新的副本拥有在第一个副本上看到的所有事务。即使连接器只使用数据库和/或表的一个子集,这也可以工作,因为当试图重新连接到一个新的多主MySQL副本并在binlog开云体育电动老虎机中找到正确的位置时,连接器可以配置为包括或排除特定的GTID源。

主持

Debezium MySQL连接器支持使用托管选项开云体育官方注册网址,如Amazon RDS和Amazon Aurora。

因为这些托管选项不允许全局读锁,所以使用表级锁创建全局读锁一致的快照

架构历史记录主题

当数据库客户端开云体育电动老虎机查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以在任何开云体育电动老虎机时候更改,这意味着连接器必须能够识别记录每个插入、更新或删除操作时的模式。此外,连接器不能只使用当前模式,因为连接器可能正在处理相对较旧的事件,并且可能在表的模式更改之前就已经记录了这些事件。

为了处理这个问题,MySQL在binlog中不仅包括对数据的行级更改,还包括应用到数据库的DDL语句。开云体育电动老虎机当连接器读取binlog并遇到这些DDL语句时,它会解析它们并更新每个表的模式的内存表示。连接器使用这种模式表示在每次插入、更新或删除操作时识别表的结构,并产生适当的更改事件。在一个单独的数据库历史Kaf开云体育电动老虎机ka主题中,连接器记录了所有DDL语句以及每个DDL语句在binlog中的出现位置。

当连接器在崩溃或正常停止后重新启动时,连接器开始从特定位置(即从特定时间点)读取binlog。连接器通过读取数据库历史Kafka主题并解析所有DDL语句,直到连接器开始的binlog点,从而重新构建此时存在的表结构。开云体育电动老虎机

此数据库历开云体育电动老虎机史记录主题仅供连接器使用。连接器可以选择See将模式更改事件发送到用于使用者应用程序的不同主题

当MySQL连接器捕获表中的更改时,模式更改工具(如gh-ostpt-online-schema-change则在迁移过程中创建了辅助表。需要配置连接器以捕获对这些帮助表的更改。如果使用者不需要为辅助表生成的记录,则可以应用单个消息转换将其过滤掉。

看到主题的默认名称接收Debezium事件记开云体育官方注册网址录。

架构更改主题

您可以配置Debezium MySQL连开云体育官方注册网址接器来生成模式更改事件,其中包括应用于MySQL服务器中的数据库的所有DDL语句。开云体育电动老虎机连接器向Kafka主题发出这些事件serverName在哪里serverName类型所指定的连接器名称是否相同开云体育电动老虎机database.server.name连接器配置属性。

如果你选择使用架构更改事件,确保您使用来自架构更改主题的记录。数据库历开云体育电动老虎机史记录主题仅供连接器使用。

发送到模式更改主题的事件的全局顺序至关重要。因此,不能对数据库历史主题进行分区。开云体育电动老虎机这意味着您必须指定的分区计数1在创建数据库历史记录主题时。开云体育电动老虎机当依赖于自动主题创建时,确保Kafka的num.partitions配置选项(用于指定默认分区数)设置为1

连接器向模式更改主题发出的每条记录都包含一个消息键,该消息键包含应用DDL语句时连接的数据库的名称,例如:开云体育电动老虎机

{"schema": {"type": "struct", "name": "io. d开云体育官方注册网址ebezum .connector.mysql. schemachangekey ", "optional": false, "fields": [{"开云体育电动老虎机field": "databaseName", "type": "string", "optional": false}]}, "payload": {"databaseName": "inventory"}}

模式更改事件记录值包含一个结构,该结构包括DDL语句、语句应用到的数据库的名称以及语句在binlog中的出现位置,例如:开云体育电动老虎机

{"模式":{“类型”:“结构”、“名称”:“io.debezium.connector开云体育官方注册网址.mysql.SchemaChangeValue”、“可选”:假的,“字段”:[{“字段”:“数据库名”、“类型”:“弦”、“可选”:假},{”字段”:“d开云体育电动老虎机dl”、“类型”:“弦”、“可选”:假},{”字段”:“源”、“类型”:“结构”、“名称”:“io.debezium.connector.mysql.Source”、“可选”:假的,“字段”:[{“类型”:“弦”、“可选”:真的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“server_id”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“弦”、“可选”:真的,“场”:“gtid”},{“类型”:“弦”、“可选”:假的,“场”:“文件”},{“类型”:“int64”、“可选”:假的,“场”:“pos”},{“类型”:“int32”、“可选”:假的,“场”:“行”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“int64”、“可选”:真的,“场”:“线程”},{“类型”:"string", "optional": true, "field": "db"}, {"type": "string", "optional": true, "field": "table"}, {"type": "string", "optional": true, "field": "query"}]}]}, "payload": {"databaseNa开云体育电动老虎机me": "inventory", "ddl": "CREATE table products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT);ALTER TABLE products AUTO_INCREMENT = 101;", "source": {"version": "1.5.4. "最后”、“名称”:“mysql-server-1”、“server_id”:0,”ts_ms”:0,”gtid”:空,“文件”:“mysql-bin。000003.", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": null, "table": null, "query": null } } }

ddlfield可能包含多个DDL语句。中的数据库应用了每条语句开云体育电动老虎机开云体育电动老虎机数据库名字段。语句按照应用到数据库的顺序出现。开云体育电动老虎机的字段的结构完全是写入特定于表的主题的标准数据更改事件。此字段用于关联不同主题的事件。

...."payload": {"d开云体育电动老虎机atabaseName": "inventory", "ddl": "CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,…)","source":{…}} ....

客户端可以提交多个DDL语句,以应用于多个数据库。开云体育电动老虎机如果MySQL原子地应用这些DDL语句,连接器将按顺序获取DDL语句,按数据库对它们进行分组,并为每个组创建一个模式更改事件。开云体育电动老虎机如果MySQL分别应用它们,连接器将为每条语句创建一个单独的模式更改事件。

快照

当Debezi开云体育官方注册网址um MySQL连接器第一次启动时,它执行初始化一致的快照你的数据库。开云体育电动老虎机下面的流程描述连接器如何创建此快照。此流程用于默认快照模式,即最初的.有关其他快照模式的信息,请参见MySQL连接器snapshot.mode配置属性

表1。使用全局读锁执行初始快照的工作流
一步 行动

1

获取一个阻塞的全局读锁通过其他数据库客户开云体育电动老虎机端。

快照本身并不阻止其他客户机应用DDL,这可能会干扰连接器读取binlog位置和表模式的尝试。连接器在读取binlog位置时保持全局读锁,并在后面的步骤中释放锁。

2

使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照

3.

读取当前binlog的位置。

4

读取为其配置连接器以捕获更改的数据库和表的模式。开云体育电动老虎机

5

释放全局读锁。其他数据库客开云体育电动老虎机户端现在可以写入数据库。

6

如果适用,将DDL更改写入模式更改主题,包括所有必要的更改滴……创建…DDL语句。

7

扫描数据库表。开云体育电动老虎机对于每一行,连接器都会触发创建事件到相关的特定于表的Kafka主题。

8

提交事务。

9

在连接器偏移量中记录完成的快照。

连接器重启

操作时,如果连接器失效、停止或重新平衡初始快照,然后在连接器重新启动后,它执行一个新的快照。在那之后开始快照完成后,Debezium MySQL开云体育官方注册网址连接器将从binlog中的相同位置重新启动,因此它不会错过任何更新。

如果连接器停止的时间足够长,MySQL就会清除旧的binlog文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复到初始快照它的起始位置。有关Debezium MySQL连接器故障排除的更多技巧,请参见开云体育官方注册网址出现问题时的行为

不允许全局读锁

有些环境不允许全局读锁。如果Debez开云体育官方注册网址ium MySQL连接器检测到不允许全局读锁,则连接器使用表级锁,并使用此方法执行快照。这需要Debezium连接器的数据库开云体育电动老虎机用户开云体育官方注册网址锁表特权。

表2。使用表级锁执行初始快照的工作流
一步 行动

1

获取表级锁。

2

使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照

3.

读取并过滤数据库和表的名称。开云体育电动老虎机

4

读取当前binlog的位置。

5

读取为其配置连接器以捕获更改的数据库和表的模式。开云体育电动老虎机

6

如果适用,将DDL更改写入模式更改主题,包括所有必要的更改滴……创建…DDL语句。

7

扫描数据库表。开云体育电动老虎机对于每一行,连接器都会触发创建事件到相关的特定于表的Kafka主题。

8

提交事务。

9

释放表级锁。

10

在连接器偏移量中记录完成的快照。

快照事件的操作类型

MySql连接器使用“r”操作类型().如果您希望连接器以“c”事件的形式发出快照事件(创建这可以使用简单消息转换(Simple Message Transforms, SMT)实现。配置Debezium开云体育官方注册网址ReadToInsertEvent将SMT配置详细信息添加到连接器的配置中。

配置的一个例子是:

变换= snapshotasinsert,…transforms.snapshotasinsert.type = i开云体育官方注册网址o.debezium.connector.mysql.transforms.ReadToInsertEvent

主题名称

默认行为是Debezium MySQL连接器为所有人写入事件开云体育官方注册网址插入更新,删除一个表对一个Kafka主题的操作。Kafka主题命名约定如下:

serverName.开云体育电动老虎机databaseName.tableName

假设实现是服务器名,库存数据库名称,以开云体育电动老虎机及数据库包含的表名称订单客户,产品.Debe开云体育官方注册网址zium MySQL连接器向三个Kafka主题发出事件,分别对应数据库中的每个表:开云体育电动老虎机

履行。库存。订单履行。库存。客户履行。库存。产品

事务的元数据

开云体育官方注册网址Debezium可以生成表示事务边界的事件,并丰富数据更改事件消息。每笔交易开始结束, 开云体育官方注册网址Debezium生成一个包含以下字段的事件:

  • 状态-开始结束

  • id唯一事务标识符的字符串表示

  • event_count(结束事件)——事务发出的事件总数

  • data_collections(结束事件)-对的数组data_collectionevent_count它提供了由来自给定数据收集的更改所发出的事件的数量

例子
{"status": "BEGIN", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "event_count": null, "data_collections": null} {"status": "END", "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "event_count": 2, "data_collections": [{"data_collection": "s1。A ", "event_count": 1}, {"data_collection": "s2. "A ", "event_count": 1}]}

事务事件被写入指定的主题开云体育电动老虎机database.server.name.transaction

更改数据事件丰富

启用事务元数据时,数据消息信封是充实了新的事务字段。这个字段以复合字段的形式提供关于每个事件的信息:

  • id唯一事务标识符的字符串表示

  • total_order-该事件在事务生成的所有事件中的绝对位置

  • data_collection_order-该事件在事务发出的所有事件中的每数据收集位置

下面是一个消息的例子:

{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…}, "op": "c", "ts_ms": "1580390884335", "transaction": {"id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10", "total_order": "1", "data_collection_order": "1"}}

对于没有启用GTID的系统,事务标识符是使用binlog文件名和binlog位置的组合构造的。例如,如果binlog文件名和对应于事务BEGIN事件的位置是mysql-bin。分别为000002和1913,则Debezium构造的事务标识符将为开云体育官方注册网址文件= mysql-bin.000002 pos = 1913

数据变更事件

Debe开云体育官方注册网址zium MySQL连接器为每个行级生成一个数据更改事件插入更新,删除操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。

下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:

{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
表3。变更事件基本内容概述
字段名 描述

1

模式

第一个模式字段是事件键的一部分。它指定了一个Kafka Connect模式,用来描述事件键的内容有效载荷部分。换句话说,是第一个模式字段描述已更改表的主键的结构,如果表没有主键,则描述唯一键。

属性可以覆盖表的主键message.key.columns连接器配置属性.在本例中,第一个模式字段描述由该属性标识的键的结构。

2

有效载荷

第一个有效载荷字段是事件键的一部分。它具有前面所描述的结构模式字段,它包含已更改行的键。

3.

模式

第二个模式字段是事件值的一部分。它指定Kafka Connect模式,描述事件值中的内容有效载荷部分。换句话说,是第二种模式描述已更改行的结构。通常,这个模式包含嵌套的模式。

4

有效载荷

第二个有效载荷字段是事件值的一部分。它具有前面所描述的结构模式字段,它包含已更改行的实际数据。

默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同。看到主题名称

MySQL连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名、数据库名和表名中的每个字符必须是拉丁字母、数字或下划线(a-z、a-z、0-9或_)。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名、数据库名或表名包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机

更改事件键

更改事件的键包含已更改表的键和已更改行的实际键的模式。模式及其对应的有效负载都为更改后的表的每一列包含一个字段主键(或唯一约束)在连接器创建事件时。

考虑以下几点客户表,后面是此表的更改事件键的示例。

CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;

对象的更改的每个更改事件客户表具有相同的事件键模式。只要客户表具有前面的定义,则捕获对的更改的每个更改事件客户表的键结构如下。在JSON中,它看起来是这样的:

{"schema": {(1)"type": "struct", "name": "mysql-server-1.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“int32”,“可选”:假}},“有效载荷”:{(5)"id": 1001}}
表4。更改事件键的描述
字段名 描述

1

模式

密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容有效载荷部分。

2

mysql服务器- 1. inventory.customers.key

定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name开云体育电动老虎机数据库名称表名关键.在这个例子中:

  • mysql-server-1生成此事件的连接器的名称。

  • 库存包含已更改的表开云体育电动老虎机的数据库。

  • 客户已更新的表。

3.

可选

指示事件键是否必须在其中包含值有效载荷字段。在本例中,需要键的有效负载中的值。当表没有主键时,键的有效载荷字段中的值是可选的。

4

字段

属性中期望的每个字段有效载荷,包括每个字段的名称、类型以及是否需要。

5

有效载荷

包含为其生成此更改事件的行的键。在本例中,键包含一个单键id字段,其值为1001

更改事件值

change事件中的值比键稍微复杂一些。和键一样,值也有模式Section和a有效载荷部分。的模式类的模式信封的结构有效载荷节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

考虑用于显示更改事件键示例的同一个示例表:

CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;

更改此表的更改事件的值部分描述如下:

创建事件

对象中创建数据的操作所生成的更改事件的值部分,示例如下客户表:

{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers.Value",(2)"field": "before"}, {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers "。值”、“场”:“后”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:真的,“场”:“表”},{“类型”:“int64”、“可选”:假的,“场”:“server_id”},{“类型”:“弦”、“可选”:真的,“场”:“gtid”},{“类型”:“弦”、“可选”:假的,“场”:“文件”},{“类型”:“int64”、“可选”:假的,“场”:“pos”},{“类型”:“int32”、“可选”:假的,“场”:“行”},{“类型”:“int64”、“可选”:真的,“场”:“线程”},{“类型”:“弦”、“可选”:真的,“场”:“查询”}],“可选”:假的,“名字”:“io.debezium.connector.mysql.Source”,开云体育官方注册网址(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "mysql-server-1.inventory. clients . envelope "(4)}, "有效载荷":{(5)“人事处”:“c”,(6)“ts_ms”:1465491411815,(7)“之前”:空,(8)"后":{(9)"id": 1004, "first_name": Anne", "last_name": Kretchmar", "email": "annek@noanswer.org"}, "source": {(10)“版本”:“1.5.4。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin "。000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}}}
表5所示。的描述创建事件值字段
字段名 描述

1

模式

值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。

2

的名字

模式节中,每的名字Field指定值的有效负载中字段的模式。

mysql服务器- 1. inventory.customers.value有效负载的模式是之前字段。此模式特定于客户表格

的模式名称之前字段的格式为logicalName的表value,确保模式名在数据库中是唯一的。开云体育电动老虎机这意味着当使用Avro转换器,每个逻辑源中每个表的结果Avro模式都有自己的演变和历史。

3.

的名字

io.开云体育官方注册网址debezium.connector.mysql.Source有效负载的模式是字段。这个模式是特定于MySQL连接器的。连接器将其用于生成的所有事件。

4

的名字

mysql服务器- 1. inventory.customers.envelope有效负载的总体结构的模式在哪里mysql-server-1是连接器名称,库存是数据库,和开云体育电动老虎机客户是桌子。

5

有效载荷

该值为实际数据。这是变更事件提供的信息。

事件的JSON表示形式似乎比它们描述的行要大得多。这是因为JSON表示必须包括消息的模式和有效负载部分。然而,通过使用Avro转换器,你可以显著减少连接器流到Kafka主题的消息的大小。

6

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,c表示该操作创建了一行。有效值为:

  • c=创建

  • u=更新

  • d=删除

  • r= read(仅适用于快照)

7

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

8

之前

可选字段,指定事件发生之前行的状态。当人事处字段是c对于create,就像在本例中一样之前字段是因为这个更改事件是针对新内容的。

9

可选字段,指定事件发生后行的状态。在本例中,字段包含新行的值idfirst_namelast_name,电子邮件列。

10

描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器的名字

  • 记录事件的Binlog名称

  • binlog位置

  • 事件中的行

  • 如果事件是快照的一部分

  • 包含新行的数据库和表的名开云体育电动老虎机称

  • 创建事件的MySQL线程的ID(仅限非快照)

  • MySQL服务器ID(如果可用)

  • 在数据库中进行更改的时间戳开云体育电动老虎机

如果binlog_rows_query_log_events启用MySQL配置选项和连接器配置include.query属性启用时,则字段还提供查询字段,其中包含导致更改事件的原始SQL语句。

更新事件

样例中更新的更改事件的值客户表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户表:

{"schema":{…}, "payload": {"before": {(1)“id”:1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{(2)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“1.5.4。Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581029100, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"}, "op": "u",(4)“ts_ms”:1465581029523(5)}}
表6所示。的描述更新事件值字段
字段名 描述

1

之前

可选字段,指定事件发生之前行的状态。在一个更新事件值,则之前字段包含每个表列的字段以及数据库提交前该列中的值。开云体育电动老虎机在本例中,first_name值是安妮。

2

可选字段,指定事件发生后行的状态。你可以比较之前结构来确定对该行的更新内容。在本例中,first_name价值就是现在安妮玛丽

3.

描述事件的源元数据的必填字段。的字段结构中的字段与创建事件,但有些值是不同的,例如,示例更新事件在binlog中的不同位置。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器的名字

  • 记录事件的Binlog名称

  • binlog位置

  • 事件中的行

  • 如果事件是快照的一部分

  • 包含已更新行的数据库和表开云体育电动老虎机的名称

  • 创建事件的MySQL线程的ID(仅限非快照)

  • MySQL服务器ID(如果可用)

  • 在数据库中进行更改的时间戳开云体育电动老虎机

如果binlog_rows_query_log_events启用MySQL配置选项和连接器配置include.query属性启用时,则字段还提供查询字段,其中包含导致更改事件的原始SQL语句。

4

人事处

返回string,描述操作类型。在一个更新事件值,则人事处字段值为u,表示该行因更新而更改。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新一行的主键/唯一键的列将更改该行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件:删除事件和墓碑上的事件使用该行的旧键,然后使用该行的新键执行一个事件。详情见下一节。

主键更新

一个更新更改行的主键字段的操作称为主键更改。对于主键更改,替换为更新事件记录时,连接器发出删除事件记录的旧键和创建新的(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • 删除事件记录有__开云体育官方注册网址debezium.newkey作为消息头。此标头的值是更新行的新主键。

  • 创建事件记录有__开云体育官方注册网址debezium.oldkey作为消息头。此标头的值是已更新行的前一个(旧的)主键。

删除事件

的值删除改变事件有相同之处模式部分为创建更新同一表的事件。的有效载荷的一部分删除事件。客户表是这样的:

{"schema":{…}, "payload": {"before": {(1)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "after": null,(2)“源”:{(3)“版本”:“1.5.4。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581902300, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004"}, "op": "d",(4)“ts_ms”:1465581902461(5)}}
表7所示。的描述删除事件值字段
字段名 描述

1

之前

可选字段,指定事件发生之前行的状态。在一个删除事件值,则之前字段包含在数据库提交时删除该行之前该行中的值。开云体育电动老虎机

2

可选字段,指定事件发生后行的状态。在一个删除事件值,则字段是,表示该行已不存在。

3.

描述事件的源元数据的必填字段。在一个删除事件值,则字段结构与for相同创建更新同一表的事件。许多字段值也相同。在一个删除事件值,则ts_mspos字段值以及其他值可能已经更改。但是,字段在删除事件值提供相同的元数据:

  • 开云体育官方注册网址Debezium版本

  • 连接器的名字

  • 记录事件的Binlog名称

  • binlog位置

  • 事件中的行

  • 如果事件是快照的一部分

  • 包含已更新行的数据库和表开云体育电动老虎机的名称

  • 创建事件的MySQL线程的ID(仅限非快照)

  • MySQL服务器ID(如果可用)

  • 在数据库中进行更改的时间戳开云体育电动老虎机

如果binlog_rows_query_log_events启用MySQL配置选项和连接器配置include.query属性启用时,则字段还提供查询字段,其中包含导致更改事件的原始SQL语句。

4

人事处

返回string,描述操作类型。的人事处字段值为d,表示该行已被删除。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

一个删除更改事件记录为使用者提供了处理删除该行所需的信息。包含旧值是因为一些使用者可能需要它们来正确处理删除。

MySQL连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。

墓碑上的事件

删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为.为了实现这一点,在Debezium的MySQL连接器发出一开云体育官方注册网址个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有价值。

数据类型映射

Debe开云体育官方注册网址zium MySQL连接器用事件表示行更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该列的MySQL数据类型决定了Debezium如何表示事件中的值。开云体育官方注册网址

存储字符串的列在MySQL中使用字符集和排序规则定义。MySQL连接器在binlog事件中读取列值的二进制表示时使用列的字符集。

连接器可以将MySQL数据类型映射到两者文字语义类型。

  • 文字类型:如何使用Kafka Connect模式类型来表示值

  • 语义类型: Kafka Connect模式如何捕获字段的含义(模式名)

基本类型

下表显示了连接器如何映射基本的MySQL数据类型。

表8所示。基本类型映射的描述
MySQL类型 文字类型 语义类型

布尔,布尔值

布尔

N/A

位(1)

布尔

N/A

位(> 1)

字节

io.开云体育官方注册网址debezium.data.Bits
长度模式参数包含一个表示比特数的整数。的byte []包含的位低位优先形式,并调整大小以包含指定的位数。例如,其中n位:
numBytes = n/8 + (n%8== 0 ?0: 1)

非常小的整数

INT16

N/A

短整型((M))

INT16

N/A

MEDIUMINT ((M))

INT32

N/A

整数,整数((M))

INT32

N/A

长整型数字((M))

INT64

N/A

真正的((M, D))

FLOAT32

N/A

浮动((M, D))

FLOAT64

N/A

双((M, D))

FLOAT64

N/A

CHAR (M))

字符串

N/A

VARCHAR (M))

字符串

N/A

二进制(M))

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

VARBINARY (M))

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

TINYBLOB

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

非常小的文本串

字符串

N/A

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

文本

字符串

N/A

MEDIUMBLOB

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

简单

字符串

N/A

LONGBLOB

字节字符串

N/A
原始字节(默认值)、base64编码的字符串或十六进制编码的字符串binary.handling.mode连接器配置属性设置。

量变

字符串

N/A

JSON

字符串

io.开云体育官方注册网址debezium.data.Json
的字符串表示形式JSON文档、数组或标量。

枚举

字符串

io.开云体育官方注册网址debezium.data.Enum
允许Schema参数包含以逗号分隔的允许值列表。

字符串

io.开云体育官方注册网址debezium.data.EnumSet
允许Schema参数包含以逗号分隔的允许值列表。

年[(2 | 4)]

INT32

io.开云体育官方注册网址debezium.time.Year

时间戳((M))

字符串

io.开云体育官方注册网址debezium.time.ZonedTimestamp
ISO 8601格式微秒精度。MySQL允许在…的范围内0 - 6

时间类型

不包括时间戳的值,MySQL的时态类型取决于time.precision.mode连接器配置属性。为时间戳列的默认值指定为CURRENT_TIMESTAMP现在,值1970-01-01就是是Kafka Connect模式的默认值。

MySQL允许为零值日期DATETIME,时间戳列,因为零值有时比空值更可取。当列定义允许空值时,MySQL连接器将零值表示为空值,当列不允许空值时,则将零值表示为历元日。

没有时区的时间值

DATETIMEType表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,这里没有时区信息。这些列将使用UTC根据列的精度转换为epoch毫秒或微秒。的时间戳类型表示不包含时区信息的时间戳。写入时,MySQL将从服务器(或会话)当前时区转换为UTC,读取时将从UTC转换为服务器(或会话)当前时区。例如:

  • DATETIME值为2018-06-20 06:37:03就变成了1529476623000

  • 时间戳值为2018-06-20 06:37:03就变成了2018 - 06 - 20 - t13:37:03z

这些列被转换为等效的列io.开云体育官方注册网址debezium.time.ZonedTimestamp基于服务器(或会话)当前时区的UTC格式。默认从服务器上查询时区。如果失败,则必须由数据库显式指定开云体育电动老虎机serverTimezoneMySQL配置选项。例如,如果数据库的时区(全局的或为连接器开云体育电动老虎机配置的)serverTimezone选项)为“America/Los_Angeles”,则TIMESTAMP值“2018-06-20 06:37:03”由ZonedTimestamp值为“2018-06-20T13:37:03Z”。

运行Kafka Connect和Debezium的JVM的时区不会影响这些转换。开云体育官方注册网址

有关与时间值相关的属性的更多详细信息,请参阅MySQL连接器配置属性

time.precision.mode = adaptive_time_microseconds(默认)

MySQL连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确地表示数据库中的值。开云体育电动老虎机所有时间字段都以微秒为单位。只有积极的时间范围内的字段值00:00:00.00000023:59:59.999999可以正确捕获。

表9所示。映射时time.precision.mode = adaptive_time_microseconds
MySQL类型 文字类型 语义类型

日期

INT32

io.开云体育官方注册网址debezium.time.Date
表示自纪元以来的天数。

((M))

INT64

io.开云体育官方注册网址debezium.time.MicroTime
表示以微秒为单位的时间值,不包括时区信息。MySQL允许在…的范围内0 - 6

Datetime, Datetime (0), Datetime (1), Datetime (2), Datetime (3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp
表示经过纪元的毫秒数,不包括时区信息。

Datetime (4), Datetime (5), Datetime (6)

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp
表示经过纪元的微秒数,不包括时区信息。

time.precision.mode =连接

MySQL连接器使用定义好的Kafka Connect逻辑类型。此方法不如默认方法精确,如果数据库列具有开云体育电动老虎机分数秒精度值大于3..值的范围00:00:00.00023:59:59.999可以处理。集time.precision.mode =连接只有你能保证时间表中的值永远不会超过支持的范围。的连接设置有望在Debezium的未来版本中被删除。开云体育官方注册网址

表10。映射时time.precision.mode =连接
MySQL类型 文字类型 语义类型

日期

INT32

org.apache.kafka.connect.data.Date
表示自纪元以来的天数。

((M))

INT64

org.apache.kafka.connect.data.Time
表示自午夜以来的时间值(以微秒为单位),不包括时区信息。

DATETIME ((M))

INT64

org.apache.kafka.connect.data.Timestamp
表示自纪元以来的毫秒数,不包括时区信息。

十进制类型

开云体育官方注册网址的设置来处理小数decimal.handling.mode连接器配置属性

decimal.handling.mode =精确
表11所示。映射时decimal.handing.mode =精确
MySQL类型 文字类型 语义类型

数字((M [D]))

字节

org.apache.kafka.connect.data.Decimal
规模模式参数包含一个整数,表示小数点移位了多少位。

小数((M [D]))

字节

org.apache.kafka.connect.data.Decimal
规模模式参数包含一个整数,表示小数点移位了多少位。

decimal.handling.mode =双
表12。映射时decimal.handing.mode =双
MySQL类型 文字类型 语义类型

数字((M [D]))

FLOAT64

N/A

小数((M [D]))

FLOAT64

N/A

decimal.handling.mode =字符串
表13。映射时decimal.handing.mode =字符串
MySQL类型 文字类型 语义类型

数字((M [D]))

字符串

N/A

小数((M [D]))

字符串

N/A

布尔值

MySQL处理布尔以特定的方式进行内部价值评估。的布尔列在内部映射到非常小的整数(1)数据类型。当在流期间创建表时,它使用proper布尔当Debezium接收开云体育官方注册网址到原始DDL时映射。在快照期间,Debezium执行开云体育官方注册网址显示创建表获取返回的表定义非常小的整数(1)对于这两个布尔非常小的整数(1)列。开云体育官方注册网址Debezium没有办法获得原始类型映射,因此映射到非常小的整数(1)

操作员可以配置开箱即用TinyIntOneToBooleanConverter自定义转换器这将映射所有非常小的整数(1)布尔或者如果选择器参数,则可以使用逗号分隔的正则表达式枚举列的子集。

配置示例如下:

converters=boolean boolean.type=io.开云体育官方注册网址debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1。*, db1.table2.column1

空间类型

目前,Debezium MyS开云体育官方注册网址QL连接器支持以下空间数据类型。

表14。空间类型映射的描述
MySQL类型 文字类型 语义类型

几何,
LINESTRING,
多边形,
多点,
MULTILINESTRING,
多个多边形,
GEOMETRYCOLLECTION

结构体

io.开云体育官方注册网址debezium.data.geometry.Geometry
包含两个字段的结构:

  • srid (INT32:空间引用系统ID,用于定义存储在结构中的几何对象类型

  • wkb(字节):以wkb格式编码的几何对象的二进制表示形式。看到开放地理空间联盟欲知详情。

设置

在安装和运行Debezium连接器之前,需要完成一些MySQL设置任务。开云体育官方注册网址

创建用户

De开云体育官方注册网址bezium MySQL连接器需要一个MySQL用户帐户。这个MySQL用户必须对Debezium MySQL连接器捕获更改的所有数据库具有适当的权限。开云体育官方注册网址开云体育电动老虎机

先决条件
  • MySQL服务器。

  • 基本了解SQL命令。

过程
  1. 创建MySQL用户:

    mysql> CREATE USER ' USER '@'localhost' IDENTIFIED BY 'password'
  2. 授予用户所需的权限:

    mysql>授权选择,重新加载,显示数据库,复制从,复制客户端上*。开云体育电动老虎机*以“密码”识别的“用户”;

    权限说明如下表所示。

    如果使用不允许全局读锁的托管选项(如Amazon RDS或Amazon Aurora),则使用表级锁创建全局读锁一致的快照.在这种情况下,您还需要授予锁表您所创建的用户的权限。看到快照欲知详情。
  3. 确定用户权限:

    mysql> FLUSH PRIVILEGES;
表15。用户权限说明
关键字 描述

选择

允许连接器从数据库中的表中选择行。开云体育电动老虎机仅在执行快照时使用。

重新加载

使连接器能够使用冲洗语句来清除或重新加载内部缓存、刷新表或获取锁。仅在执行快照时使用。

显示数据库开云体育电动老虎机

命令使连接器可以查看数据库名称开云体育电动老虎机显示数据库开云体育电动老虎机声明。仅在执行快照时使用。

复制的奴隶

允许连接器连接和读取MySQL服务器的binlog。

复制客户端

允许连接器使用以下语句:

  • 显示主机状态

  • 显示奴隶状态

  • 显示二进制日志

连接器总是需要这个。

标识权限应用到的数据库。开云体育电动老虎机

“用户”

指定要授予权限的用户。

通过“密码”识别

指定用户的MySQL密码。

启用binlog

MySQL复制必须启用二进制日志记录。二进制日志记录用于复制工具传播更改的事务更新。

先决条件
  • MySQL服务器。

  • 适当的MySQL用户权限。

过程
  1. 检查是否log-bin选项已开启:

    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';
  2. 如果是的话,用以下属性配置你的MySQL服务器配置文件,如下表所示:

    server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10
  3. 再次检查binlog状态,确认您的更改:

    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';
表16所示。MySQL binlog配置属性说明
财产 描述

服务器id

的值。服务器idMySQL集群中的每个服务器和复制客户端必须是唯一的。在MySQL连接器设置期间,Debezium为连接器分配一个唯开云体育官方注册网址一的服务器ID。

log_bin

的价值log_binbinlog文件序列的基名称。

binlog_format

binlog-format必须设置为

binlog_row_image

binlog_row_image必须设置为完整的完整的

expire_logs_days

这是自动删除binlog文件的天数。默认为0,这意味着不会自动移除。将该值设置为与您的环境需求相匹配。看到MySQL清除binlog文件

使GTIDs

全局事务标识符(gtid)唯一标识集群内服务器上发生的事务。虽然对于Debezium MySQL连接器来说不是必开云体育官方注册网址需的,但是使用gtid简化了复制,使您能够更容易地确认主服务器和复制服务器是否一致。

gtid在MySQL 5.6.5及更高版本中可用。看到MySQL文档欲知详情。

先决条件
  • MySQL服务器。

  • 基本了解SQL命令。

  • 访问MySQL配置文件。

过程
  1. 启用gtid_mode

    mysql > gtid_mode =
  2. 启用enforce_gtid_consistency

    mysql > enforce_gtid_consistency =
  3. 确认更改:

    mysql>显示全局变量“%GTID%”;
结果
+--------------------------+-------+ | Variable_name |值  | +--------------------------+-------+ | 上enforce_gtid_consistency | | | gtid_mode |  | +--------------------------+-------+
表17所示。GTID选项说明
选项 描述

gtid_mode

布尔值,指定MySQL服务器是否启用GTID模式。

  • =启用

  • =禁用

enforce_gtid_consistency

布尔值,指定服务器是否通过允许以事务安全方式登录的语句执行来强制GTID一致性。使用gtid时必须。

  • =启用

  • =禁用

配置会话超时

当为大型数据库创建初始一致快照时,您所建立的连接可能会在读取表时超时。开云体育电动老虎机可以通过配置来防止这种行为interactive_timeoutwait_timeout在MySQL配置文件中。

先决条件
  • MySQL服务器。

  • 基本了解SQL命令。

  • 访问MySQL配置文件。

过程
  1. 配置interactive_timeout

    mysql > interactive_timeout = < duration-in-seconds >
  2. 配置wait_timeout

    mysql > wait_timeout = < duration-in-seconds >
表18。MySQL会话超时选项说明
选项 描述

interactive_timeout

服务器在关闭交互连接之前等待交互连接活动的秒数。看到MySQL的文档欲知详情。

wait_timeout

服务器在关闭非交互式连接之前等待活动的秒数。看到MySQL的文档欲知详情。

启用查询日志事件

你可能想看看原版SQL语句用于每个binlog事件。使binlog_rows_query_log_eventsMySQL配置文件中的选项允许你这样做。

该选项在MySQL 5.6及更高版本中可用。

先决条件
  • MySQL服务器。

  • 基本了解SQL命令。

  • 访问MySQL配置文件。

过程
  • 启用binlog_rows_query_log_events

    mysql > binlog_rows_query_log_events =

    binlog_rows_query_log_events是否设置为启用/禁用对包含原始文件的支持的值SQL语句。

    • =启用

    • =禁用

部署

要部署Debezium 开云体育官方注册网址MySQL连接器,您需要安装Debezium MySQL连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。

过程
  1. 下载Debezium开云体育官方注册网址MySQL连接器插件

  2. 将文件解压缩到Kafka Connect环境中。

  3. 将包含JAR文件的目录添加到卡夫卡连接的plugin.path

  4. 配置连接器将配置添加到Kafka Connect集群中。

  5. 重新启动Kafka Connect进程以获取新的JAR文件。

如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Apache Zookeeper, Apache Kafka, MySQL和Kafka连接MySQL连接器已经安装并准备运行。

MySQL连接器配置示例

下面是一个连接器实例的配置示例,该实例从端口为192.168.99.100的3306上的MySQL服务器捕获数据,逻辑上我们将其命名为192.168.99.100fullfillment.通常,通过设置连接器可用的配置属性,可以在JSON文件中配开云体育官方注册网址置Debezium MySQL连接器。

您可以选择为数据库中的模式和表的子集生成事件。开云体育电动老虎机可选地,您可以忽略、屏蔽或截断包含敏感数据的列、大于指定大小的列或不需要的列。

{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.mysql. mysqlconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“3306”,(4)“开云体育电动老虎机数据库。使用r": "debezium-user",(5)“开云体育电动老虎机数据库。密码”:“debeziu开云体育官方注册网址m-user-pw”,(6)“开云体育电动老虎机database.server。id”:“184054”,(7):开云体育电动老虎机“database.server.name fullfillment”,(8)“开云体育电动老虎机database.include。列表”:“库存”,(9)“开云体育电动老虎机database.history.kafka.bootstrap。服务器”:“卡夫卡:9092”,(10)“开云体育电动老虎机database.history.kafka。来pic": "dbhistory.fullfillment",(11)“include.schema。变化”:“真正的”(12)}}
1 在Kafka Connect服务中注册连接器的名称。
2 连接器的类名。
3. MySQL服务器地址。
4 MySQL服务器端口号。
5 具有适当权限的MySQL用户。
6 MySQL用户密码。
7 连接器的唯一ID。
8 MySQL服务器或集群的逻辑名称。
9 指定服务器托管的开云体育电动老虎机数据库列表。
10 连接器用来将DDL语句写入并恢复到数据库历史主题的Kafka代理列表。开云体育电动老虎机
11 数据库历史记录主题的名称开云体育电动老虎机。此主题仅供内部使用,消费者不应使用。
12 标记,该标记指定连接器是否应为DDL更改生成事件并将事件发送到实现架构更改主题供使用者使用。

有关可以为Debezium MySQL连接器设置的配置属性的完整列表,请参见开云体育官方注册网址MySQL连接器配置属性

您可以使用帖子命令到正在运行的Kafka Connect服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接MySQL数据库。开云体育电动老虎机

  • 在捕获模式下读取更改数据表。

  • 流将事件记录更改为Kafka主题。

添加连接器配置

要开始运行MySQL连接器,请配置一个连接器配置,并将该配置添加到Kafka Connect集群中。

先决条件
过程
  1. 为MySQL连接器创建一个配置。

  2. 使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。

结果

当连接器启动时,它将执行一致性快照配置连接器的MySQL数据开云体育电动老虎机库。然后连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。

连接器属性

Debe开云体育官方注册网址zium MySQL连接器有许多配置属性,您可以使用这些属性为您的应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性为要求除非有默认值可用。

所需的Debezi开云体育官方注册网址um MySQL连接器配置属性

财产 默认的 描述

连接器的唯一名称。试图用相同的名称再次注册失败。所有Kafka Connect连接器都需要这个属性。

连接器的Java类的名称。总是指定io.开云体育官方注册网址debezium.connector.mysql.MySqlConnectorMySQL连接器。

1

应该为此连接器创建的最大任务数。MySQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。

MySQL数据库服务器的IP地址或主机名。开云体育电动老虎机

3306

MySQL数据库服务器的整型端口号。开云体育电动老虎机

连接MySQL数据库服务器时使用的MySQL用户名。开云体育电动老虎机

连接MySQL数据库服务器时使用的密码。开云体育电动老虎机

逻辑名称,用于标识特定的MySQL数据库服务器/集群并为其提供命名空间,Debezium在其中捕获更改。开云体育官方注册网址开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作接收该连接器发出的事件的所有Kafka主题名的前缀。该名称只允许使用字母数字字符和下划线。

随机

这个数据库客户端的数字ID,在MySQL集开云体育电动老虎机群中所有当前运行的数据库进程中必须是唯一的。这个连接器作为另一个服务器(使用这个唯一的ID)加入MySQL开云体育电动老虎机数据库集群,因此它可以读取binlog。默认情况下,会生成一个介于5400和6400之间的随机数,不过建议显式地设置一个值。

空字符串

一个可选的、以逗号分隔的正则表达式列表,它与要捕获更改的数据库的名称相匹配。开云体育电动老虎机连接器不会捕获名称不在中的任何数据库中的更改开云体育电动老虎机开云体育电动老虎机database.include.list.默认情况下,连接器捕获所有数据库中的更改。开云体育电动老虎机不也定了吗开云体育电动老虎机database.exclude.list连接器配置属性。

空字符串

一个可选的、以逗号分隔的正则表达式列表,它与您不想捕获更改的数据库的名称相匹配。开云体育电动老虎机连接器捕获名称不在数据库中的任何数据库中的更改开云体育电动老虎机开云体育电动老虎机database.exclude.list.不也定了吗开云体育电动老虎机database.include.list连接器配置属性。

空字符串

一个可选的、以逗号分隔的正则表达式列表,它匹配您想要捕获其更改的表的完全限定表标识符。连接器不会捕获未包含的任何表中的更改table.include.list.每个标识符都是这样的开云体育电动老虎机数据库名的表.默认情况下,连接器捕获每个数据库中每个非系统表中的更改,这些表的更改正在被捕获。开云体育电动老虎机不也指定table.exclude.list连接器配置属性。

空字符串

一个可选的、以逗号分隔的正则表达式列表,它匹配不希望捕获其更改的表的完全限定表标识符。连接器捕获未包含的任何表中的更改table.exclude.list.每个标识符都是这样的开云体育电动老虎机数据库名的表.不也指定table.include.list连接器配置属性。

空字符串

一个可选的、以逗号分隔的正则表达式列表,它匹配要从更改事件记录值中排除的列的完全限定名称。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

空字符串

一个可选的、以逗号分隔的正则表达式列表,它匹配要包含在变更事件记录值中的列的完全限定名称。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应该在更改事件记录值中被截断。您可以在单个配置中配置具有不同长度的多个属性。长度必须为正整数。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配,这些列的值应该在更改事件消息值中被替换为由指定数量的星号()字符。您可以在单个配置中配置具有不同长度的多个属性。每个长度必须为正整数或零。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName.在产生的更改事件记录中,指定列的值将被假名替换。

假名由应用指定的参数产生的散列值组成hashAlgorithm.根据所使用的散列函数,将维护引用完整性,而列值将被假名替换。中描述了支持的哈希函数MessageDigest节Java密码体系结构标准算法名称文档。

在下面的例子中,CzQMA0cB5K是随机选择的盐。

column.mask.hash.sha with.salt——256.。CzQMA0cB5K =库存。订单。customerName, inventory.shipment.customerName

必要时,笔名会自动缩短为列的长度。连接器配置可以包括多个属性,这些属性指定不同的哈希算法和盐。

取决于hashAlgorithm使用,选中的数据集与实际数据集相匹配时,得到的数据集可能不会被完全屏蔽。

N/A

一个可选的、以逗号分隔的正则表达式列表,它匹配列的完全限定名称,这些列的原始类型和长度应该作为参数添加到发出的更改事件记录中的相应字段模式中。这些模式参数:

__开云体育官方注册网址Debezium.source.column.type

__开云体育官方注册网址Debezium.source.column.length

__开云体育官方注册网址Debezium.source.column.scale

分别用于传播变宽类型的原始类型名和长度。这对于接收器数据库中相应列的适当大小非常有用。开云体育电动老虎机列的完全限定名是以下形式之一:

开云体育电动老虎机数据库名的表columnName

开云体育电动老虎机数据库名schemaName的表columnName

N/A

一个可选的、以逗号分隔的正则表达式列表,它与特定于数据库的数据类型名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改事件记录中的相应开云体育电动老虎机字段模式中。这些模式参数:

__开云体育官方注册网址debezium.source.column.type

__开云体育官方注册网址debezium.source.column.length

__开云体育官方注册网址debezium.source.column.scale

分别用于传播变宽类型的原始类型名和长度。这对于接收器数据库中相应列的适当大小非常有用。开云体育电动老虎机完全限定数据类型名是以下形式之一:

开云体育电动老虎机数据库名的表typeName

开云体育电动老虎机数据库名schemaName的表typeName

看到MySQL连接器如何映射数据类型查看mysql特定的数据类型名称列表。

adaptive_time_microseconds

时间、日期和时间戳可以用不同的精度表示,包括:

adaptive_time_microseconds(默认值)根据数据库列的类型使用毫秒、微秒或纳秒精度值捕获数据库中的日期、datetime和时间戳值,TIME类型字段除外,它总是以微秒为单位捕获。开云体育电动老虎机

自适应(已弃用)根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获与数据库中完全相同的时间和时间戳值。开云体育电动老虎机

连接总是使用Kafka Connect内置的时间、日期和时间戳表示来表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。开云体育电动老虎机

精确的

指定连接器应如何处理的值小数数字列:

精确的(默认值)精确地表示它们使用java.math.BigDecimal变更事件中以二进制形式表示的值。

表示它们使用值,这可能会导致精度损失,但更容易使用。

字符串将值编码为格式化的字符串,这很容易使用,但关于真实类型的语义信息会丢失。

指定如何在更改事件中表示BIGINT UNSIGNED列。可能的设置有:

用Java的表示值这种方法可能无法提供精准度,但在消费者中很容易使用。通常是首选的设置。

精确的使用java.math.BigDecimal来表示值,这些值通过使用二进制表示和Kafka Connect的方式编码在更改事件中org.apache.kafka.connect.data.Decimal类型。当处理大于2^63的值时使用此设置,因为这些值不能通过使用

真正的

布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器ID同名的Kafka主题。开云体育电动老虎机通过使用一个包含数据库名称且值包含DDL语句的键来记录每个模式更改。开云体育电动老虎机这与连接器内部记录数据库历史的方式无关。开云体育电动老虎机

布尔值,该值指定连接器是否应包括生成更改事件的原始SQL查询。

如果将此选项设置为真正的那么你还必须配置MySQLbinlog_rows_query_log_events选项设置为.当include.query真正的,对于快照进程生成的事件,该查询不存在。

设置include.query真正的可能会暴露通过在change事件中包含原始SQL语句而显式排除或掩盖的表或字段。因此,默认设置为

失败

指定连接器在反序列化binlog事件期间应对异常的方式。

失败传播异常,该异常指示有问题的事件及其binlog偏移量,并导致连接器停止。

警告记录有问题的事件及其binlog偏移量,然后跳过该事件。

忽略忽略有问题的事件,不记录任何内容。

失败

指定连接器应如何对与不在内部模式表示中出现的表相关的binlog事件作出反应。也就是说,内部表示与数据库不一致。开云体育电动老虎机

失败抛出一个异常,指示有问题的事件及其binlog偏移量,并导致连接器停止。

警告记录有问题的事件及其binlog偏移量,并跳过该事件。

跳过忽略有问题的事件,不记录任何内容。

8192

正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka很慢或者Kafka不可用时,这个队列可以为binlog阅读器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性指定的最大批处理大小,默认为8192max.batch.size财产。

2048

正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。

0

阻塞队列最大大小(以字节为单位)的长值。默认情况下,该功能是禁用的,如果设置为正长值,则该功能将激活。

1000

正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。

30000

一个正整数值,指定连接器在尝试连接到MySQL数据库服务器后超时前应该等待的最大时间(以毫秒为单位)。开云体育电动老虎机默认为30秒。

一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源匹配其中一个包含模式的GTID范围才会被使用。不也指定一个设置gtid.source.excludes

一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源不匹配任何这些排除模式的GTID范围才会被使用。不也指定一个值gtid.source.includes

gtid.new.channel.position
已弃用并计划删除

最早的

当设置为最新的,当连接器看到一个新的GTID通道时,它开始使用该GTID通道中最后执行的事务。如果设置为最早的(默认),连接器从第一个可用的(未清除的)GTID位置开始读取该通道。最早的当你有一个主-被动MySQL设置,Debezium连接到主服务器时,它是有用的。开云体育官方注册网址在这种情况下,在故障转移期间,具有新UUID(和GTID通道)的副本在Debezium连接之前开始接收写操作。开云体育官方注册网址这些写操作在使用时会丢失最新的

真正的

控制是否删除事件之后是一个墓碑事件。

真正的-删除操作用a表示删除事件和后续的墓碑事件。

-只有一个删除事件被触发。

当一个源记录被删除后,触发一个墓碑事件(默认行为)允许Kafka完全删除与被删除行的键相关的所有事件日志压实已为主题启用。

N/A

使用正则表达式匹配表列名的分号分隔的表列表。连接器将匹配列中的值映射到它发送给Kafka主题的更改事件记录中的关键字段。当一个表没有主键,或者当你想根据一个不是主键的字段来排序Kafka主题中的更改事件记录时,这是很有用的。

用分号分隔条目。在完全限定表名与其正则表达式之间插入冒号。格式(为清楚起见,用空格显示)为:

开云体育电动老虎机数据库名称表名正则表达式...

例如:

dbA.table_a: regex_1; dbB.table_b: regex_2; dbC.table_c: regex_3

如果table_a有一个id列,regex_1^我匹配以。开头的任何列对象中的值进行映射id列的table_a到连接器发送给Kafka的change事件中的关键字段。

字节

指定二进制列,例如,二进制varbinary,应该在变更事件中表示。可能的设置:

字节将二进制数据表示为字节数组。

base64将二进制数据表示为base64编码的字符串。

十六进制表示二进制数据为十六进制编码(base16)字符串。

高级MySQL连接器配置属性

下表描述了高级MySQL连接器属性.这些属性的默认值很少需要更改。因此,您不需要在连接器配置中指定它们。

表19。MySQL连接器高级配置属性说明
财产 默认的 描述

真正的

一个布尔值,指定是否应该使用一个单独的线程来确保与MySQL服务器/集群的连接保持活跃。

真正的

一个布尔值,指定是否应忽略内置系统表。无论表包含列表和排除列表如何,这都适用。默认情况下,系统表的更改不会被捕获,并且当对任何系统表进行更改时,不会生成任何事件。

禁用

指定是否使用加密连接。可能的设置有:

禁用指定使用未加密连接。

首选如果服务器支持安全连接,则建立加密连接。如果服务器不支持安全连接,则退回到未加密的连接。

要求建立加密连接,如果由于任何原因无法建立连接,则会失败。

verify_ca表现得像要求但是另外,它会根据配置的证书颁发机构(CA)证书验证服务器TLS证书,如果服务器TLS证书与任何有效的CA证书不匹配,则会失败。

verify_identity表现得像verify_ca但是还要验证服务器证书是否与远程连接的主机相匹配。

0

binlog读取器使用的预读缓冲区的大小。的默认设置0禁用缓冲。

在特定的条件下,MySQL binlog可能包含未提交的数据回滚声明。典型的例子是在单个事务中使用保存点或混合临时和常规的表更改。

当检测到事务的开始时,Debezium尝试前滚binlog位置并找到其中任何一个开云体育官方注册网址提交回滚因此,它可以确定是否从事务中传输更改。binlog缓冲区的大小定义了Debezium在搜索事务边界时可以缓冲的事务更改的最大数量。开云体育官方注册网址如果事务的大小大于缓冲区,则Debezium必须在流处理时倒带并重新读取缓冲区中不适合的事件。开云体育官方注册网址

注意:此功能正在孵化中。鼓励反馈。预计这个功能还没有完全完善。

最初的

指定在连接器启动时运行快照的条件。可能的设置有:

最初的—只有在逻辑服务器名没有记录偏移时,连接器才会运行快照。

when_needed-连接器在启动时运行快照,只要它认为有必要。也就是说,没有可用的偏移量,或者以前记录的偏移量指定了服务器中不可用的binlog位置或GTID。

从来没有—连接器从不使用快照。在第一次使用逻辑服务器名启动时,连接器从binlog的开头读取数据。小心配置这种行为。只有当binlog保证包含数据库的整个历史时,它才有效。开云体育电动老虎机

schema_only连接器运行模式的快照,而不是数据的快照。当您不需要主题包含数据的一致快照,而只需要主题具有自连接器启动以来的更改时,此设置非常有用。

schema_only_recovery-这是一个已经捕获更改的连接器的恢复设置。重新启动连接器时,此设置允许恢复损坏或丢失的数据库历史主题。开云体育电动老虎机您可以定期设置它,以“清理”意外增长的数据库历史主题。开云体育电动老虎机开云体育电动老虎机数据库历史主题需要无限保留。

最小的

控制连接器是否持有全局MySQL读锁以及持有多长时间,在连接器执行快照时,可以防止对数据库的任何更新。开云体育电动老虎机可能的设置有:

最小的-连接器仅对快照的初始部分持有全局读锁,在此期间连接器读取数据库模式和其他元数据。开云体育电动老虎机快照中的其余工作包括从每个表中选择所有行。连接器可以通过使用REPEATABLE READ事务以一致的方式实现这一点。即使全局读锁不再持有,其他MySQL客户端正在更新数据库,也会出现这种情况。开云体育电动老虎机

minimal_percona-连接器保持全局备份锁仅用于快照的初始部分,在此期间连接器读取数据库模式和其他元数据。开云体育电动老虎机快照中的其余工作包括从每个表中选择所有行。连接器可以通过使用REPEATABLE READ事务以一致的方式实现这一点。即使不再持有全局备份锁,而其他MySQL客户端正在更新数据库,也会出现这种情况。开云体育电动老虎机此模式不会将表刷新到磁盘,也不会被长时间的读取阻塞,并且仅在Percona Server中可用。

扩展-在快照期间阻塞所有写操作。如果有客户端正在提交MySQL从REPEATABLE READ语义中排除的操作,请使用此设置。

没有一个-防止连接器在快照期间获取任何表锁。虽然所有快照模式都允许此设置,但如果和使用该设置是安全的只有如果快照运行时没有发生模式更改。对于用MyISAM引擎定义的表,尽管设置了这个属性,但当MyISAM获得表锁时,表仍然是锁定的。这种行为与InnoDB引擎不同,InnoDB引擎需要行级锁。

中指定的所有表table.include.list

中指定的模式名称匹配的可选的、以逗号分隔的正则表达式列表table.include.list为此你想要进行快照。

控制快照中包含哪些表行。此属性仅影响快照。它不会影响从binlog中捕获的事件。在表单中指定以逗号分隔的全限定表名列表开云体育电动老虎机databaseName.tableName

对于您指定的每个表,还要指定另一个配置属性:snapshot.select.statement.overrides。DB_NAMETABLE_NAME.例如,另一个配置属性的名称可能是:snapshot.select.statement.overrides.customers.orders.将此属性设置为a选择语句,只获取快照中需要的行。当连接器执行快照时,它执行此操作选择语句从该表检索数据。

设置这些属性的一个可能用例是只能追加的大型表。您可以指定选择语句,用于设置从何处开始快照的特定点,或者如果前一个快照中断,则从何处恢复快照。

1000

在快照期间,连接器查询每个被配置为捕获更改的表。连接器使用每个查询结果生成一个读取事件,其中包含该表中所有行的数据。这个属性决定MySQL连接器是将一个表的结果放入内存中(这是快速的,但需要大量内存),还是将结果流放入内存中(这可能较慢,但适用于非常大的表)。此属性的设置指定在连接器传输结果之前表必须包含的最小行数。

若要跳过所有表大小检查并在快照期间始终流所有结果,请将此属性设置为0

0

控制连接器向Kafka主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。

心跳消息对于监视连接器是否正在接收来自数据库的更改事件非常有用。开云体育电动老虎机心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。

__开云体育官方注册网址debezium-heartbeat

控制连接器向其发送心跳消息的主题的名称。主题名称有这样的模式:

heartbeat.topics.prefixserver.name

例如,数据库服务器名称为开云体育电动老虎机实现,默认主题名为__开云体育官方注册网址debezium-heartbeat.fulfillment

当建立到数据库的JDBC连接(而不是读取事务日志的连接)时,将执行一个以分号分隔的SQL语句列表。开云体育电动老虎机要将分号指定为SQL语句中的字符而不是分隔符,请使用两个分号(;;).

连接器可以根据自己的判断建立JDBC连接,因此此属性仅用于配置会话参数。它不是用来执行DML语句的。

在连接器启动时执行快照之前,连接器应等待的毫秒间隔。如果在集群中启动多个连接器,此属性有助于避免快照中断,因为快照中断可能导致连接器的重新平衡。

在快照期间,连接器以行为单位读取表内容。此属性指定批处理中的最大行数。

10000

正整数,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。看到MySQL连接器如何执行数据库快照开云体育电动老虎机

真正的

布尔值,指示连接器是否将2位年份规范转换为4位。设置为当转换完全委托给数据库时。开云体育电动老虎机

MySQL允许用户插入2位或4位的年份值。对于2位数的值,值被映射到1970 - 2069之间的年份。默认的行为是连接器执行转换。

v2

的架构版本块在Debeziu开云体育官方注册网址m事件。开云体育官方注册网址Debezium 0.10在结构上引入了一些突破性的变化块,以便在所有连接器之间统一暴露的结构。

通过将此选项设置为v1,可以生产早期版本中使用的结构。但是,不建议使用这种设置,并计划在未来的Debezium版本中删除。开云体育官方注册网址

真正的如果连接器配置设置key.convertervalue.converter属性到Avro转换器。
如果不是。

指示是否对字段名进行清除以遵守Avro命名要求

在流处理期间要跳过的操作类型的逗号分隔列表。可能取值为:c插入/创建、u的更新,d删除。缺省情况下,不跳过任何操作。

确定连接器是否生成具有事务边界的事件,并使用事务元数据丰富更改事件信封。指定真正的如果您希望连接器执行此操作。看到事务的元数据获取详细信息。

开云体育官方注册网址Debezium连开云体育电动老虎机接器数据库历史配置属性

开云体育官方注册网址Debezium提供了一组开云体育电动老虎机database.history。*属性,这些属性控制连接器如何与模式历史主题交互。

下表描述了开云体育电动老虎机database.history属性用于配置Debezium连接器。开云体育官方注册网址

表20。连接器数据库历史配置开云体育电动老虎机属性
财产 默认的 描述

连接器存储数据库模式历史的Kafka主题的全称。开云体育电动老虎机

连接器用来建立到Kafka集群的初始连接的主机/端口对列表。此连接用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个DDL语开云体育电动老虎机句。每一对都应该指向Kafka Connect进程使用的相同的Kafka集群。

One hundred.

一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值是100ms。

4

在连接器恢复失败并发生错误之前,连接器应尝试读取持久历史数据的最大次数。接收不到数据后等待的最大时间为recovery.attemptsxrecovery.poll.interval.ms

一个布尔值,指定连接器是否应该忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。开云体育电动老虎机安全默认值为.在处理binlog时,应该小心使用跳过,因为它可能导致数据丢失或损坏。

一个布尔值,指定连接器是否应该记录所有DDL语句

真正的只记录那些DDL语句,这些语句与Debezium正在捕获的表的更改相关。开云体育官方注册网址设置为真正的要小心,因为如果您更改捕获了哪些表的更改,则可能需要丢失数据。

安全默认值为

一个布尔值,指定连接器是否应该记录所有DDL语句

真正的只记录那些DDL语句,这些语句与Debezium正在捕获的表的更改相关。开云体育官方注册网址设置为真正的要小心,因为如果您更改捕获了哪些表的更改,则可能需要丢失数据。

安全默认值为

用于配置生产者和使用者客户开云体育电动老虎机机的传递数据库历史属性


开云体育官方注册网址Debezium依赖于Kafka生成器将模式更改写入数据库历史主题。开云体育电动老虎机类似地,当连接器启动时,它依赖于Kafka消费者从数据库历史主题中读取。开云体育电动老虎机为Kafka生产者和消费者客户端定义配置,方法是将值赋给一组以开云体育电动老虎机database.history.producer。*开云体育电动老虎机database.history.consumer。*前缀。直通的生产者和消费者数据库历史属性控制了一系列行为,比如这些客户端如何保护与K开云体育电动老虎机afka代理的连接,如下例所示:

开云体育电动老虎机database.history.producer.security。协议SSL databas开云体育电动老虎机e.history.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.producer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.producer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.key。密码= test1234 datab开云体育电动老虎机ase.history.consumer.security。协议SSL databas开云体育电动老虎机e.history.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.consumer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.consumer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.key.password = test1234

开云体育官方注册网址Debezium在将属性传递给Kafka客户端之前,会从属性名中去掉前缀。

更多细节请参阅Kafka文档Kafka生产者配置属性Kafka消费者配置属性

开云体育官方注册网址Debezium连接器直通数据库驱动程序配置属性开云体育电动老虎机

Debe开云体育官方注册网址zium连接器提供数据库驱动程序的直通配置。开云体育电动老虎机传递数据库属性以前缀开始开云体育电动老虎机开云体育电动老虎机数据库。*.例如,连接器传递的属性为开云体育电动老虎机database.foobar = false到JDBC URL。

的情况也是如此数据库历史客户端的传递属性开云体育电动老虎机, 开云体育官方注册网址Debezium在将前缀传递给数据库驱动程序之前将它们从属性中剥离。开云体育电动老虎机

监控

除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect提供的内置JMX指标支持外,Debezium MySQL连接器还提供了三种类型的指标。

  • 快照指标在执行快照时提供有关连接器操作的信息。

  • 流指标当连接器读取binlog时,提供有关连接器操作的信息。

  • 架构历史度量提供关于连接器的模式历史记录的状态的信息。

开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。

快照指标

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >

属性 类型 描述

字符串

连接器读取的最后一个快照事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或重置以来,此连接器已看到的事件总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器监视的表的列表。

int

用于在快照和主Kafka Connect循环之间传递事件的队列长度。

int

用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。

int

快照中包含的表的总数。

int

快照尚未复制的表数。

布尔

快照是否启动。

布尔

快照是否中止。

布尔

快照是否完成。

快照到目前为止所花费的总秒数,即使没有完成。

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

队列的最大缓冲区,以字节为单位。如果max.queue.size.in.bytes以一个正的长值传递。

队列中记录的当前数据,以字节为单位。

Debe开云体育官方注册网址zium MySQL连接器还提供HoldingGlobalLock自定义快照度量。此指标被设置为一个布尔值,该值指示连接器当前持有的是全局锁还是表写锁。

流指标

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文=流媒体服务器= <开云体育电动老虎机 database.server.name >

只有启用binlog事件缓冲时,与事务相关的属性才可用。看到binlog.buffer.size在高级连接器配置属性中获取更多详细信息。

属性 类型 描述

字符串

连接器读取的最后一个流事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或重置以来,此连接器已看到的事件总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器监视的表的列表。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列长度。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。

布尔

标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机

从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机

提交的已处理事务的数量。

Map < String, String >

上次接收事件的坐标。

字符串

最后处理的事务的事务标识符。

队列的最大缓冲区,以字节为单位。

队列中记录的当前数据,以字节为单位。

Debe开云体育官方注册网址zium MySQL连接器还提供了以下额外的流指标:

表21。附加流度量的描述
属性 类型 描述

BinlogFilename

字符串

连接器最近读取的binlog文件的名称。

BinlogPosition

连接器已读取的binlog中的最近位置(以字节为单位)。

IsGtidModeEnabled

布尔

标志,表示连接器当前是否跟踪来自MySQL服务器的gtid。

GtidSet

字符串

连接器在读取binlog时处理的最新GTID集的字符串表示形式。

NumberOfSkippedEvents

MySQL连接器跳过的事件数。通常情况下,由于MySQL binlog中的错误或无法解析的事件,事件会被跳过。

NumberOfDisconnects

MySQL连接器断开连接的次数。

NumberOfRolledBackTransactions

回滚且未流化的已处理事务的数量。

NumberOfNotWellFormedTransactions

未符合预期协议的事务数开始+提交/回滚.这个值应该是0在正常情况下。

NumberOfLargeTransactions

未放入前瞻性缓冲区的事务数。为了获得最佳性能,这个值应该显著小于NumberOfCommittedTransactionsNumberOfRolledBackTransactions

架构历史度量

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= schema-history服务器=<开云体育电动老虎机 database.server.name >

属性 类型 描述

字符串

之一停止恢复(从存储中恢复历史),运行描述数据库历史记录的状态。开云体育电动老虎机

恢复开始的时间(以epoch秒为单位)。

在恢复阶段读取的更改数。

在恢复和运行时应用的模式更改的总数。

自上次更改从历史存储区恢复以来所经过的毫秒数。

自应用最后一次更改以来所经过的毫秒数。

字符串

从历史存储中恢复的最后一次更改的字符串表示形式。

字符串

最后应用的更改的字符串表示形式。

出现问题时的行为

开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。

如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。

本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址

配置和启动错误

在以下情况下,连接器在尝试启动时失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法通过指定的连接参数成功连接到MySQL服务器。

  • 连接器试图在binlog中MySQL不再有可用历史的位置重新启动。

在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。在纠正配置或解决MySQL问题后,重新启动连接器。

MySQL不可用

如果您的MySQL服务器变得不可用,Debezium MySQL连接器将失败并报错,连接器将开云体育官方注册网址停止。当服务器再次可用时,重新启动连接器。

但是,如果为高可用性MySQL集群启用了gtid,则可以立即重新启动连接器。它将连接到集群中的另一个MySQL服务器,在服务器的binlog中找到代表最后一个事务的位置,并开始从该特定位置读取新服务器的binlog。

如果未启用gtid,则连接器只记录其所连接的MySQL服务器的binlog位置。要从正确的binlog位置重新启动,必须重新连接到特定的服务器。

Kafka Connect优雅地停止

当Kafka Connect优雅地停止时,当Debezium MySQL连接器任务停止并在新的Kafka Connect进程上重新启动时,会开云体育官方注册网址有一个短暂的延迟。

Kafka连接进程崩溃

如果Kafka Connect崩溃,进程将停止,任何Debezium MySQL连接器任务将终止,而不会开云体育官方注册网址记录它们最近处理的偏移量。在分布式模式下,Kafka Connect重启其他进程上的连接器任务。但是,MySQL连接器从前面进程记录的最后一个偏移量开始恢复。这意味着替换任务可能会生成一些在崩溃之前处理过的相同事件,从而创建重复的事件。

每个更改事件消息都包含特定于源的信息,您可以使用这些信息来识别重复的事件,例如:

  • 事件的起源

  • MySQL服务器的事件时间

  • binlog文件名称和位置

  • gtid(如果使用)

Kafka不可用

Kafka Connect框架通过使用Kafka生产者API来记录Kaf开云体育官方注册网址ka中的Debezium变更事件。如果Kafka代理变得不可用,Debezium MySQL连接器将暂停,直到重新建立连接,开云体育官方注册网址并从断开的地方恢复。

MySQL清除binlog文件

如果Debez开云体育官方注册网址ium MySQL连接器停止太长时间,MySQL服务器会清除旧的binlog文件,连接器的最后一个位置可能会丢失。当连接器重新启动时,MySQL服务器不再拥有起点,连接器执行另一个初始快照。如果快照被禁用,连接器将失败并报错。

看到快照有关MySQL连接器如何执行初始快照的详细信息。