您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

开云体育官方注册网址Debezium MySQL连接器

MySQL有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。开云体育电动老虎机这包括对表模式和表内数据的更改。MySQL使用binlog进行复制和恢复。

Debe开云体育官方注册网址zium MySQL连接器读取binlog并为行级生成更改事件插入更新,删除在Kafka主题中记录更改事件。客户端应用程序读取这些Kafka主题。

由于MySQL通常设置为在指定的一段时间后清除二进制日志,因此MySQL连接器执行并初始化一致的快照您的每个数据库。开云体育电动老虎机MySQL连接器从创建快照的位置读取binlog。

下面几节提供了关于Debezium MySQL连接器如何工作的更详细信息,指导您如何设置它,以及如何部署和排除连接器故障。开云体育官方注册网址

MySQL连接器如何工作的概述

Debe开云体育官方注册网址zium MySQL连接器跟踪表的结构,执行快照,将binlog事件转换为Debezium更改事件,并记录这些事件在Kafka中记录的位置。

MySQL连接器如何使用数据库模式开云体育电动老虎机

当数据库客户端开云体育电动老虎机查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以在任何开云体育电动老虎机时候更改,这意味着连接器必须能够识别记录每个插入、更新或删除操作时的模式。此外,连接器不能只使用当前模式,因为连接器可能正在处理相对较旧的事件,并且可能在表的模式更改之前就已经记录了这些事件。

为了处理这个问题,MySQL在binlog中包含了对数据的行级更改和应用到数据库的DDL语句。开云体育电动老虎机当连接器读取binlog并遇到这些DDL语句时,它会解析它们并更新每个表的模式的内存表示。连接器使用这种模式表示在每次插入、更新或删除时识别表的结构,并产生适当的更改事件。在一个单独的数据库历史Kaf开云体育电动老虎机ka主题中,连接器还记录了所有DDL语句以及每个DDL语句在binlog中的出现位置。

当连接器在崩溃或正常停止后重新启动时,连接器开始从特定位置(即从特定时间点)读取binlog。连接器通过读取数据库历史Kafka主题并解析所有DDL语句,直到连接器开始的binlog点,从而重新构建此时存在的表结构。开云体育电动老虎机

此数据库历开云体育电动老虎机史记录主题仅供连接器使用。连接器可以有选择地在针对使用者应用程序的不同主题上生成模式更改事件。这在MySQL连接器如何处理模式更改主题

当MySQL连接器捕获表中的更改时,模式更改工具(如gh-ostpt-online-schema-change则在迁移过程中创建的帮助表需要包含在白名单表中。

如果下游系统不需要临时表生成的消息,那么可以编写并应用一个简单的消息转换来过滤掉它们。

有关主题命名约定的信息,请参见MySQL连接器和Kafka主题

MySQL连接器如何执行数据库快照开云体育电动老虎机

当您的Debeziu开云体育官方注册网址m MySQL连接器第一次启动时,它执行初始化一致的快照你的数据库。开云体育电动老虎机下面的流程描述了快照是如何完成的。

这是默认的快照模式,设置为最初的snapshot.mode财产。有关其他快照模式,请参阅MySQL连接器配置属性
连接器…
一步 行动

1

抓住一个全局读锁那块通过其他数据库客户开云体育电动老虎机端。

快照本身并不阻止其他客户端应用DDL,这可能会干扰连接器读取binlog位置和表模式的尝试。全局读锁被保留,同时binlog位置被读取,然后在后面的步骤中释放。

2

使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照

3.

读取当前binlog的位置。

4

读取连接器配置允许的数据库和表的模式。开云体育电动老虎机

5

释放全局读锁.这现在允许其他数据库客户端写入数据库。开云体育电动老虎机

6

将DDL更改写入模式更改主题,包括所有必要的更改滴……而且创建…DDL语句。

如果适用,会发生这种情况。

7

扫描数据库表并生成开云体育电动老虎机创建事件的相关表特定的Kafka主题的每一行。

8

提交事务。

9

在连接器偏移量中记录完成的快照。

如果连接器失效会发生什么?

时,如果连接器失效、停止或重新平衡初始快照,连接器将在重新启动后创建一个新的快照。一旦开始快照完成后,Debezium MySQL开云体育官方注册网址连接器将从binlog中的相同位置重新启动,因此它不会错过任何更新。

如果连接器停止的时间足够长,MySQL就会清除旧的binlog文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复到初始快照它的起始位置。有关Debezium MySQL连接器故障排除的更多技巧,请参见开云体育官方注册网址MySQL连接器常见问题

如果不允许全局读锁怎么办?

某些环境不允许全局读锁.如果Debez开云体育官方注册网址ium MySQL连接器检测到不允许全局读锁,则连接器使用表级锁,并使用此方法执行快照。

用户必须有LOCK_TABLES特权。
连接器…
一步 行动

1

使用可重复读语义来确保事务中的所有后续读取都是针对一致的快照

2

读取并过滤数据库和表的名称。开云体育电动老虎机

3.

读取当前binlog的位置。

4

读取连接器配置允许的数据库和表的模式。开云体育电动老虎机

5

将DDL更改写入模式更改主题,包括所有必要的更改滴……而且创建…DDL语句。

如果适用,会发生这种情况。

6

扫描数据库表并生成开云体育电动老虎机创建事件的相关表特定的Kafka主题的每一行。

7

提交事务。

8

释放表级锁。

9

在连接器偏移量中记录完成的快照。

MySQL连接器如何处理模式更改主题

您可以配置Debezium开云体育官方注册网址MySQL连接器生成模式更改事件,其中包括应用于MySQL服务器中的数据库的所有DDL语句。开云体育电动老虎机连接器将所有这些事件写入一个名为< serverName >在哪里serverName连接器名称是否与开云体育电动老虎机database.server.name配置属性。

如果你选择使用架构更改事件,使用模式更改主题和使用数据库历史记录主题。开云体育电动老虎机
数据库模式历史中事件的全局顺序是至关重要的。开云体育电动老虎机因此,不能对数据库历史主题进行开云体育电动老虎机分区。这意味着在创建此主题时必须指定分区计数为1。当依赖于自动主题创建时,确保Kafka的num.partitions配置选项(默认分区数)设置为1

架构更改主题结构

写入模式更改主题的每个消息都包含一个消息键,其中包含应用DDL语句时使用的连接数据库的名称:开云体育电动老虎机

{"schema": {"type": "struct", "name": "io. d开云体育官方注册网址ebezum .connector.mysql. schemachangekey ", "optional": false, "fields": [{"开云体育电动老虎机field": "databaseName", "type": "string", "optional": false}]}, "payload": {"databaseName": "inventory"}}

架构更改事件消息值包含一个结构,该结构包括DDL语句、语句应用到的数据库以及语句在binlog中的位置:开云体育电动老虎机

{"模式":{“类型”:“结构”、“名称”:“io.debezium.connector开云体育官方注册网址.mysql.SchemaChangeValue”、“可选”:假的,“字段”:[{“字段”:“数据库名”、“类型”:“弦”、“可选”:假},{”字段”:“d开云体育电动老虎机dl”、“类型”:“弦”、“可选”:假},{”字段”:“源”、“类型”:“结构”、“名称”:“io.debezium.connector.mysql.Source”、“可选”:假的,“字段”:[{“类型”:“弦”、“可选”:真的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“server_id”},{“类型”:“int64”、“可选”:假的,“场”:“ts_sec”},{“类型”:“弦”、“可选”:真的,“场”:“gtid”},{“类型”:“弦”、“可选”:假的,“场”:“文件”},{“类型”:“int64”、“可选”:假的,“场”:“pos”},{“类型”:“int32”、“可选”:假的,“场”:“行”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“int64”、“可选”:真的,“场”:“线程”},{“类型”:"string", "optional": true, "field": "db"}, {"type": "string", "optional": true, "field": "table"}, {"type": "string", "optional": true, "field": "query"}]}]}, "payload": {"databaseNa开云体育电动老虎机me": "inventory", "ddl": "CREATE table products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT);ALTER TABLE products AUTO_INCREMENT = 101;", "source": {"version": "0.10.0. "Beta4”、“名称”:“mysql-server-1”、“server_id”:0,”ts_sec”:0,”gtid”:空,“文件”:“mysql-bin。000003.", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": null, "table": null, "query": null } } }
关于模式更改主题的重要提示

ddlfield可能包含多个DDL语句。中的每个语句都应用于数据库开云体育电动老虎机开云体育电动老虎机数据库名字段,并以与在数据库中应用时相同的顺序显示。开云体育电动老虎机的字段的结构完全是写入特定于表的主题的标准数据更改事件。此字段用于关联不同主题的事件。

...."payload": {"d开云体育电动老虎机atabaseName": "inventory", "ddl": "CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,…“来源”:{....}} ....
如果客户端将DDL语句提交给多个数据库开云体育电动老虎机?
  • 如果MySQL原子地应用这些DDL语句,连接器将按顺序获取DDL语句,按数据库对它们进行分组,并为每个组创建一个模式更改事件。开云体育电动老虎机

  • 如果MySQL分别应用它们,连接器将为每条语句创建一个单独的模式更改事件。

额外的资源

MySQL连接器事件

Debezium MySQL连接器产生的所有数据更改事件都包含一个键和一个值。开云体育官方注册网址更改事件键和更改事件值各包含一个模式和一个有效载荷其中,模式描述有效负载的结构,有效负载包含数据。

MySQL连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这一点很重要,因为任何不是拉丁字母或下划线的字符都被下划线替换,当逻辑服务器名、数据库名和表名包含用这些下划线替换的其他字符时,这可能会导致模式名中发生意外的冲突。开云体育电动老虎机

更改事件键

对于任何给定的表,更改事件的键具有一个结构,该结构包含针对中的每列的字段主键(或唯一约束)在事件创建时。让我们看一个示例表,然后看看表的模式和有效负载是如何显示的。

例表
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
更改事件键的示例
{"schema": {(1)"type": "struct", "name": "mysql-server-1.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“int32”,“可选”:假}},“有效载荷”:{(5)"id": 1001}}
1 模式控件中的内容有效载荷
2 mysql服务器- 1. inventory.customers.key定义结构的模式的名称在哪里mysql-server-1是连接器名称,库存是数据库,和开云体育电动老虎机客户是桌子。
3. 表示有效载荷不是可选的。
4 属性中期望的字段类型有效载荷
5 有效负载本身,在本例中只包含一个id字段。

对象中的行inventory.customers表,该表从名为mysql-server-1谁的id主键列的值为1001

更改事件值

更改事件值包含一个模式和一个有效负载部分。有三种具有信封结构的更改事件值类型。下面解释了该结构中的字段,并在每个更改事件值示例上进行了标记。

字段名 描述

1

的名字

mysql服务器- 1. inventory.customers.key定义结构的模式的名称在哪里mysql-server-1是连接器名称,库存是数据库和开云体育电动老虎机客户是桌子

2

人事处

一个强制性的描述操作类型的字符串。

  • c=创建

  • u=更新

  • d=删除

  • r= read (初始快照唯一的)

3.

之前

可选字段,指定事件发生之前行的状态。

4

可选字段,指定事件发生后行的状态。

5

一个强制性的字段,描述事件的源元数据,包括:

  • Debe开云体育官方注册网址zium版本

  • 连接器名称

  • 记录事件的binlog名称

  • binlog位置

  • 事件中的行

  • 如果事件是快照的一部分

  • 受影响的数据库和表的名称开云体育电动老虎机

  • 创建事件的MySQL线程id(非快照)

  • MySQL服务器ID(如果可用)

  • 时间戳

如果binlog_rows_query_log_events选项启用,并且连接器具有include.query启用选项,a查询字段显示,其中包含生成该事件的原始SQL语句。

6

ts_ms

一个可选字段,显示连接器处理事件的时间。

该时间基于运行Kafka Connect任务的JVM中的系统时钟。

让我们看一个示例表,然后看看表的模式和有效负载是如何显示的。

例表
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
创建变更事件值

这个例子显示了创建事件。客户表:

{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers。值","field": "before"}, {"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers。价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.product.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mysql-server-1.inventory.customers.Envelope" }, "payload": {(2)“人事处”:“c”,“ts_ms”:1465491411815,“之前”:空,“后”:{" id ": 1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“1.1.2。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin "。000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}}}
1 模式事件的一部分价值显示了信封的模式、源结构的模式(特定于MySQL连接器并在所有事件中重用),以及特定于表的模式之前而且字段。

的模式名称之前而且字段的格式为< logicalName >。<表> value,因此完全独立于所有其他表的所有其他模式。这意味着当使用Avro转换器,每个逻辑源中每个表的结果Avro模式都有自己的演变和历史。

2 有效载荷事件的一部分价值显示事件中的信息,即它正在描述创建行的情况(因为op = c),以及字段值包含新插入行的值idfirst_namelast_name,电子邮件列。

事件的JSON表示形式似乎比它们描述的行要大得多。这是因为JSON表示必须包括消息的模式和有效负载部分。然而,通过使用Avro转换器,你可以大幅减少写入Kafka主题的实际消息的大小。

更新变更事件值

的值更新的更改事件。客户表具有与a完全相同的模式创建事件。有效负载的结构相同,但包含不同的值。下面是一个例子(为了便于阅读,进行了格式化):

{"schema":{…}, "payload": {"before": {(1)“id”:1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{(2)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“1.1.2。Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"}, "op": "u",(4)“ts_ms”:1465581029523(5)}}

中的值进行比较插入事件,您可以在有效载荷部分:

1 之前字段现在拥有数据库提交前包含值的行状态。开云体育电动老虎机
2 字段现在具有该行的更新状态,而first_name价值就是现在安妮玛丽.你可以比较之前而且结构来确定由于提交而在该行中实际更改的内容。
3. 字段结构具有与以前相同的字段,但值不同(此事件来自binlog中的不同位置)。的结构显示关于MySQL的这个更改记录的信息(提供可跟踪性)。它还提供了一些信息,可以用来与本主题和其他主题中的其他事件进行比较,以了解该事件是发生在其他事件之前、之后,还是作为同一MySQL提交的一部分发生。
4 人事处字段值现在是u,表示该行因更新而更改。
5 ts_ms字段显示Debezium处理此事件时的时间戳。开云体育官方注册网址

当一行的主键或唯一键的列被更新时,行键的值将被更改,Debezium将输出三个事件开云体育官方注册网址删除事件和墓碑事件用旧键为行,后面跟着插入事件,使用该行的新键。

删除更改事件值

a的值删除的更改事件。客户表的模式与创建而且更新事件。有效负载的结构相同,但包含不同的值。下面是一个例子(为了便于阅读,进行了格式化):

{"schema":{…}, "payload": {"before": {(1)"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "after": null,(2)“源”:{(3)“版本”:“1.1.2。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004"}, "op": "d",(4)“ts_ms”:1465581902461(5)}}

比较有效载荷有效载荷的部分创建而且更新事件,你可以看到一些不同:

1 之前字段现在拥有在数据库提交时删除的行状态。开云体育电动老虎机
2 字段是,表示该行已不存在。
3. 字段结构具有许多与以前相同的值,除了ts_sec而且pos字段已更改(在其他场景中文件可能已更改)。
4 人事处字段值现在是d,表示该行已被删除。
5 ts_ms显示Debezium处理此事件时的时间戳。开云体育官方注册网址

此事件为使用者提供了处理删除该行所需的信息。包含旧值是因为一些使用者可能需要它们来正确处理删除。

MySQL连接器的事件是设计来处理的Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除一些旧消息。这允许Kafka回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。

删除一行时,删除上面列出的event值仍然适用于日志压缩,因为Kafka仍然可以删除所有使用相同键的早期消息。如果消息值为, Kafka知道它可以删除所有具有相同密钥的消息。为了实现这一点,Debezium的MySQL连开云体育官方注册网址接器总是遵循一个删除事件,使用一个特殊的墓碑事件,该事件具有相同的键价值。

MySQL连接器如何映射数据类型

Debe开云体育官方注册网址zium MySQL连接器用事件表示行更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该列的MySQL数据类型决定了该值在事件中的表示方式。

存储字符串的列在MySQL中使用字符集和排序规则定义。MySQL连接器在binlog事件中读取列值的二进制表示时使用列的字符集。下表显示了连接器如何将MySQL数据类型映射到两者文字而且语义类型。

  • 文字类型:如何使用Kafka Connect模式类型来表示值

  • 语义类型: Kafka Connect模式如何捕获字段的含义(模式名)

MySQL类型 文字类型 语义类型

布尔,布尔值

布尔

N/A

位(1)

布尔

N/A

位(> 1)

字节

io.开云体育官方注册网址debezium.data.Bits

长度模式参数包含一个表示比特数的整数。的byte []包含的位低位优先形式,并调整大小以包含指定的位数。
示例(其中n为位)
numBytes = n/8 + (n%8== 0 ?0: 1)

非常小的整数

INT16

N/A

短整型((M))

INT16

N/A

MEDIUMINT ((M))

INT32

N/A

整数,整数((M))

INT32

N/A

长整型数字((M))

INT64

N/A

真正的((M, D))

FLOAT32

N/A

浮动((M, D))

FLOAT64

N/A

双((M, D))

FLOAT64

N/A

CHAR (M))

字符串

N/A

VARCHAR (M))

字符串

N/A

二进制(M))

字节

N/A

VARBINARY (M))

字节

N/A

TINYBLOB

字节

N/A

非常小的文本串

字符串

N/A

字节

N/A

文本

字符串

N/A

MEDIUMBLOB

字节

N/A

简单

字符串

N/A

LONGBLOB

字节

N/A

量变

字符串

N/A

JSON

字符串

io.开云体育官方注册网址debezium.data.Json

的字符串表示形式JSON文档、数组或标量。

枚举

字符串

io.开云体育官方注册网址debezium.data.Enum

允许Schema参数包含以逗号分隔的允许值列表。

字符串

io.开云体育官方注册网址debezium.data.EnumSet

允许Schema参数包含以逗号分隔的允许值列表。

年[(2 | 4)]

INT32

io.开云体育官方注册网址debezium.time.Year

时间戳((M))

字符串

io.开云体育官方注册网址debezium.time.ZonedTimestamp

ISO 8601格式微秒精度。MySQL允许在…的范围内0 - 6

时间值

不包括时间戳的值,MySQL的时态类型取决于time.precision.mode配置属性。为时间戳列的默认值指定为CURRENT_TIMESTAMP现在,值1970-01-01就是是Kafka Connect模式的默认值。

MySQL允许为零值日期,日期时间,时间戳列,因为零值有时比空值更可取。当列定义允许空值时,MySQL连接器将零值表示为空值,当列不允许空值时,则将零值表示为历元日。

没有时区的时间值

DATETIMEType表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,这里没有时区信息。这些列将使用UTC根据列的精度转换为历元毫秒或微秒。的时间戳type表示一个没有时区信息的时间戳,MySQL在写入时将服务器(或会话的)当前时区转换为UTC,在读取值时则相反。例如:

  • DATETIME值为2018-06-20 06:37:03就变成了1529476623000

  • 时间戳值为2018-06-20 06:37:03就变成了2018 - 06 - 20 - t13:37:03z

这些列被转换为等效的列io.开云体育官方注册网址debezium.time.ZonedTimestamp基于服务器(或会话)当前时区的UTC格式。默认从服务器上查询时区。方法显式指定开云体育电动老虎机database.serverTimezone连接器配置属性。例如,如果数据库的时区(全局的或为连接器开云体育电动老虎机配置的)开云体育电动老虎机database.serverTimezoneproperty)表示“America/Los_Angeles”,则TIMESTAMP值“2018-06-20 06:37:03”用ZonedTimestamp值为“2018-06-20T13:37:03Z”。

注意,运行Kafka Connect和Debezium的JVM的时区不会影响这些转换。开云体育官方注册网址

的文档中有有关术语值的属性的详细信息MySQL连接器配置属性

time.precision.mode = adaptive_time_microseconds(默认)

MySQL连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确地表示数据库中的值。开云体育电动老虎机所有时间字段都以微秒为单位。只有积极的时间范围内的字段值00:00:00.00000023:59:59.999999可以正确捕获。

MySQL类型 文字类型 语义类型

日期

INT32

io.开云体育官方注册网址debezium.time.Date

表示自epoch以来的天数。

((M))

INT64

io.开云体育官方注册网址debezium.time.MicroTime

表示以微秒为单位的时间值,不包括时区信息。MySQL允许在…的范围内0 - 6

Datetime, Datetime (0), Datetime (1), Datetime (2), Datetime (3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp

表示过去纪元的毫秒数,不包括时区信息。

Datetime (4), Datetime (5), Datetime (6)

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp

表示经过epoch的微秒数,不包括时区信息。
time.precision.mode =连接

MySQL连接器使用预定义的Kafka Connect逻辑类型。此方法不如默认方法精确,如果数据库列具有开云体育电动老虎机分数秒精度值大于3..范围内的值00:00:00.00023:59:59.999可以处理。集time.precision.mode =连接只有你能保证时间表中的值永远不会超过支持的范围。的连接设置有望在Debezium的未来版本中被删除。开云体育官方注册网址

MySQL类型 文字类型 语义类型

日期

INT32

org.apache.kafka.connect.data.Date

表示自epoch以来的天数。

((M))

INT64

org.apache.kafka.connect.data.Time

表示自午夜以来的时间值(以微秒为单位),不包括时区信息。

DATETIME ((M))

INT64

org.apache.kafka.connect.data.Timestamp

表示自epoch以来的毫秒数,不包括时区信息。

==十进制值

方法处理小数decimal.handling.mode财产。

看到MySQL连接器配置属性欲知详情。
decimal.handling.mode =精确
MySQL类型 文字类型 语义类型

数字((M [D]))

字节

org.apache.kafka.connect.data.Decimal

规模模式参数包含一个整数,表示小数点移位了多少位。

小数((M [D]))

字节

org.apache.kafka.connect.data.Decimal

规模模式参数包含一个整数,表示小数点移位了多少位。
decimal.handling.mode =双
MySQL类型 文字类型 语义类型

数字((M [D]))

FLOAT64

N/A

小数((M [D]))

FLOAT64

N/A

decimal.handling.mode =字符串
MySQL类型 文字类型 语义类型

数字((M [D]))

字符串

N/A

小数((M [D]))

字符串

N/A

布尔值

MySQL处理布尔以特定的方式进行内部价值评估。的布尔列在内部映射到非常小的整数(1)数据类型。当在流期间创建表时,它使用proper布尔当Debezium接收开云体育官方注册网址到原始DDL时映射。在快照期间Debezium执行开云体育官方注册网址显示创建表获取返回的表定义非常小的整数(1)对于这两个布尔而且非常小的整数(1)列。

开云体育官方注册网址Debezium则没有办法获得原始类型映射并将其映射到非常小的整数(1).操作员可以配置开箱即用自定义转换器TinyIntOneToBooleanConverter这将映射所有非常小的整数(1)布尔或者,如果选择器参数,则可以使用逗号分隔的正则表达式枚举列的子集。

一个示例配置是

converters=boolean boolean.type=io.开云体育官方注册网址debezium.connector.mysql.converters.TinyIntOneToBooleanConverter boolean.selector=db1.table1。*, db1.table2.column1

空间数据类型

目前,Debezium MyS开云体育官方注册网址QL连接器支持以下空间数据类型:

MySQL类型 文字类型 语义类型

几何,线串,多边形,多点,多线串,多多边形,几何集合

结构体

io.开云体育官方注册网址debezium.data.geometry.Geometry

包含两个字段的结构:
  • srid (INT32:空间引用系统id,定义存储在结构中的几何对象类型

  • wkb(字节):以wkb格式编码的几何对象的二进制表示形式。看到开放地理空间联盟欲知详情。

MySQL连接器和Kafka主题

Debe开云体育官方注册网址zium MySQL连接器为所有人写入事件插入更新,删除从一个表到一个Kafka主题的操作。Kafka主题命名约定如下:

格式
serverName.开云体育电动老虎机databaseName.tableName
例1。例子

我们假设实现服务器名和库存是包含的数据库开云体育电动老虎机吗三个表的订单客户,产品.Debe开云体育官方注册网址zium MySQL连接器在三个Kafka主题上产生事件,分别对应数据库中的每个表:开云体育电动老虎机

履行。库存。订单履行。库存。客户履行。库存。产品

MySQL支持的拓扑

Debe开云体育官方注册网址zium MySQL连接器支持以下MySQL拓扑:

独立的

当使用单个MySQL服务器时,服务器必须启用binlog (并可选地启用gtid),这样Debez开云体育官方注册网址ium MySQL连接器就可以监视服务器。这通常是可以接受的,因为二进制日志也可以用作增量日志备份.在这种情况下,MySQL连接器总是连接并跟随这个独立的MySQL服务器实例。

主人和奴隶

Debe开云体育官方注册网址zium MySQL连接器可以跟随一个主服务器或一个从服务器(如果该slave已启用binlog),但是连接器只能看到对该服务器可见的集群中的更改。通常,除了多主机拓扑,这不是问题。

连接器记录其在服务器binlog中的位置,该位置在集群中的每个服务器上都是不同的。因此,连接器只需要跟随一个MySQL服务器实例。如果该服务器发生故障,则必须重新启动或恢复服务器,然后连接器才能继续。

高可用集群

各种各样的高可用性MySQL的解决方案是存在的,它们使它更容易容忍和几乎立即从问题和失败中恢复。大多数HA MySQL集群使用gtid,以便从服务器能够跟踪任何主服务器上的所有更改。

多主机

一个多主MySQL拓扑使用一个或多个MySQL从服务器,每个从服务器从多个主服务器复制。这是聚合多个MySQL集群复制的强大方法,需要使用gtid。

Debe开云体育官方注册网址zium MySQL连接器可以使用这些多主MySQL slave作为源,并且可以故障转移到不同的多主MySQL slave,只要新slave赶上旧slave (例如,新的slave拥有最后一次在第一个slave上看到的所有事务).即使连接器只使用数据库和/或表的一个子集,这也可以工作,因为当试图重新连接到一个新的多主MySQL从服务器并在binl开云体育电动老虎机og中找到正确的位置时,连接器可以配置为包括或排除特定的GTID源。

主持

Debezium MySQL连接器支持使用托管选项开云体育官方注册网址,如Amazon RDS和Amazon Aurora。

因为这些托管选项不允许全局读锁,表级锁用于创建一致的快照

设置MySQL服务器

为Debezium创建MySQL用户开云体育官方注册网址

您必须定义一个对Debezium MySQL连接器监视的所有数据库具有适当权限的MySQL用户。开云体育官方注册网址开云体育电动老虎机

先决条件
  • 你必须有一个MySQL服务器。

  • 您必须了解基本的SQL命令。

过程
  1. 创建MySQL用户:

mysql> CREATE USER ' USER '@'localhost' IDENTIFIED BY 'password'
  1. 授予用户所需的权限:

mysql>授权选择,重新加载,显示数据库,复制从,复制客户端上*。开云体育电动老虎机*以“密码”识别的“用户”;
看到权限的解释对于每个权限的注释。
如果使用托管选项,如Amazon RDS或Amazon Aurora,则不允许全局读锁,表级锁用于创建一致的快照.在这种情况下,您还需要授予LOCK_TABLES您所创建的用户的权限。看到MySQL连接器如何工作的概述欲知详情。
  1. 确定用户权限:

mysql> FLUSH PRIVILEGES;

权限的解释

许可/项目 描述

选择

允许连接器从数据库中的表中选择行开云体育电动老虎机

仅在执行快照时使用。

重新加载

使连接器能够使用冲洗语句来清除或重新加载内部缓存、刷新表或获取锁。

仅在执行快照时使用。

显示数据库开云体育电动老虎机

命令使连接器可以查看数据库名称开云体育电动老虎机显示数据库开云体育电动老虎机声明。

仅在执行快照时使用。

复制的奴隶

允许连接器连接和读取MySQL服务器的binlog。

复制客户端

允许连接器使用以下语句:

  • 显示主机状态

  • 显示奴隶状态

  • 显示二进制日志

这对于连接器总是必需的。

标识开云体育电动老虎机权限应用的对象。

“用户”

指定了用户被授予权限的对象。

通过“密码”识别

指定了密码对于用户。

为Debezium启用MySQL binlog开云体育官方注册网址

MySQL复制必须启用二进制日志记录。二进制日志记录用于复制工具传播更改的事务更新。

先决条件
  • 你必须有一个MySQL服务器。

  • 你应该有适当的MySQL用户权限。

过程
  1. 检查是否log-bin选项已开启或未开启。

mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';
  1. 如果,配置你的MySQL服务器配置文件如下:

看到Binlog配置属性每个财产的注释。
Server-id = 223344(1)Log_bin = mysql-bin(2)binlog_format = ROW(3)binlog_row_image = FULL(4)Expire_logs_days = 10(5)
  1. 再次检查binlog状态,确认您的更改。

mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin)::" FROM information_schemaWHERE variable_name='log_bin';

Binlog配置属性

数量 财产 描述

1

服务器id

的值。服务器idMySQL集群中的每个服务器和复制客户端必须是唯一的。当安装MySQL连接器时,我们为连接器分配一个唯一的服务器ID。

2

log_bin

的价值log_binbinlog文件序列的基名称。

3.

binlog_format

binlog-format必须设置为

4

binlog_row_image

binlog_row_image必须设置为完整的完整的

5

expire_logs_days

这是自动删除binlog文件的天数。默认为0这意味着不会自动移除。

将该值设置为与您的环境需求相匹配。

为Debezium启用MySQL全局事务标识符开云体育官方注册网址

全局事务标识符(gtid)唯一标识集群内服务器上发生的事务。虽然Debezium MySQL连接器不需要使用gtid开云体育官方注册网址,但使用gtid简化了复制,并允许您更容易地确认主服务器和从服务器是否一致。

gtid仅在MySQL 5.6.5及更高版本中可用。看到MySQL文档欲知详情。
先决条件
  • 你必须有一个MySQL服务器。

  • 您必须了解基本的SQL命令。

  • 您必须能够访问MySQL配置文件。

过程
  1. 启用gtid_mode

mysql > gtid_mode =
  1. 启用enforce_gtid_consistency

mysql > enforce_gtid_consistency =
  1. 确认更改:

mysql>显示全局变量“%GTID%”;
响应
+--------------------------+-------+ | Variable_name |值  | +--------------------------+-------+ | 上enforce_gtid_consistency | | | gtid_mode |  | +--------------------------+-------+

选项解释

许可/项目 描述

gtid_mode

布尔值,用于指定MySQL服务器是否启用GTID模式。

  • =启用

  • =禁用

enforce_gtid_consistency

布尔值,指示服务器是否通过允许以事务安全方式登录的语句执行来强制GTID一致性;使用gtid时必需的。

  • =启用

  • =禁用

为Debezium设置会话超时开云体育官方注册网址

当为大型数据库创建初始一致快照时,您所建立的连接可能会在读取表时超时。开云体育电动老虎机可以通过配置来防止这种行为interactive_timeout而且wait_timeout在MySQL配置文件中。

先决条件
  • 你必须有一个MySQL服务器。

  • 您必须了解基本的SQL命令。

  • 您必须能够访问MySQL配置文件。

过程
  1. 配置interactive_timeout

mysql > interactive_timeout = < duration-in-seconds >
  1. 配置wait_timeout

> . Mysql > wait_timeout= 
           

选项解释

许可/项目 描述

interactive_timeout

服务器在关闭交互连接之前等待交互连接活动的秒数。

看到MySQL的文档欲知详情。

wait_timeout

服务器在关闭非交互式连接之前等待其活动的秒数。

看到MySQL的文档欲知详情。

为Debezium启用查询日志事件开云体育官方注册网址

你可能想看看原版SQL语句用于每个binlog事件。使binlog_rows_query_log_eventsMySQL配置文件中的选项允许你这样做。

此选项仅在MySQL 5.6及更高版本中可用。
先决条件
  • 你必须有一个MySQL服务器。

  • 您必须了解基本的SQL命令。

  • 您必须能够访问MySQL配置文件。

过程
  1. 启用binlog_rows_query_log_events

mysql > binlog_rows_query_log_events =

选项解释

许可/项目 描述

binlog_rows_query_log_events”

布尔值,启用/禁用对包含原始数据的支持SQL语句。

  • =启用

  • =禁用

部署MySQL连接器

安装MySQL连接器

安装Debezium MySQ开云体育官方注册网址L连接器是一个简单的过程,你只需要下载JAR,将其解压缩到Kafka Connect环境中,并确保在Kafka Connect环境中指定了插件的父目录。

先决条件
过程
  1. 下载Debezium开云体育官方注册网址MySQL连接器插件

  2. 将文件解压缩到Kafka Connect环境中。

  3. 将插件的父目录添加到Kafka Connect中plugin.path

plugin.path = /卡夫卡/连接
上面的示例假设您已经将Debezium MySQL连接器提取到开云体育官方注册网址卡夫卡/ / debezium开云体育官方注册网址-connector-mysql连接路径。
  1. 重新启动Kafka Connect进程。这确保了拾取新的jar。

配置MySQL连接器

通常情况下,您可以在开云体育官方注册网址. json文件中使用连接器可用的配置属性。

先决条件
过程
  1. 设置“名称”中的连接器的. json文件。

  2. 设置Debezium MySQL连接器所需的配置属性。开云体育官方注册网址

有关配置属性的完整列表,请参见MySQL连接器配置属性
MySQL连接器示例配置
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.mysql. mysqlconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“3306”,(4)“开云体育电动老虎机数据库。用户": "debezium-user",(5)“开云体育电动老虎机数据库。密码": "debezium-user-pw",(6)“开云体育电动老虎机database.server。id”:“184054”,(7):开云体育电动老虎机“database.server.name fullfillment”,(8)“开云体育电动老虎机数据库。白名单”:“库存”,(9)“开云体育电动老虎机database.history.kafka.bootstrap。服务器”:“卡夫卡:9092”,(10)“开云体育电动老虎机database.history.kafka。来pic": "dbhistory.fullfillment",(11)“include.schema。变化”:“真正的”(12)}}

示例配置属性说明

  1. 在Kafka Connect服务中注册时的连接器名称。

  2. 连接器的类名。

  3. MySQL服务器地址。

  4. MySQL服务器端口号。

  5. 具有适当权限的MySQL用户。

  6. MySQL用户的密码。

  7. 连接器的唯一ID。

  8. MySQL服务器或集群的逻辑名称。

  9. 由指定服务器托管的所有数据库开云体育电动老虎机的列表。

  10. 连接器用来将DDL语句写入并恢复到数据库历史主题的Kafka代理列表。开云体育电动老虎机

  11. 数据库历史记录主题的名称。开云体育电动老虎机

  12. 指定连接器是否应在命名的模式更改主题上生成的标志实现具有DDL更改的事件,可以供使用者使用。

MySQL连接器配置属性

这里列出的配置属性是要求运行Debezium 开云体育官方注册网址MySQL连接器。也有高级MySQL连接器属性它们的默认值很少需要更改,因此不需要在连接器配置中指定。

Debe开云体育官方注册网址zium MySQL连接器支持直通在创建Kafka生产者和消费者时配置。看到关于传递属性的信息在本节的末尾,还可以看到卡夫卡文档如欲了解更多有关直通属性。
财产 默认的 描述

的名字

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。)

connector.class

连接器的Java类的名称。始终使用值io.开云体育官方注册网址debezium.connector.mysql.MySqlConnectorMySQL连接器。

tasks.max

1

应该为此连接器创建的最大任务数。MySQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。

开云体育电动老虎机database.hostname

MySQL数据库服务器的IP地址或主机名。开云体育电动老虎机

开云体育电动老虎机database.port

3306

MySQL数据库服务器的整型端口号。开云体育电动老虎机

开云体育电动老虎机database.user

连接MySQL数据库服务器时使用的M开云体育电动老虎机ySQL数据库名称。

开云体育电动老虎机database.password

连接MySQL数据库服务器时使用的密码。开云体育电动老虎机

开云体育电动老虎机database.server.name

逻辑名称,用于标识并提供被监视的特定MySQL数据库服务器/集群的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作从该连接器发出的所有Kafka主题名的前缀。只能使用字母数字字符和下划线。

开云体育电动老虎机database.server.id

随机

这个数据库客户端的数字ID,在MySQL集开云体育电动老虎机群中所有当前运行的数据库进程中必须是唯一的。这个连接器作为另一个服务器(使用这个唯一的ID)加入MySQL开云体育电动老虎机数据库集群,因此它可以读取binlog。默认情况下,生成一个介于5400和6400之间的随机数,尽管我们建议设置一个显式值。

开云体育电动老虎机database.history.kafka.topic

Kafka主题的全称,连接器将在其中存储数据库模式历史。开云体育电动老虎机

开云体育电动老虎机database.history.kafka.bootstrap.servers

一个主机/端口对列表,连接器将使用它建立到Kafka集群的初始连接。此连接将用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个DDL语开云体育电动老虎机句。这应该指向Kafka Connect进程使用的相同的Kafka集群。

开云体育电动老虎机database.whitelist

空字符串

一个可选的逗号分隔的正则表达式列表,匹配要监控的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在白名单中的数据库名称都将被排除在监视之外。默认情况下,将监视所有数据库。开云体育电动老虎机不得与开云体育电动老虎机database.blacklist

开云体育电动老虎机database.blacklist

空字符串

可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在黑名单中的数据库名称都将被监控。不得与开云体育电动老虎机database.whitelist

table.whitelist

空字符串

一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表都将被排除在监控之外。每个标识符都是这样的开云体育电动老虎机数据库名的表.默认情况下,连接器将监视每个被监视数据库中的每个非系统表。开云体育电动老虎机不得与table.blacklist

table.blacklist

空字符串

一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的开云体育电动老虎机数据库名的表.不得与table.whitelist

column.blacklist

空字符串

一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName

column.truncate.to。长度.chars

N/A

一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

column.mask.with。长度.chars

N/A

一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号()字符。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数或零。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName

column.propagate.source.type

N/A

一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数__开云体育官方注册网址Debezium.source.column.type__开云体育官方注册网址Debezium.source.column.length而且_开云体育官方注册网址Debezium.source.column.scale将分别用于传播原始类型名称和长度(对于变宽类型)。对于接收器数据库中相应列的适当大小很有用。开云体育电动老虎机列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName

datatype.propagate.source.type

N/A

一个以逗号分隔的可选正则表达式列表,它与特定于数据库的数据类型名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模开云体育电动老虎机式中。模式参数__开云体育官方注册网址debezium.source.column.type__开云体育官方注册网址debezium.source.column.length而且__开云体育官方注册网址debezium.source.column.scale将分别用于传播原始类型名称和长度(对于变宽类型)。对于接收器数据库中相应列的适当大小很有用。开云体育电动老虎机完全限定数据类型名称的格式为开云体育电动老虎机数据库名的表typeName,或开云体育电动老虎机数据库名schemaName的表typeName.看到MySQL连接器如何映射数据类型查看mysql特定的数据类型名称列表。

time.precision.mode

adaptive_time_microseconds

时间、日期和时间戳可以用不同的精度表示,包括:adaptive_time_microseconds(默认值)根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获数据库中的日期、datetime和时间戳值,TIME类型字段除外,它总是以微秒为单位捕获;开云体育电动老虎机自适应(已弃用)根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获与数据库中完全相同的时间和时间戳值;开云体育电动老虎机或连接总是使用Kafka Connect内置的时间、日期和时间戳表示来表示时间和时间戳值,无论数据库列的精度如何,它都使用毫秒精度。开云体育电动老虎机

decimal.handling.mode

精确的

指定连接器应如何处理的值小数而且数字列:精确的(默认值)精确地表示它们使用java.math.BigDecimal变更事件中以二进制形式表示的值;或表示它们使用值,这可能会导致精度的损失,但将更容易使用。字符串选项将值编码为格式化的字符串,这很容易使用,但关于真实类型的语义信息会丢失。

bigint.unsigned.handling.mode

指定BIGINT UNSIGNED列应该如何在更改事件中表示,包括:精确的使用java.math.BigDecimal来表示值,这些值使用二进制表示和Kafka Connect的方式编码在更改事件中org.apache.kafka.connect.data.Decimal类型;(默认值)表示使用Java的值这种方法可能无法提供精准度,但消费者使用起来要容易得多。通常是更可取的设置。仅当处理大于2^63的值时,精确的设置应该使用,因为这些值不能使用

include.schema.changes

真正的

布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器ID同名的Kafka主题。开云体育电动老虎机每个模式更改将使用一个包含数据库名称的键记录,该键的值包括DDL语句。开云体育电动老虎机这与连接器内部记录数据库历史的方式无关。开云体育电动老虎机默认为真正的

include.query

布尔值,该值指定连接器是否应包括生成更改事件的原始SQL查询。
注意:该选项要求MySQL将binlog_rows_query_log_events选项设置为ON。查询将不会显示快照进程生成的事件。
警告:启用此选项可能会在更改事件中包含原始SQL语句,从而显式地暴露被列入黑名单或屏蔽的表或字段。因此,该选项默认为“false”。

event.processing.failure.handling.mode

失败

指定连接器在反序列化binlog事件期间应对异常的方式。失败将传播异常(指示有问题的事件及其binlog偏移量),导致连接器停止。
警告将导致跳过有问题的事件,并记录有问题的事件及其binlog偏移量。
跳过会引起问题的事件将被跳过。

inconsistent.schema.handling.mode

失败

指定连接器应该如何响应与表相关的binlog事件,这些事件不存在于内部模式表示中(即内部表示与数据库不一致)开云体育电动老虎机失败将抛出异常(指示有问题的事件及其binlog偏移量),导致连接器停止。
警告将导致跳过有问题的事件,并记录有问题的事件及其binlog偏移量。
跳过将导致跳过有问题的事件。

max.queue.size

8192

正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为binlog阅读器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192max.batch.size财产。

max.batch.size

2048

正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。

poll.interval.ms

1000

正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。

connect.timeout.ms

30000

一个正整数值,指定连接器在尝试连接到MySQL数据库服务器后超时前应该等待的最大时间(以毫秒为单位)。开云体育电动老虎机默认为30秒。

gtid.source.includes

一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源匹配其中一个包含模式的GTID范围才会被使用。不得与gtid.source.excludes

gtid.source.excludes

一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源不匹配这些排除模式的GTID范围才会被使用。不得与gtid.source.includes

gtid.new.channel.position
已弃用并计划删除

最早的

当设置为最新的,当连接器看到一个新的GTID通道时,它将从该GTID通道中最后执行的事务开始消费。如果设置为最早的(默认),连接器从第一个可用的(未清除的)GTID位置开始读取该通道。最早的当你有一个主-被动的MySQL设置,其中Debezium连接到主,在这种情况下,在故障转移期间,具有新UUID(和GT开云体育官方注册网址ID通道)的从服务器在Debezium连接之前开始接收写。这些写操作在使用时会丢失最新的

tombstones.on.delete

真正的

控制是否在删除事件之后生成墓碑事件。
真正的删除操作由删除事件和后续的墓碑事件表示。当只发送一个删除事件。
触发墓碑事件(默认行为)允许Kafka在源记录被删除后完全删除与给定键相关的所有事件。

message.key.columns

空字符串

与完全限定的表和列匹配以映射主键的正则表达式的分号列表。
每个项(正则表达式)必须匹配<完全限定表>:<逗号分隔的列列表>表示自定义键。
完全限定表可以定义为开云体育电动老虎机数据库名的表

高级MySQL连接器属性

下表描述了高级MySQL连接器属性

财产 默认的 描述

connect.keep.alive

真正的

一个布尔值,指定是否应该使用一个单独的线程来确保与MySQL服务器/集群的连接保持活跃。

table.ignore.builtin

真正的

布尔值,指定是否应忽略内置系统表。无论表是白名单还是黑名单,这都适用。默认情况下,系统表被排除在监视之外,当对任何系统表进行更改时,不会生成任何事件。

开云体育电动老虎机database.history.kafka.recovery.poll.interval.ms

One hundred.

一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值是100ms。

开云体育电动老虎机database.history.kafka.recovery.attempts

4

在连接器恢复失败并发生错误之前,连接器应尝试读取持久历史数据的最大次数。接收不到数据后等待的最大时间为recovery.attemptsxrecovery.poll.interval.ms

开云体育电动老虎机database.history.skip.unparseable.ddl

布尔值,指定连接器是否应忽略格式错误或未知的数据库语句,或停止处理并让操作员修复问题。开云体育电动老虎机安全默认值为.在处理binlog时,应该小心使用跳过,因为它可能导致数据丢失或损坏。

开云体育电动老虎机database.history.store.only.monitored.tables.ddl

布尔值,指定连接器是否应该记录所有DDL语句或(当真正的)只有那些与Debezium监视的表相关的(通过过滤器配置)。开云体育官方注册网址安全默认值为.使用此特性时应格外小心,因为更改过滤器时可能需要丢失数据。

开云体育电动老虎机database.ssl.mode

禁用

指定是否使用加密连接。默认为禁用,并指定使用未加密的连接。

首选选项将在服务器支持安全连接时建立加密连接,否则将退回到未加密连接。

要求选项建立加密连接,但如果由于任何原因无法建立连接,则会失败。

verify_ca选项表现为要求但是另外,它会根据配置的证书颁发机构(CA)证书验证服务器TLS证书,如果它不匹配任何有效的CA证书,则会失败。

verify_identity选项表现为verify_ca但是还要验证服务器证书是否与远程连接的主机相匹配。

binlog.buffer.size

0

binlog读取器使用的预读缓冲区的大小。
在特定的条件下,MySQL binlog可能包含未提交的数据回滚声明。典型的例子是在单个事务中使用保存点或混合临时和常规的表更改。
当检测到事务的开始时,Debezium尝试前滚binlog位置并找到其中任何一个开云体育官方注册网址提交回滚因此,它可以决定是否将来自事务的更改流化。缓冲区的大小定义了Debezium在搜索事务边界时可以缓冲的事务更改的最大数量。开云体育官方注册网址如果事务的大小大于缓冲区,则Debezium需要在流处理时倒带并重新读取缓冲区中不适合的事件。开云体育官方注册网址价值0禁用缓冲。
默认禁用。
注意:这个特性应该被认为是一个正在孵化的特性。我们需要客户的反馈,但预计它不是完全抛光。

snapshot.mode

最初的

指定在连接器启动时运行快照的条件。默认为最初的,并指定只有在没有为逻辑服务器名记录偏移量时连接器才能运行快照。的when_needed选项指定连接器在启动时运行一个快照,只要它认为有必要(当没有可用的偏移量,或者当以前记录的偏移量指定一个binlog位置或GTID在服务器中不可用时)。的从来没有选项指定连接永远不应该使用快照,并且在第一次启动时使用逻辑服务器名,连接器应该从binlog的开头读取;这应该小心使用,因为只有当binlog保证包含数据库的整个历史时,它才有效。开云体育电动老虎机如果不需要主题包含数据的一致快照,而只需要它们具有自连接器启动以来的更改,则可以使用schema_only选项,其中连接器仅快照模式(而不是数据)。

schema_only_recovery是现有连接器的恢复选项,用于恢复损坏或丢失的数据库历史主题,或定期“清理”可能意外增长的数据库历史主题(需要无限保留)。开云体育电动老虎机

snapshot.locking.mode

最小的

控制连接器在执行快照时是否保持全局MySQL读锁(防止对数据库的任何更新)以及保持多长时间。开云体育电动老虎机有三个可能的值最小的扩展,没有一个

最小的当连接器读取数据库模式和其他元数据时,连接器仅为快照的初始部分持有全局读锁。开云体育电动老虎机快照中的其余工作包括从每个表中选择所有行,即使不再持有全局读锁并且其他MySQL客户端正在更新数据库,也可以使用REPEATABLE READ事务以一致的方式完成这一工作。开云体育电动老虎机

扩展在某些情况下,客户端正在提交MySQL从REPEATABLE READ语义中排除的操作,可能需要在快照的整个持续时间内阻塞所有写操作。对于这些情况,请使用此选项。

没有一个将防止连接器在快照过程中获取任何表锁。此值可用于所有快照模式,但如果和使用该值是安全的只有如果在拍摄快照时没有发生模式更改。注意,对于用MyISAM引擎定义的表,尽管设置了这个属性,当MyISAM获取表锁时,表仍然是锁定的。这种行为与InnoDB引擎不同,InnoDB引擎需要行级锁。

snapshot.select.statement.overrides

控制快照中将包括表中的哪些行。
此属性包含以逗号分隔的全限定表列表(DB_NAME.TABLE_NAME).在进一步的配置属性中为各个表指定选择语句,每个表一个,由id标识snapshot.select.statement.overrides。[DB_NAME] [TABLE_NAME]。.这些属性的值是在快照期间从特定表检索数据时使用的SELECT语句。对于大型只能追加的表,一个可能的用例是设置一个特定的开始(恢复)快照的点,以防之前的快照被中断。
请注意:仅对快照有影响。从binlog捕获的事件完全不受它的影响。

min.row.count.to.stream.results

1000

在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行产生读取事件。这个参数决定MySQL连接是否将一个表的所有结果拉入内存(这很快,但需要大量内存),还是将结果流化(可能较慢,但适用于非常大的表)。该值指定在连接器将结果流传输之前表必须包含的最小行数,默认为1,000。将此参数设置为“0”将跳过所有表大小检查,并在快照期间始终流所有结果。

heartbeat.interval.ms

0

控制心跳消息的发送频率。
此属性包含以毫秒为单位的间隔,该间隔定义连接器向心跳主题发送心跳消息的频率。将此参数设置为0完全不发送心跳消息。
默认禁用。

heartbeat.topics.prefix

__开云体育官方注册网址debezium-heartbeat

控制要向其发送心跳消息的主题的命名。
主题根据模式命名< heartbeat.topics.prefix >。< server.name >

开云体育电动老虎机database.initial.statements

当建立到数据库的JDBC连接(不是事务日志读取连接)时,将执行的SQL语句的分号列表。开云体育电动老虎机使用双分号(';;')将分号用作字符而不是分隔符。
注意:连接器可以根据自己的判断建立JDBC连接,因此这通常只用于配置会话参数,而不是用于执行DML语句。

snapshot.delay.ms

连接器启动后在快照之前应该等待的间隔(以毫秒为单位);
可用于在启动集群中的多个连接器时避免快照中断,这可能导致连接器的重新平衡。

snapshot.fetch.size

指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。

snapshot.lock.timeout.ms

10000

正整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,则快照将失败。看到MySQL连接器如何执行数据库快照开云体育电动老虎机

enable.time.adjuster

MySQL允许用户插入2位或4位的年份值。如果是两位数字,值将自动映射到1970 - 2069范围。这通常由数据库完成。开云体育电动老虎机
设置为真正的(默认值)当Debezium应该做转换开云体育官方注册网址。
设置为当转换完全委托给数据库时。开云体育电动老虎机

source.struct.version

v2

的架构版本Debezium事开云体育官方注册网址件中的块;开云体育官方注册网址Debezium 0.10引入了一些突破
的结构的更改块,以便在所有连接器之间统一暴露的结构。
通过将此选项设置为v1可以生成早期版本中使用的结构。注意,不建议使用这种设置,并计划在未来的Debezium版本中删除。开云体育官方注册网址

sanitize.field.names

真正的连接器配置显式指定key.convertervalue.converter参数使用Avro,否则默认为

是否对字段名进行消毒以符合Avro命名要求。

直通配置属性

MySQL连接器也支持直通配置属性在创建Kafka生产者和消费者时使用。属性开头的所有连接器配置属性开云体育电动老虎机database.history.producer。在创建写入数据库历史的Kafka生成器时使用前缀(不带前缀)。开云体育电动老虎机所有以前缀开头的属性开云体育电动老虎机database.history.consumer。在创建Kafka消费者时使用(没有前缀),在连接器启动时读取数据库历史。开云体育电动老虎机

例如,下面的连接器配置属性可以用来保护Kafka代理的连接:

开云体育电动老虎机database.history.producer.security。协议SSL databas开云体育电动老虎机e.history.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.producer.ssl.keystore。密码=test1234 database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks database.history.producer.ssl.truststore.password=test1234 database.history.producer.ssl.key.password=test1234 database.history.consumer.security.protocol=SSL database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks database.history.consumer.ssl.keystore.password=test1234 database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks database.history.consumer.ssl.truststore.password=test1234 database.history.consumer.ssl.key.password=test1234

数据库驱动程序的传递属性开云体育电动老虎机

除了Kafka生产者和消费者的传递属性之外,还有数据库驱动程序的传递属性开云体育电动老虎机.这些属性具有开云体育电动老虎机数据库。前缀。例如,开云体育电动老虎机database.tinyInt1isBit = false传递给JDBC URL。

MySQL连接器监控指标

除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect内置的对JMX指标的支持外,Debezium MySQL连接器还有三种指标类型。

请参阅监控文档了解如何通过JMX公开这些指标的详细信息。

快照指标

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics上下文=快照,server开云体育电动老虎机 = < database.server.name >

属性 类型 描述

TotalTableCount

int

快照中包含的表的总数。

RemainingTableCount

int

快照尚未复制的表数。

HoldingGlobalLock

布尔

连接器当前是否持有全局或表写锁。

SnapshotRunning

布尔

快照是否启动。

SnapshotAborted

布尔

快照是否中止。

SnapshotCompleted

布尔

快照是否完成。

SnapshotDurationInSeconds

快照到目前为止所花费的总秒数,即使没有完成。

RowsScanned

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

自连接器读取并处理最近事件以来的毫秒数。

TotalNumberOfEventsSeen

自上次启动或重置以来,此连接器已看到的事件总数。

NumberOfEventsFiltered

连接器上已配置白名单或黑名单过滤规则过滤的事件个数。

MonitoredTables

string []

连接器监视的表的列表。

QueueTotalCapcity

int

用于在快照阅读器和主Kafka Connect循环之间传递事件的队列长度。

QueueRemainingCapcity

int

用于在快照阅读器和主Kafka Connect循环之间传递事件的队列的空闲容量。

Binlog指标

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= binlo开云体育电动老虎机g服务器= < database.server.name >

只有启用binlog事件缓冲时,与事务相关的属性才可用。看到binlog.buffer.size在高级连接器配置属性中获取更多详细信息。
属性 类型 描述

连接

布尔

标志,表示连接器当前是否连接到MySQL服务器。

BinlogFilename

字符串

连接器最近读取的binlog文件名的名称。

BinlogPosition

连接器已读取的binlog中的最近位置(以字节为单位)。

IsGtidModeEnabled

布尔

标志,表示连接器当前是否跟踪来自MySQL服务器的gtid。

GtidSet

字符串

连接器在读取binlog时看到的最新GTID集的字符串表示形式。

LastEvent

字符串

连接器读取的最后一个binlog事件。

SecondsSinceLastEvent(过时的)

自连接器读取并处理最近事件以来的秒数。

SecondsBehindMaster(过时的)

最后一个事件的MySQL时间戳和连接器处理它之间的秒数。这些值将包含运行MySQL服务器和MySQL连接器的机器上的时钟之间的任何差异。

MilliSecondsBehindSource

最后一个事件的MySQL时间戳和连接器处理它之间的毫秒数。这些值将包含运行MySQL服务器和MySQL连接器的机器上的时钟之间的任何差异。

TotalNumberOfEventsSeen

自上次启动或重置以来,此连接器已看到的事件总数。

NumberOfSkippedEvents

MySQL连接器跳过的事件数。通常情况下,由于MySQL binlog中的错误或无法解析的事件,事件会被跳过。

NumberOfEventsFiltered

连接器上已配置白名单或黑名单过滤规则过滤的事件个数。

NumberOfDisconnects

MySQL连接器断开连接的次数。

SourceEventPosition

map < string, string >

上次接收事件的坐标。

LastTransactionId

字符串

最后处理的事务的事务标识符。

LastEvent

字符串

连接器读取的最后一个binlog事件。

MilliSecondsSinceLastEvent

自连接器读取并处理最近事件以来的毫秒数。

MonitoredTables

string []

Debezium监视的表的列表。开云体育官方注册网址

QueueTotalCapcity

int

用于在binlog阅读器和主Kafka Connect循环之间传递事件的队列长度。

QueueRemainingCapcity

int

用于在binlog阅读器和Kafka连接主循环之间传递事件的队列的空闲容量。

NumberOfCommittedTransactions

提交的已处理事务的数量。

NumberOfRolledBackTransactions

回滚且未流化的已处理事务的数量。

NumberOfNotWellFormedTransactions

未符合预期协议的事务数开始+提交/回滚.应该是0在正常情况下。

NumberOfLargeTransactions

未放入前瞻性缓冲区的事务数。应该明显小于NumberOfCommittedTransactions而且NumberOfRolledBackTransactions为了获得最佳性能。

架构历史度量

MBean开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= schema-histor开云体育电动老虎机y服务器= < database.server.name >

属性 类型 描述

状态

字符串

之一停止恢复(从存储中恢复历史),运行描述数据库历史记录的状态。开云体育电动老虎机

RecoveryStartTime

恢复开始的时间(以epoch秒为单位)。

ChangesRecovered

在恢复阶段读取的更改数。

ChangesApplied

在恢复和运行时期间应用的模式更改总数。

MilliSecondsSinceLastRecoveredChange

自上次更改从历史存储区恢复以来所经过的毫秒数。

MilliSecondsSinceLastAppliedChange

自应用最后一次更改以来所经过的毫秒数。

LastRecoveredChange

字符串

从历史存储中恢复的最后一次更改的字符串表示形式。

LastAppliedChange

字符串

最后应用的更改的字符串表示形式。

MySQL连接器常见问题

配置和启动错误

Debe开云体育官方注册网址zium MySQL连接器失败,报告错误,并在以下启动错误时停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数连接到MySQL服务器。

  • 连接器试图在binlog中MySQL不再有可用历史的位置重新启动。

如果您收到这些错误中的任何一个,您将在错误消息中收到更多详细信息。错误消息还包含可能的变通方法。

MySQL不可用

如果您的MySQL服务器变得不可用,Debezium MySQL连接器将失败并报错,连接器将开云体育官方注册网址停止。您只需要在服务器可用时重新启动连接器。

使用GTIDs

如果您启用了gtid,并且有一个高可用的MySQL集群,请立即重新启动连接器,因为连接器将连接到集群中的另一个MySQL服务器,在服务器的binlog中找到代表最后一个事务的位置,并开始从该特定位置读取新服务器的binlog。

不使用gtid

如果没有启用gtid,连接器只记录它所连接到的MySQL服务器的binlog位置。为了从正确的binlog位置重新启动,必须重新连接到特定的服务器。

Kafka连接停止

当Kafka Connect停止时,有三种情况会导致一些问题:

Kafka Connect优雅地停止

当Kafka Connect优雅地停止时,当Debezium MySQL连接器任务停止并在新的Kafka Connect进程上重新启动时,只有很短的延开云体育官方注册网址迟。

Kafka连接进程崩溃

如果Kafka Connect崩溃,进程将停止,任何Debezium MySQL连接器任务将终止,而不会开云体育官方注册网址记录它们最近处理的偏移量。在分布式模式下,Kafka Connect重启其他进程上的连接器任务。但是,MySQL连接器从前面进程记录的最后一个偏移量开始恢复。这意味着替换任务可能会生成一些在崩溃之前处理过的相同事件,从而创建重复的事件。

每个更改事件消息包括以下特定于源的信息:
  • 事件起源

  • MySQL服务器的事件时间

  • binlog文件名称和位置

  • gtid(如果使用)

Kafka不可用

Kafka Connect框架使用Kafka生产者API记录Kafka中开云体育官方注册网址的Debezium更改事件。如果Kafka代理变得不可用,Debezium MySQL连接器将暂停,直到重新建立连接,开云体育官方注册网址并从它上次停止的地方恢复。

MySQL清除binlog文件

如果Debez开云体育官方注册网址ium MySQL连接器停止太长时间,MySQL服务器会清除旧的binlog文件,连接器的最后一个位置可能会丢失。当连接器重新启动时,MySQL服务器不再拥有起点,连接器执行另一个初始快照。如果快照被禁用,连接器将失败并报错。