开云体育官方注册网址Debezium MySQL连接器
开云体育官方注册网址Debezium的MySQL连接器可以监视和记录MySQL服务器或HA MySQL集群上数据库的所有行级更改。开云体育电动老虎机当它第一次连接到MySQL服务器/集群时,它会读取所有数据库的一致快照。开云体育电动老虎机当快照完成时,连接器将连续读取提交到MySQL 5.6或更高版本的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的Kafka主题中,应用程序和服务可以很容易地使用它们。
从Debez开云体育官方注册网址ium 0.4.0开始,该连接器添加了对Amazon RDS而且Amazon Aurora (MySQL兼容性).然而,由于这些MySQL托管形式的限制,连接器在初始一致快照期间保留锁用于快照的持续时间.
概述
MySQL的二进制日志,或binlog,以数据库提交的相同顺序记录所有操作,包括对表模式的更改或对存储在表中的数据的更改。开云体育电动老虎机MySQL使用binlog进行复制和恢复。
开云体育官方注册网址Debezium的MySQL连接器读取MySQL的二进制日志,以了解数据以什么顺序发生了变化。然后产生一个更改事件对于binlog中的每一个行级插入、更新和删除操作,在一个单独的Kafka主题中记录每个表的所有更改事件。您的客户端应用程序读取对应于它感兴趣的数据库表的Kafka主题,并对在这些主题中看到的每个行级事件做出反应。开云体育电动老虎机
MySQL通常设置为在一段时间后清除二进制日志。这意味着二进制日志不会包含对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当MySQL连接器第一次连接到一个特定的MySQL服务器或集群时,它首先执行一个一致的快照每个数据库的。开云体育电动老虎机当连接器完成快照时,它开始从创建快照的确切位置读取binlog。这样,我们从所有数据的一致视图开始,然后继续读取,而不会丢失在创建快照时所做的任何更改。
连接器对故障的容忍度也很高。当连接器读取binlog并产生事件时,它会记录每个事件的binlog位置。如果连接器由于任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它只是在上次停止的地方继续读取binlog。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。稍后我们将讨论连接器的行为当事情出错时.
设置MySQL
在使用Debezium开云体育官方注册网址 MySQL连接器监视MySQL服务器上提交的更改之前,必须将服务器设置为可使用行级二进制日志记录并拥有一个具有适当权限开云体育电动老虎机的数据库用户。如果MySQL被配置为使用全局事务标识符(gtid),那么当其中一个MySQL服务器出现故障时,Debezium连接器可以更容易地重新建立连接。开云体育官方注册网址
下面几节将更详细地介绍如何在MySQL中设置这些特性。
启用binlog
MySQL服务器必须配置为使用行级二进制日志,将在MySQL文档.这通常是在MySQL服务器配置文件中完成的,看起来类似于下面的片段:
Server-id = 223344 log_bin = mysql-bin binlog_format = row binlog_row_image = full expire_logs_days = 10
地点:
的值
服务器id
MySQL集群中的每个服务器和复制客户端必须是唯一的。当我们设置连接器时,我们还将为连接器分配一个唯一的服务器ID。的值
log_bin
binlog文件序列的基名称。的值
binlog_format
必须设置为行
或行
.的值
binlog_row_image
必须设置为完整的
或完整的
.的值
expire_log_days
自动删除二进制日志文件的天数。默认值为0,这意味着“不自动删除”,因此请确保设置一个适合您的环境的值。
运行启用二进制日志记录的MySQL服务器确实会略微降低MySQL服务器的性能,但收益通常大于成本。每个binlog阅读器也会在服务器上放置少量负载,因此使用Debezium是最小化负载的好方法,同时向大量不同的消费者提供更改事件。开云体育官方注册网址 |
RDS MySQL通过启用/禁用产品特性(如读副本和自动备份)直接管理log_bin参数。确保在连接器使用数据的任何RDS MySQL节点中启用其中一个。否则,在执行快照时会失败。 |
启用gtid(可选)
MySQL服务器可以配置使用GTID-based复制.全局事务标识符(gtid)是在MySQL 5.6.5中引入的,它们唯一标识集群中特定服务器上发生的事务。使用gtid极大地简化了复制,并可以轻松地确认主服务器和从服务器是否一致。请注意,如果您使用的是较早版本的MySQL,则不能启用gtid。
启用gtid可以在MySQL服务器配置文件中完成,看起来类似于下面的片段:
Gtid_mode = on enforce_gtid_consistency = on
地点:
的值
gtid_mode
MySQL服务器的GTID模式。的值
enforce_gtid_consistency
指示服务器强制GTID一致性,只允许执行那些可以事务安全方式登录的语句,并且在使用GTID时是必需的。
查阅MySQL文档参阅有关设置gtid的详情。
MySQL连接器不要求MySQL使用gtid和基于gtid的复制。每次连接器启动时,它都会自动检测是否启用并相应地调整其行为。 |
启用查询日志事件(可选)
从MySQL 5.6开始,基于行的复制可以配置为包含每个binlog事件的原始SQL语句。请注意,如果您使用的是较早版本的MySQL,则无法启用此特性。
启用这个选项可以在MySQL服务器配置文件中完成,看起来类似于下面的片段:
Binlog_rows_query_log_events = on
地点:
的值
binlog_rows_query_log_events
可以设置为在
或在
以启用支持在binlog条目中包含原始SQL语句。
设置会话超时(可选)
当执行非常大的数据库的初始快照时,已经建立的连接可能会在读取数据库表的内容开云体育电动老虎机时超时。
为了防止这种行为,您可以通过配置选项来扩大超时时间
> . Interactive_timeout = wait_timeout = . Interactive_timeout =
这里的值取决于数据集的大小,但通常应该以小时为单位。
为连接器创建一个MySQL用户
必须定义一个MySQL用户,该用户对连接器将要监视的所有数据库具有以下所有权限:开云体育电动老虎机
例如,下面的语句为用户授予这些权限开云体育官方注册网址
使用密码进行身份验证dbz
,其中用户可以在任何机器上:
在*上授权选择、重新加载、显示数据库、复制从机、复制客开云体育电动老虎机户端。* TO '开云体育官方注册网址debezium' IDENTIFIED BY 'dbz';
选择一个好的密码,不同于我们上面使用的密码。 同样,上面的授权等同于在上指定任何身份验证客户端任何主机,所以显然不建议在生产环境中使用。相反,在生产环境中,您几乎肯定会将复制用户限制在Kafka Connect服务中运行MySQL连接器的机器上,例如 |
当使用MySQL连接器时Amazon RDS,Amazon Aurora (MySQL兼容性),或连接器的数据库用户无法获得全局读锁的任何其他服务器,则数据库用户还必须具有开云体育电动老虎机 |
支持的MySQL拓扑
MySQL连接器可以与各种MySQL拓扑一起使用。
MySQL独立
当单独使用一个MySQL服务器时,该服务器必须启用binlog(还可以选择启用gtid),以便MySQL连接器能够监视它。这通常是可以接受的,因为二进制日志也可以用作增量备份.在这种情况下,MySQL连接器将始终连接到并遵循这个独立的MySQL服务器实例。
MySQL主从
MySQL复制可以用来建立一个MySQL实例集群,其中一个MySQL服务器实例被认为是主其他的a奴隶.拓扑可以包括单主带单从、单主带多从、多主带多从。你的选择取决于你的需求,你的备份和恢复策略,以及你如何扩展MySQL来处理大数据量和查询。
要使用带有这些拓扑之一的MySQL连接器,连接器可以跟随一个主服务器或一个从服务器(如果该从服务器启用了它的binlog),但是连接器只能看到集群中对该服务器可见的那些更改。通常,除了多主机拓扑,这不是问题。
连接器记录其在服务器binlog中的位置,该位置在集群中的每个服务器上都是不同的。因此,连接器只需要跟随一个MySQL服务器实例。如果该服务器发生故障,则必须重新启动或恢复服务器,然后连接器才能继续。
高可用的MySQL集群
一个各种高可用性解决方案它们使MySQL更容易容忍,并且几乎可以立即从问题和失败中恢复。大多数HA MySQL集群使用gtid,以便从服务器能够跟踪任何主服务器上的所有更改。
多主机MySQL
一个多主机MySQL拓扑使用一个或多个MySQL slave进行复制多个主人。这是聚合多个MySQL集群复制的强大方法,需要使用gtid。
从Debez开云体育官方注册网址ium 0.3.5开始,Debezium MySQL连接器可以使用这些多主MySQL从服务器作为源,并可以故障转移到不同的多主MySQL slave,只要新slave赶上旧slave(例如,新slave拥有最后在第一个slave上看到的所有事务)。即使连接器只使用数据库和/或表的一个子集,这也可以工作,因为当试图重新连接到一个新的多主MySQL从服务器并在binl开云体育电动老虎机og中找到正确的位置时,连接器可以配置为包括或排除特定的GTID源。
主持MySQL
从Debez开云体育官方注册网址ium 0.4.0开始,MySQL连接器添加了对Amazon RDS而且Amazon Aurora (MySQL兼容性).连接器在读取binlog时像往常一样工作,但是在这些环境中连接器以不同的方式执行快照.这是因为MySQL的这些托管形式阻止了数据库用户获得全局读锁,因此连接器获得一致快照的唯一方法是使用表级开云体育电动老虎机锁。不幸的是,表级锁影响当前事务,这意味着在连接器完成读取所有数据并提交其事务之前不能释放锁。
MySQL连接器是如何工作的
本节将详细介绍MySQL连接器如何跟踪表的结构,执行快照,将binlog事件转换为Debezium更改事件,这些事件被记录在Kafka中,以及当事情出错时连接器如何表现。开云体育官方注册网址
开云体育电动老虎机数据库模式历史记录
当数据库客户端开云体育电动老虎机查询数据库时,它使用数据库的当前模式。但是,数据库模式可以在任何开云体育电动老虎机时候更改,这意味着连接器必须知道每次插入、更新或删除操作时的模式是什么样子记录.它也不能只使用当前模式,因为它可能正在处理相对较旧的事件,并且可能在表的模式更改之前就已经记录了这些事件。幸运的是,MySQL在binlog中包含了对数据的行级更改而且应用到数据库的DDL语句。开云体育电动老虎机当连接器读取binlog并遇到这些DDL语句时,它将解析它们并更新每个表的模式的内存表示,然后使用该表示在每次插入、更新或删除发生时了解表的结构并产生适当的更改事件。它也在一个单独的记录开云体育电动老虎机数据库历史Kafka主题所有的DDL语句以及在binlog中每个DDL语句出现的位置。
当连接器在崩溃或正常停止后重新启动时,连接器将开始从特定位置(即特定的时间点)读取binlog。连接器重新构建已存在的表结构在这个时间点通过读取数据库历史Kafka主开云体育电动老虎机题,解析所有DDL语句,直到连接器开始的binlog点。
此数据库历开云体育电动老虎机史记录主题仅供连接器使用,但连接器可以选择生成架构更改事件针对消费者应用程序的不同主题。我们会在架构更改主题部分。
当MySQL连接器监视一个表时,模式更改工具喜欢Gh-ost或pt-online-schema-change则在迁移过程中创建的辅助表需要包含在白名单表中。 如果下游系统不需要临时表生成的消息,那么可以编写并应用一个简单的消息转换来过滤掉它们。 |
数据库模式历史中事件的全局顺序至关重要,因此数据库历史主题不能分区。开云体育电动老虎机这意味着在创建此主题时必须指定分区计数为1。当依赖于自动主题创建时,确保Kafka的 |
快照
当MySQL连接器被配置为跟随MySQL服务器实例第一次启动时,默认情况下它将执行初始化一致的快照数据库的。开云体育电动老虎机这是默认模式,因为很多时候MySQL binlog不再包含数据库的完整历史。开云体育电动老虎机
连接器在每次快照时执行以下步骤:
获取一个全局读锁,它可以阻止其他数据库客户机的写操作。开云体育电动老虎机
使用可重复读取语义以确保此事务中的所有后续读取都是针对单个一致快照完成的。
读取binlog的当前位置。
读取连接器配置允许的数据库和表的模式。开云体育电动老虎机
释放全局读锁,允许其他DB客户端再次写入数据库开云体育电动老虎机
可选地将DDL更改写入架构更改主题,包括所有必要的
滴……
而且创建…
DDL语句扫描所有数据库表,并在适当的特定于开云体育电动老虎机表的Kafka主题上生成
创建
每一行的事件。提交事务。
在连接器偏移量中记录连接器成功完成快照。
第1步中开始的事务不阻止其他客户端对表行进行更改,而是为连接器提供表中数据的一致且不变的视图。但是,该事务并不阻止其他客户机应用DDL,这可能会干扰连接器读取binlog位置和表模式的尝试。因此,连接器在第2步中获得一个全局读锁以防止此类问题,并且在第3步和第4步中读取binlog位置和表模式时,它会在很短的时间内保持这个锁。这个全局读锁在第5步中释放,在连接器执行大量复制数据的工作之前。
如果连接器失败、重新平衡或在快照完成之前停止,则连接器在重新启动时将开始一个新的快照。一旦连接器完成了它的初始快照,MySQL连接器接着从步骤3中读取的位置读取binlog,确保连接器不会错过任何更新。如果连接器由于任何原因再次停止,在重新启动时,它将简单地继续读取之前停止的binlog。但是,如果连接器停止的时间足够长,MySQL可能会清除旧的binlog文件,连接器的最后一个位置可能会丢失。在这种情况下,当连接器配置为最初的快照模式(默认)最终重新启动,MySQL服务器将不再有起点,连接器将失败并报错。
第二个快照模式允许连接器执行快照在必要时.此行为类似于默认值最初的上面提到的快照行为,除了一个例外:如果连接器重新启动而且MySQL不再在binlog中有它的起点,而不是失败的连接器将执行另一个快照。这种模式可能是最自动化的,但是在出现问题时(通常是连接器宕机太长时间时)要冒执行额外快照的风险。
第三个快照模式确保连接器从来没有执行快照。当一个新的连接器以这种方式配置时,它将从头开始读取binlog。这不是默认的行为,因为在这种模式下启动一个新连接器(没有快照)需要MySQL binlog包含所有被监控数据库的整个历史,而MySQL实例很少以这种方式配置。开云体育电动老虎机具体来说,binlog必须至少包含创建表…
语句。如果这个需求没有得到满足,连接器将不能正确地解释binlog中低级事件的结构,并且它将简单地跳过那些缺少表定义的所有事件。(连接器不能依赖于这些表的当前定义,因为在binlog中记录初始事件后,这些表可能已经被更改,从而阻止连接器正确地解释binlog事件。)
从0.3.4开始,第四个快照模式允许连接器在启动时从其当前位置开始读取MySQL binlog。与schema_only
模式下,连接器读取当前binlog的位置,在不读取任何数据的情况下捕获当前表模式,然后继续从其当前位置读取binlog。这发生得非常快,并且产生的变更事件流只包括那些发生的变更事件快照启动后.对于不需要知道数据库的完整状态,但只需要知道自连接器启动以来所做的更改的使用者来说,这可能很有用。开云体育电动老虎机
从0.7.2开始,出现了第五种快照模式schema_only_recovery
允许现有连接器恢复损坏或丢失的数据库历史主题。开云体育电动老虎机它的行为类似于schema_only
,因为它在不读取任何数据的情况下捕获当前表模式。区别在于:
它只能用于现有的连接器,作为连接器配置的更新。
它从这个现有连接器的最后一个提交偏移量处开始读取binlog,而不是binlog的当前位置。
schema_only_recovery
还可以用于定期“清理”可能意外增长的数据库历史主题(需要无限保留)。开云体育电动老虎机为此,在将连接器的快照模式更新为开云体育电动老虎机之前,必须手动删除数据库历史主题schema_only_recovery
.注意,使用这种模式是安全的只有如果在提交的偏移量之后没有发生模式更改。否则,在提交的偏移量和模式更改的binlog位置之间的binlog事件将以不一致的模式发出(已经基于已更改的模式,它尚未应用于前面的这些事件)。因此,一旦历史主题恢复成功,建议返回到其他快照模式之一,以防止在随后重新启动连接器后继续快照。
由于连接器在执行快照时记录偏移量的方式不同,因此连接器现在默认为include.schema.events = true
.这将把快照期间执行的所有DDL更改写入可被应用程序使用的主题。而且,更重要的是,在上面提到的最后一步中,它确保立即记录更新的偏移量(而不是等到数据库发生更改)。开云体育电动老虎机
不带全局读锁的快照
一些MySQL环境,包括Amazon RDS而且Amazon Aurora (MySQL兼容性),不允许用户获取全局读锁。从0.4.0开始,当MySQL连接器检测到不允许使用全局读锁时,它将退回到表级锁(要求数据库用户也具有开云体育电动老虎机锁表
特权),并执行以下步骤的快照:
使用可重复读取语义以确保此事务中的所有后续读取都是针对单个一致快照完成的。
获取全局读锁以阻止其他数据库客户端写入失败。开云体育电动老虎机
读取数据库和表的名称,使用连接器的配开云体育电动老虎机置对它们进行筛选。
在所有配置的表上获得表级锁。
读取binlog的当前位置。
读取所有配置的数据库和表的模式。开云体育电动老虎机
可选地将DDL更改写入架构更改主题,包括所有必要的
滴……
而且创建…
DDL语句扫描所有数据库表,并在适当的特定于开云体育电动老虎机表的Kafka主题上生成
创建
每一行的事件。提交事务。
释放表级锁。
在连接器偏移量中记录连接器成功完成快照。
注意几乎所有一致性快照都持有表级锁,包括在步骤7中读取所有数据库表内容。开云体育电动老虎机这与使用全局读锁非常不同,因为全局读锁的持有时间非常短。不幸的是,这是MySQL连接器获得一致快照的唯一方法,因为释放表级锁隐式地提交会话持有的任何打开事务.因为我们需要事务获得数据库内容的一致快照,所以在第7步读取数据并在第8步提交事务之前,我们无法释放表级锁。开云体育电动老虎机
读取MySQL binlog
MySQL连接器通常会花费大量时间读取所连接的MySQL服务器的binlog。
当MySQL连接器读取binlog时,它将二进制日志事件转换为Debezium开云体育官方注册网址创建,更新,或删除事件,包括binlog中发现事件的位置(如果使用gtid,则包括gtid)。MySQL连接器将这些更改事件转发给Kafka Connect框架(在同一个进程中运行),然后Kafka Connect框架同步地将它们写入相应的Kafka主题。Kafka Connect使用了这个术语抵消对于Debezium包含在每个事件中的源特定位置信息,Kafka Connect定期记录另一个Kaf开云体育官方注册网址ka主题中最近的偏移量。
当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件刷新到Kafka,并记录从每个连接器接收到的最后偏移量。在重新启动时,Kafka Connect读取每个连接器的最后记录偏移量,并从该点启动连接器。MySQL连接器使用binlog文件名,该文件中的位置,以及记录在偏移量中的gtid(如果它们在MySQL服务器中启用)来请求MySQL将binlog事件发送到该位置之后。
主题名称
MySQL连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。卡夫卡主题的名称总是采用这种形式serverName.开云体育电动老虎机数据库名.的表,在那里serverName连接器的逻辑名称是否与开云体育电动老虎机database.server.name
配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机的表发生操作的数据库表的名称。开云体育电动老虎机
例如,考虑一个MySQL安装库存
开云体育电动老虎机包含四个表的数据库:产品
,products_on_hand
,客户
,订单
.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
架构更改主题
应用程序使用描述数据库模式中的更改的事件通常是有用的,因此可以配置MySQL连接器来生成开云体育电动老虎机架构更改事件将所有DDL语句应用到MySQL服务器中的数据库。开云体育电动老虎机当启用时,连接器将所有此类事件写入名为serverName,在那里serverName连接器的逻辑名称是否与开云体育电动老虎机database.server.name
配置属性。在前面的示例中,逻辑服务器名为实现
时,模式更改事件将记录在主题中实现
.
的开云体育电动老虎机数据库历史主题而且架构更改主题两者都包含DDL语句的事件。但是,我们已经将模式更改主题上的事件设计得更容易使用,因此它们更细粒度,并且始终具有数据库名称。开云体育电动老虎机如果要使用模式更改事件,请确保使用模式更改主题和从来没有使用数据库历史记录主题。开云体育电动老虎机 |
为了保持模式更改的正确顺序,不能对模式更改主题进行分区。这意味着在创建此主题时必须指定分区计数为1。当依赖于自动主题创建时,确保Kafka的 |
写入模式更改主题的每个消息都有一个消息键,该消息键包含客户端连接到的数据库的名称,并在应用DDL语句时使用该数据库:开云体育电动老虎机
{"schema": {"type": "struct", "name": "io. d开云体育官方注册网址ebezum .connector.mysql. schemachangekey ", "optional": false, "fields": [{"开云体育电动老虎机field": "databaseName", "type": "string", "optional": false}]}, "payload": {"databaseName": "inventory"}}
同时,模式更改事件消息的值将包含一个结构,该结构包含DDL语句(语句所在的数据库)开云体育电动老虎机应用,以及语句在binlog中出现的位置:
{"模式":{“类型”:“结构”、“名称”:“io.debezium.connector开云体育官方注册网址.mysql.SchemaChangeValue”、“可选”:假的,“字段”:[{“字段”:“数据库名”、“类型”:“弦”、“可选”:假},{”字段”:“d开云体育电动老虎机dl”、“类型”:“弦”、“可选”:假},{”字段”:“源”、“类型”:“结构”、“名称”:“io.debezium.connector.mysql.Source”、“可选”:假的,“字段”:[{“类型”:“弦”、“可选”:真的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“server_id”},{“类型”:“int64”、“可选”:假的,“场”:“ts_sec”},{“类型”:“弦”、“可选”:真的,“场”:“gtid”},{“类型”:“弦”、“可选”:假的,“场”:“文件”},{“类型”:“int64”、“可选”:假的,“场”:“pos”},{“类型”:“int32”、“可选”:假的,“场”:“行”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“int64”、“可选”:真的,“场”:“线程”},{“类型”:"string", "optional": true, "field": "db"}, {"type": "string", "optional": true, "field": "table"}, {"type": "string", "optional": true, "field": "query"}]}]}, "payload": {"databaseNa开云体育电动老虎机me": "inventory", "ddl": "CREATE table products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT);ALTER TABLE products AUTO_INCREMENT = 101;", "source": {"version": "0.10.0. "最后”、“名称”:“mysql-server-1”、“server_id”:0,”ts_sec”:0,”gtid”:空,“文件”:“mysql-bin。000003.", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": null, "table": null, "query": null } } }
的ddl
字段可以包含多个DDL语句,但是事件中的每个语句都将应用于开云体育电动老虎机开云体育电动老虎机数据库名
字段,它们将以应用到数据库的相同顺序出现。开云体育电动老虎机此外,模式更改主题中的所有事件将以应用到MySQL服务器的相同顺序出现。
的 |
如上所述,每个模式更改事件将包含一个或多个应用于单个数据库的DDL语句。开云体育电动老虎机如果客户端提交一系列应用于的DDL语句会发生什么多个开云体育电动老虎机数据库(例如,也许它们使用全限定名)?如果MySQL原子地应用这些语句(例如,作为单个事务),那么连接器将接受这些DDL语句为了,根据受影响的数据库对它们进行分组,然后为每个组创建一个模开云体育电动老虎机式更改事件。另一方面,如果MySQL单独应用这些语句,那么连接器将为每条语句创建一个单独的模式更改事件。
事件
由MySQL连接器产生的所有数据更改事件都有一个键和一个值,尽管键和值的结构依赖于产生更改事件的表(参见主题名称).
从Kafka 0.10开始,Kafka可以选择记录消息键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。 |
从Debez开云体育官方注册网址ium 0.3开始,Debezium MySQL连接器确保所有Kafka连接模式名是有效的Avro模式名.这意味着逻辑服务器名必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名中的其余字符以及数据库和表名中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。开云体育电动老虎机如果不是,则所有无效字符将自动替换为下划线字符。 当逻辑服务器名、数据库名和表名包含其他字符时,这可能导致模式名中出现意外冲突,并且表全名之间唯一的区分字符无效,因此被下划线取代。开云体育电动老虎机 |
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,这些事件的结构可能会随着时间而改变。这对于消费者来说可能很难处理,所以Kafka Connect使得每个事件都是自包含的。每个消息键和值都有两部分模式而且有效载荷.模式描述有效负载的结构,而有效负载包含实际数据。
更改事件的键
对于给定的表,更改事件的键将具有一个结构,该结构在创建事件时包含表的主键(或唯一键约束)中的每一列的字段。考虑一个库存
开云体育电动老虎机数据库的客户
表定义为:
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
的每一个变化事件客户
表,当它有这个定义时,将具有相同的键结构,在JSON中是这样的:
{"schema": {"type": "struct", "name": "mysql-server-1.inventory.customers. "关键”、“可选”:假的,“字段”:[{”字段”:“id”、“类型”:“int32”、“可选”:假}]},“有效载荷”:{" id ": 1001}}
的模式
密钥的部分包含一个Kafka Connect模式,描述有效负载部分的内容,在我们的例子中,这意味着有效载荷
值不是可选的,是由名为mysql服务器- 1. inventory.customers.key
,并且有一个名为id
类型的int32
.如果我们看一下键的值有效载荷
字段,我们将看到它确实是一个结构(在JSON中只是一个对象)id
字段,值为1004
.
因此,我们将此键解释为描述对象中的行inventory.customers
表(从命名的连接器输出mysql-server-1
),其id
主键列的值为1004
.
虽然 |
如果表没有主键或唯一键,则更改事件的键将为空。这是有意义的,因为没有主键或唯一键约束的表中的行不能被唯一标识。 |
更改事件的值
更改事件消息的值稍微复杂一些。就像关键信息一样,它有一个模式节和有效载荷部分。从Debezium 0.2开开云体育官方注册网址始,MySQL连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:
人事处
必选字段,包含描述操作类型的字符串值。MySQL连接器的值为c
对于创建(或插入),u
对于更新,d
对于删除,和r
用于读取(在非初始快照的情况下)。之前
是否是可选字段,如果存在则包含行状态之前事件发生了。该结构将由mysql服务器- 1. inventory.customers.value
Kafka Connect模式,其中mysql-server-1
对象中的所有行使用连接器inventory.customers
表格后
是否是可选字段,如果存在则包含行状态后事件发生了。结构的描述是相同的mysql服务器- 1. inventory.customers.value
中使用的Kafka Connect模式之前
.源
是一个必须的字段,它包含一个描述事件源元数据的结构,在MySQL中包含几个字段:Debe开云体育官方注册网址zium版本、连接器名称、记录事件的binlog文件的名称、事件在binlog文件中出现的位置、事件中的行(如果有多个)、该事件是否是快照的一部分、受影响的数据库和表的名称、创建事件的MySQL线程的id(仅限非快照事件),以及可用的MySQL服务器id,以及以秒为单位的时间戳。开云体育电动老虎机对于非快照事件,如果MySQL服务器具有binlog_rows_query_log_events选项启用后,连接器将使用include.query
选项启用后,查询字段将包含生成事件的原始SQL语句。ts_ms
是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。
当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。
我们来看看a创建事件值可能看起来像客户
表:
{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1. directory .customers。值","field": "before"}, {"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "mysql-server-1.inventory.customers。价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "mysql-server-1.inventory.customers.Envelope" }, "payload": { "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "0.10.0.Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 0, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "thread": 7, "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')" } } }
如果我们看模式
事件的一部分价值的模式信封的模式源
结构(特定于MySQL连接器并在所有事件中重用),以及表特定的模式之前
而且后
字段。
的模式名称 |
如果我们看有效载荷
事件的一部分价值,我们可以看到事件中的信息,即它正在描述创建的行(sinceop = c
),以及后
字段值包含新插入行的'id
,first_name
,last_name
,电子邮件
列。
事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式和有效载荷部分信息。这是可能的,甚至建议使用Avro转换器以大幅减少写入Kafka主题的实际消息的大小。 |
的值更新这个表上的Change事件实际上会有完全相同的结果模式,其有效负载的结构相同,但将持有不同的值。这里有一个例子:
这是新事件价值格式化为更容易阅读:
{"schema":{…},“有效载荷”:{“前”:{" id ": 1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{" id ": 1004年,“first_name”:“安妮玛丽”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“0.10.0。Final", "name": "mysql-server-1", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 484, "row": 0, "thread": 7, "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"}, "op": "u", "ts_ms": 1465581029523}}
当我们把这个和插入事件中,我们看到了一些不同有效载荷
部分:
的
人事处
字段值现在是u
,表示该行因更新而更改的
之前
字段现在拥有数据库提交前包含值的行状态开云体育电动老虎机的
后
字段现在已经更新了行状态,这里可以看到first_name
价值就是现在安妮玛丽
.的
源
字段结构具有与以前相同的字段,但值不同,因为此事件来自binlog中的不同位置。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
通过观察,我们可以学到很多东西有效载荷
部分。我们可以比较之前
而且后
结构来确定由于提交而在该行中实际更改的内容。的源
结构告诉我们MySQL关于这个变化记录的信息(提供可追溯性),但更重要的是,我们可以将这个信息与这个和其他主题中的其他事件进行比较,以了解这个事件是在之前发生的,之后发生的,还是作为其他事件的一部分发生在同一个MySQL提交中。
当一行的主键/唯一键的列更新时,行键的值也发生了变化,因此Debezium将输出开云体育官方注册网址三个事件: |
到目前为止,我们已经看到了样本创建而且更新事件。现在,我们来看看a的值删除事件。再一次,模式
的部分值将与创建而且更新事件:
{"schema":{…},“有效载荷”:{“前”:{" id ": 1004年,“first_name”:“安妮玛丽”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:空,“源”:{“版本”:“0.10.0。Final", "connector": "mysql", "name": "mysql-server-1", "ts_ms": 1465581, "snapshot": false, "db": "inventory", "table": "customers", "server_id": 223344, "gtid": null, "file": "mysql-bin "。000003", "pos": 805, "row": 0, "thread": 7, "query": "DELETE FROM customers WHERE id=1004"}, "op": "d", "ts_ms": 1465581902461}}
如果我们看有效载荷
部分,我们看到了一些不同的比较创建或更新事件的有效载荷:
的
人事处
字段值现在是d
,表示该行已被删除的
之前
字段现在拥有在数据库提交时删除的行状态开云体育电动老虎机的
后
字段为空,表示该行不再存在的
源
字段结构具有许多与以前相同的值,除了ts_sec
而且pos
字段发生了变化(并且文件
在其他情况下可能会改变)。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
此事件向使用者提供了用于处理删除该行的各种信息。我们包含旧的值,这样一些消费者可能需要它们来正确地处理删除,如果没有它,他们可能不得不求助于更复杂的行为。
MySQL连接器的事件是设计来处理的Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除一些旧消息。这允许Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除上面列出的event值仍然适用于日志压缩,因为Kafka仍然可以删除所有使用相同键的早期消息。但只有当消息值为空时Kafka才知道它可以删除所有消息用同样的钥匙。为了实现这一点,Debezium的MySQL连开云体育官方注册网址接器总是跟随删除特别活动墓碑上具有相同键但空值的事件。
数据类型
如上所述,MySQL连接器用事件表示行更改,事件的结构类似于行所在的表。事件包含每个列值的字段,该值在事件中如何表示取决于列的MySQL数据类型。本节描述此映射。
下表描述了连接器如何将每个MySQL数据类型映射到文字类型而且语义类型在事件字段中。在这里,文字类型描述了如何使用Kafka Connect模式类型逐字表示值,即INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
.的语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
的 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
包含JSON文档、数组或标量的字符串表示形式。 |
|
|
|
的 |
|
|
|
的 |
|
|
|
|
|
|
|
包含特定时区中ISO8601格式的日期和时间(精度可达微秒)。MySQL允许 |
存储字符串的列在MySQL中使用字符集和排序规则定义,可以显式地在列的定义上定义,也可以隐式地继承表、数据库或服务器的默认字符集和排序规则。开云体育电动老虎机从0.3.1开始,MySQL连接器在binlog事件中读取列值的二进制表示时使用列的字符集。
其他数据类型映射将在下面几节中描述。
如果存在,列的默认值将被传播到相应字段的Kafka Connect模式。为时间戳
指定默认值为的列CURRENT_TIMESTAMP
或现在
,值1970-01-01就是将被用作Kafka Connect模式的默认值。更改消息将包含字段的默认值(除非给出了显式的列值),因此很少需要从模式获取默认值。传递默认值有助于满足兼容性规则使用Avro与Confluent模式注册中心一起作为序列化格式。
时间值
除了MySQL之外时间戳
的值,MySQL的时态类型取决于time.precision.mode
配置属性。
在Debez开云体育官方注册网址ium 0.7 从Debez开云体育官方注册网址ium 0.10开始 |
当 的 |
当time.precision.mode
配置属性设置为adaptive_time_microseconds
(默认值),然后连接器将确定MySQL类型的文字类型和语义类型时间
,日期
而且DATETIME
基于列的数据类型定义,使事件完全表示数据库中的值,所有TIME字段将以微秒为单位捕获:开云体育电动老虎机
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示以微秒为单位的时间值,不包括时区信息。MySQL允许 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为自适应
(已弃用),那么连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值:开云体育电动老虎机
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示午夜过后的毫秒数,不包括时区信息。 |
|
|
|
表示午夜过后的微秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为连接
,那么连接器将使用预定义的Kafka Connect逻辑类型,就像0.2的情况一样。MySQL连接器。当用户只知道内置的Kafka Connect逻辑类型而无法处理可变精度的时间值时,这可能很有用。另一方面,由于MySQL两者都允许时间
而且DATETIME
有分数秒精度0-6的存储精度可达微秒,事件由连接器生成连接
时间精度模式*导致精度损失*当数据库列具有开云体育电动老虎机分数秒精度取值大于3:
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示从午夜开始的毫秒数,不包括时区信息。MySQL允许 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。MySQL允许 |
MySQL允许零值为日期
,DATETIME
,时间戳
列,有时优先于空值。类的任何Java类型都不能表示这些值time.precision.mode
选项,因此MySQL连接器将它们表示为零
值,当列定义允许为空时为时代天当列不允许为空时。
没有时区的时间值
的DATETIME
类型表示本地日期和时间,例如“2018-01-13 09:48:27”,即没有时区信息。使用UTC将这些列转换为历元毫秒或微秒(基于列的精度)。例如,类型列的值“2018-06-20 06:37:03”DATETIME
(没有给出精度)将由值1529476623000表示。
的时间戳
type表示一个没有时区信息的时间戳,MySQL在写入时将服务器(或会话的)当前时区转换为UTC,在读取值时则相反。这些列被转换为等效的列io.开云体育官方注册网址debezium.time.ZonedTimestamp
基于服务器(或会话)当前时区的UTC格式。默认情况下,将从服务器上查询时区。方法将其显式指定为连接器选项开云体育电动老虎机database.serverTimezone
选择。因此,例如,如果数据库的时区(全局的或通过前面开云体育电动老虎机提到的选项为连接器配置的)是“America/Los_Angeles”,则时间戳
值“2018-06-20 06:37:03”将用a表示ZonedTimestamp
值为“2018-06-20T13:37:03Z”。
注意,运行Kafka Connect和Debezium的JVM的时区不会影响这些转换。开云体育官方注册网址
这些列类型的处理基于使用MySQL JDBC连接器的非传统日期/时间处理模式。因此,强烈建议不要通过 |
十进制值
当decimal.handling.mode
配置属性设置为精确的
,那么连接器将使用预定义的Kafka Connectorg.apache.kafka.connect.data.Decimal
所有的逻辑类型小数
而且数字
列。这是默认模式。
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
的 |
|
|
|
的 |
然而,当decimal.handling.mode
配置属性设置为双
,则连接器将表示所有小数
而且数字
值是Java的双精度值,并按如下方式编码:
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
最后一个选项decimal.handling.mode
配置属性为字符串
.在这种情况下,连接器将表示所有小数
而且数字
值作为它们的格式化字符串表示,并按照如下方式编码它们:
MySQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
空间数据类型
从0.5.1版本开始,MySQL连接器对以下一些功能的支持也很有限空间数据类型:
空间数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含一个带2的结构 |
从0.7.2版本开始,MySQL连接器完全支持以下所有功能空间数据类型:
空间数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含一个具有2个字段的结构
|
当事情出错时
开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium(像Kafka)提供了开云体育官方注册网址至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,当连接器不能成功地使用指定的连接参数连接到MySQL时,或者当连接器从MySQL历史中先前记录的位置(通过binlog坐标或GTID设置)重新启动时,MySQL不再有该历史可用时,连接器将停止运行。
在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经纠正或MySQL问题已经解决时,连接器可以重新启动。
MySQL不可用
一旦连接器运行,如果它所连接的MySQL服务器由于任何原因变得不可用,连接器将失败并报错,连接器将停止。当服务器可用时,只需重新启动连接器。
注意,当使用gtid和高可用的MySQL集群时,您可以简单地立即重新启动连接器,连接器将连接到集群中的另一个MySQL服务器,在该服务器的binlog中找到代表最后一个完全处理的事务的位置,并开始从该位置读取新服务器的binlog。
当连接器和MySQL不使用gtid时,连接器将记录它所连接的MySQL服务器的特定binlog中的位置。这些binlog坐标仅在该MySQL服务器上有效,因此恢复连接器必须通过连接到该服务器(或已从MySQL服务器的备份中恢复的另一个服务器)来实现。
Kafka Connect进程优雅地停止
如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务显然都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,MySQL连接器将从上次偏移量开始恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量将取决于偏移刷新周期和崩溃前的数据更改量。
由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址氘的变化是幂等的,所以一系列的事件总是在相同的状态下发生。 开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的特定于源的信息,包括MySQL服务器的事件时间、其binlog文件名和位置以及GTID(如果使用的话)。消费者可以跟踪这些信息(特别是gtid),以了解它是否已经看到了特定的事件。 |
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。
连接器停止一段时间
如果连接器被优雅地停止,数据库可以继续使用,任何新的更改都将记录在MySQL服务器的bi开云体育电动老虎机nlog中。当连接器重新启动时,它将在上次停止的地方继续读取MySQL binlog,记录连接器停止时所做的所有更改的更改事件。
一个正确配置的Kafka集群能够巨大的吞吐量.Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器重新启动一段时间后,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及对MySQL中的数据所做的更改量。开云体育电动老虎机
如果连接器停止的时间足够长,MySQL可能会清除旧的binlog文件,连接器的最后一个位置可能会丢失。在这种情况下,当连接器配置为最初的快照模式(默认)最终重新启动,MySQL服务器将不再有起点,连接器将执行初始快照。另一方面,如果连接器的快照模式被禁用,那么连接器将失败并报错。 |
部署连接器
如果您已经安装了动物园管理员,卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的MySQL连接器就很容易了。只需下载连接器插件存档,将jar文件解压到Kafka Connect环境中,并添加jar文件所在的目录Kafka Connect的类路径.重新启动Kafka Connect进程以获取新的jar。
如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的Docker图片对于Zookeeper, Kafka和Kafka连接MySQL连接器已经预安装并准备好了。我们的教程甚至引导您使用这些图像,这是了解Debezium的一个很好的方法。开云体育官方注册网址你甚至可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
要使用连接器为特定的MySQL服务器或集群产生更改事件,只需创建一个MySQL连接器配置文件并使用Kafka连接REST API将该连接器添加到Kafka Connect集群。当连接器启动时,它将获取MySQL服务器中数据库的一致快照,并开始读取MySQL binlog,为每个插入、更新和删除的行生成事件。开云体育电动老虎机连接器可以选择使用应用的DDL语句生成事件,您甚至可以选择为数据库和表的一个子集生成事件。开云体育电动老虎机可选地忽略、屏蔽或截断敏感、过大或不需要的列。
监控
Kafka, Zookeeper和Kafka Connect都有内置支持用于JMX度量。MySQL连接器还发布了许多关于连接器活动的度量,这些活动可以通过JMX进行监视。连接器有两种类型的指标。快照度量可以帮助您监视快照活动,并且在连接器执行快照时可用。当连接器读取MySQL Binlog时,Binlog度量可以帮助您监视进度和活动。
快照指标
MBean: 开云体育官方注册网址debezium.mysql: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
属性名称 | 类型 | 描述 |
---|---|---|
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
连接器当前是否持有全局或表写锁。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照阅读器和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照阅读器和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
Binlog指标
MBean: 开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= binlog, server =<开云体育电动老虎机 database.server.name >
属性名称 | 类型 | 描述 |
---|---|---|
|
|
标志,表示连接器当前是否连接到MySQL服务器。 |
|
|
连接器最近读取的binlog文件名的名称。 |
|
|
连接器已读取的binlog中的最近位置(以字节为单位)。 |
|
|
标志,表示连接器当前是否跟踪来自MySQL服务器的gtid。 |
|
|
连接器在读取binlog时看到的最新GTID集的字符串表示形式。 |
|
|
连接器读取的最后一个binlog事件。 |
|
|
自连接器读取并处理最近事件以来的秒数。 |
|
|
最后一个事件的MySQL时间戳和连接器处理它之间的秒数。这些值将包含运行MySQL服务器和MySQL连接器的机器上的时钟之间的任何差异。 |
|
|
最后一个事件的MySQL时间戳和连接器处理它之间的毫秒数。这些值将包含运行MySQL服务器和MySQL连接器的机器上的时钟之间的任何差异。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
MySQL连接器跳过的事件数。通常情况下,由于MySQL binlog中的错误或无法解析的事件,事件会被跳过。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
MySQL连接器断开连接的次数。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
|
|
连接器读取的最后一个binlog事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
Debezium监视的表的列表。开云体育官方注册网址 |
|
|
用于在binlog阅读器和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在binlog阅读器和Kafka连接主循环之间传递事件的队列的空闲容量。 |
|
|
提交的已处理事务的数量。 |
|
|
回滚且未流化的已处理事务的数量。 |
|
|
未符合预期协议的事务数 |
|
|
未放入前瞻性缓冲区的事务数。应该明显小于 |
注意:只有启用binlog事件缓冲时,与事务相关的属性才可用-请参阅binlog.buffer.size
欲知详情
架构历史指标
MBean: 开云体育官方注册网址debezium.mysql: type = connector-metrics、上下文= schema-history, server =<开云体育电动老虎机 database.server.name >
属性名称 | 类型 | 描述 |
---|---|---|
|
|
之一 |
|
|
恢复开始的时间(以epoch秒为单位)。 |
|
|
在恢复阶段读取的更改数。 |
|
|
在恢复和运行时期间应用的模式更改总数。 |
|
|
自上次更改从历史存储区恢复以来所经过的毫秒数。 |
|
|
自应用最后一次更改以来所经过的毫秒数。 |
|
|
从历史存储中恢复的最后一次更改的字符串表示形式。 |
|
|
最后应用的更改的字符串表示形式。 |
示例配置
使用MySQL连接器非常简单。下面是一个MySQL连接器的配置示例,它监视位于192.168.99.100上3306端口的MySQL服务器,我们在逻辑上将其命名为192.168.99.100fullfillment
:
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.mysql. mysqlconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“3306”,(4)“开云体育电动老虎机数据库。user": "debezium",(5)“开云体育电动老虎机数据库。密码”:“dbz”,(6)“开云体育电动老虎机database.server。id”:“184054”,(7):开云体育电动老虎机“database.server.name fullfillment”,(8)“开云体育电动老虎机数据库。白名单”:“库存”,(9)“开云体育电动老虎机database.history.kafka.bootstrap。服务器”:“卡夫卡:9092”,(10)“开云体育电动老虎机database.history.kafka。来pic": "dbhistory.fullfillment",(11)“include.schema。变化”:“真正的”(12)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | 这个MySQL连接器类的名称。 |
3. | MySQL服务器地址。 |
4 | MySQL服务器的端口号。 |
5 | 属性的MySQL用户名需要的特权. |
6 | 的MySQL用户的密码需要的特权. |
7 | 连接器的标识符,在MySQL集群中必须是唯一的,并且类似于MySQL的标识符服务器id 配置属性。 |
8 | MySQL服务器/集群的逻辑名称,它形成了一个名称空间,并用于连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及相应Avro模式的名称空间中Avro连接器使用。 |
9 | 此连接器将监视的由此服务器托开云体育电动老虎机管的所有数据库的列表。这是可选的,还有其他属性用于列出要从监视中包含或排除的数据库和表。开云体育电动老虎机 |
10 | 这个连接器将使用的Kafka代理列表,用于将DDL语句写入并恢复到数据库历史主题。开云体育电动老虎机 |
11 | 的名称开云体育电动老虎机数据库历史主题连接器将写入并恢复DDL语句。此主题仅供内部使用,消费者不应使用。 |
12 | 对象上应生成连接器的标志架构更改主题命名fullfillment 事件与DDL改变了这一点可以被消费者使用。 |
看到连接器属性的完整列表可以在这些配置中指定。
这个配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到MySQL数据库,读取binlog,并将事件记录到Kafka主题。开云体育电动老虎机
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
|
|
连接器的Java类的名称。始终使用值 |
|
|
|
应该为此连接器创建的最大任务数。MySQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。 |
|
MySQL数据库服务器的IP地址或主机名。开云体育电动老虎机 |
|
|
|
MySQL数据库服务器的整型端口号。开云体育电动老虎机 |
|
连接MySQL数据库服务器时使用的M开云体育电动老虎机ySQL数据库名称。 |
|
|
连接MySQL数据库服务器时使用的密码。开云体育电动老虎机 |
|
|
逻辑名称,用于标识并提供被监视的特定MySQL数据库服务器/集群的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作从该连接器发出的所有Kafka主题名的前缀。 |
|
|
随机 |
这个数据库客户端的数字ID,在MySQL集开云体育电动老虎机群中所有当前运行的数据库进程中必须是唯一的。这个连接器作为另一个服务器(使用这个唯一的ID)加入MySQL开云体育电动老虎机数据库集群,因此它可以读取binlog。默认情况下,生成一个介于5400和6400之间的随机数,尽管我们建议设置一个显式值。 |
|
Kafka主题的全称,连接器将在其中存储数据库模式历史。开云体育电动老虎机 |
|
|
一个主机/端口对列表,连接器将使用它建立到Kafka集群的初始连接。此连接将用于检索以前由连接器存储的数据库模式历史,并用于写入从源数据库读取的每个DDL语开云体育电动老虎机句。这应该指向Kafka Connect进程使用的相同的Kafka集群。 |
|
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,匹配要监控的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在白名单中的数据库名称都将被排除在监视之外。默认情况下,将监视所有数据库。开云体育电动老虎机不得与 |
|
空字符串 |
可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在黑名单中的数据库名称都将被监控。不得与 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表都将被排除在监控之外。每个标识符都是这样的开云体育电动老虎机数据库名.的表.默认情况下,连接器将监视每个被监视数据库中的每个非系统表。开云体育电动老虎机不得与 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的开云体育电动老虎机数据库名.的表.不得与 |
|
空字符串 |
一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName,或开云体育电动老虎机数据库名.schemaName.的表.columnName. |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName,或开云体育电动老虎机数据库名.schemaName.的表.columnName. |
|
N/A |
一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号( |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数 |
|
|
时间、日期和时间戳可以用不同的精度表示,包括: |
|
|
指定连接器应如何处理的值 |
|
|
指定BIGINT UNSIGNED列应该如何在更改事件中表示,包括: |
|
|
布尔值,指定连接器是否应该将数据库模式中的更改发布到与数据库服务器ID同名的Kafka主题。开云体育电动老虎机每个模式更改将使用一个包含数据库名称的键记录,该键的值包括DDL语句。开云体育电动老虎机这与连接器内部记录数据库历史的方式无关。开云体育电动老虎机默认为 |
|
|
布尔值,该值指定连接器是否应包括生成更改事件的原始SQL查询。 |
|
|
指定连接器在反序列化binlog事件期间应对异常的方式。 |
|
|
指定连接器应该如何响应与表相关的binlog事件,这些事件不存在于内部模式表示中(即内部表示与数据库不一致)开云体育电动老虎机 |
|
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为binlog阅读器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
一个正整数值,指定连接器在尝试连接到MySQL数据库服务器后超时前应该等待的最大时间(以毫秒为单位)。开云体育电动老虎机默认为30秒。 |
|
一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源匹配其中一个包含模式的GTID范围才会被使用。不得与 |
|
|
一个用逗号分隔的正则表达式列表,它匹配GTID集中的源uuid,用于查找MySQL服务器中的binlog位置。只有源不匹配这些排除模式的GTID范围才会被使用。不得与 |
|
|
|
当设置为 |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
空字符串 |
与完全限定的表和列匹配以映射主键的正则表达式的分号列表。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 |
---|---|---|
|
|
一个布尔值,指定是否应该使用一个单独的线程来确保与MySQL服务器/集群的连接保持活跃。 |
|
|
布尔值,指定是否应忽略内置系统表。无论表是白名单还是黑名单,这都适用。默认情况下,系统表被排除在监视之外,当对任何系统表进行更改时,不会生成任何事件。 |
|
|
一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值是100ms。 |
|
|
在连接器恢复失败并发生错误之前,连接器应尝试读取持久历史数据的最大次数。接收不到数据后等待的最大时间为 |
|
|
布尔值,指定连接器是否应忽略格式错误或未知的数据库语句,或停止处理并让操作员修复问题。开云体育电动老虎机安全默认值为 |
|
|
布尔值,指定连接器是否应该记录所有DDL语句或(当 |
|
|
指定是否使用加密连接。默认为 的 的 的 的 |
|
0 |
binlog读取器使用的预读缓冲区的大小。 |
|
|
指定在连接器启动时运行快照的条件。默认为
|
|
|
控制连接器在执行快照时是否保持全局MySQL读锁(防止对数据库的任何更新)以及保持多长时间。开云体育电动老虎机有三个可能的值
|
|
控制快照中将包括表中的哪些行。 |
|
|
|
在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行产生读取事件。这个参数决定MySQL连接是否将一个表的所有结果拉入内存(这很快,但需要大量内存),还是将结果流化(可能较慢,但适用于非常大的表)。该值指定在连接器将结果流传输之前表必须包含的最小行数,默认为1,000。将此参数设置为“0”将跳过所有表大小检查,并在快照期间始终流所有结果。 |
|
|
控制心跳消息的发送频率。 |
|
|
控制要向其发送心跳消息的主题的命名。 |
|
当建立到数据库的JDBC连接(不是事务日志读取连接)时,将执行的SQL语句的分号列表。开云体育电动老虎机使用双分号(';;')将分号用作字符而不是分隔符。 |
|
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
|
|
指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。 |
|
|
MySQL允许用户插入2位或4位的年份值。如果是两位数字,值将自动映射到1970 - 2069范围。这通常由数据库完成。开云体育电动老虎机 |
|
|
v2 |
的架构版本 |
|
|
是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。 |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。属性开头的所有连接器配置属性开云体育电动老虎机database.history.producer。
在创建写入数据库历史的Kafka生产者时使用前缀(不带前缀),以及所有以前缀开头的生产者开云体育电动老虎机开云体育电动老虎机database.history.consumer。
在创建Kafka消费者时使用(没有前缀),在连接器启动时读取数据库历史。开云体育电动老虎机
例如,可以使用以下连接器配置属性安全连接到Kafka代理:
除了直通到Kafka生产者和消费者,属性从开云体育电动老虎机数据库。
,如。开云体育电动老虎机database.tinyInt1isBit = false
传递给JDBC URL。
开云体育电动老虎机database.history.producer.security。协议SSL databas开云体育电动老虎机e.history.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.producer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.producer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.producer.ssl.key。密码= test1234 datab开云体育电动老虎机ase.history.consumer.security。协议SSL databas开云体育电动老虎机e.history.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks 开云体育电动老虎机database.history.consumer.ssl.keystore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks 开云体育电动老虎机database.history.consumer.ssl.truststore。密码= test1234 datab开云体育电动老虎机ase.history.consumer.ssl.key.password = test1234