这篇文章最初发表在WePay工程博客

变更数据捕获它已经存在了一段时间,但最近一些技术的发展赋予了它新的生命。值得注意的是,使用卡夫卡作为一个骨干流,您的数据库数据实时已成为开云体育电动老虎机越来越普遍

如果你想知道为什么你想把数据库的变化流到Kafka中,我强烈建议你阅读开云体育电动老虎机微服务最难的部分:你的数据.在WePay,我们希望将微服务和下游数据存储相互集成,这样每个系统都可以访问所需的数据。我们使用Kafka作为我们的数据集成层,所以我们需要一种方法将我们的数据库数据放入其中。开云体育电动老虎机

去年,Yelp的工程团队出版了一本优秀的系列职位他们的数据管道。其中包括讨论他们如何流MySQL数据到Kafka.值得注意的是,他们的架构包括一系列自行开发的软件来完成这项任务系统化而且MySQL流光.这篇文章在Debezium的博客上引发了一篇发人深思的文章,讨论了一个提议的等效架构使用开云体育官方注册网址卡夫卡连接开云体育官方注册网址,Confluent的模式注册表.这个提议的架构是我们在WePay实现的,这篇文章描述了我们如何利用Debezium和Kafka连接将我们的MySQL数据库流到Kafka中。开云体育官方注册网址开云体育电动老虎机

体系结构

数据流从每个微服务的MySQL数据库开始。开云体育电动老虎机这些数据库运开云体育电动老虎机行在谷歌云作为CloudSQLMySQL实例启用gtid.我们已经专门为Debezium建立了一个下游MySQL集群。开云体育官方注册网址每个CloudSQL实例将其数据复制到Debezium集群中,该集群由两台MySQL机器组成:一个主(活开云体育官方注册网址动)服务器和一个辅助(被动)服务器。这个单一的Debeziu开云体育官方注册网址m集群是一个操作技巧,使我们更容易操作Debezium。我们可以只连接一个数据库,而不是让De开云体育官方注册网址bezium直接连接到几十个微服务数据库。开云体育电动老虎机这也使Debezium不会影响主Clo开云体育官方注册网址udSQL实例正在处理的生产OLTP工作负载。

我们运行一个Debez开云体育官方注册网址ium连接器(在分布式模式在Kafka连接框架)为每个微服务数据库。开云体育电动老虎机同样,这里的目标是隔离。理论上,我们可以运行单个Debezium连接器,为所有数据库生成消息(因为开云体育官方注册网址所有微服务数据库都在Debezium集群中)。开云体育电动老虎机这种方法实际上可以更有效地利用资源,因为每个Debezium连接器都必须读取MySQL的全部数据开云体育官方注册网址binlog无论如何。我们选择不这样做,因为我们希望能够上下移动Debezium连接器,并为每个微服务DB配置不同的连接器。开云体育官方注册网址

Debe开云体育官方注册网址zium连接器将MySQL消息提供给Kafka(并将它们的模式添加到Confluent模式注册表中),下游系统可以在Kafka中使用它们。我们使用卡夫卡连接BigQuery连接器将MySQL数据加载到BigQuery中流API.这为我们提供了BigQuery中的数据仓库,通常比生产中的数据晚30秒。其他微服务、流处理器和数据基础设施也使用提要。

开云体育官方注册网址Debezium架构”>
           </div>
          </div>
         </div>
        </div>
        <div class=

开云体育官方注册网址

本文的其余部分将重点介绍Debezium(上图中的DBZ框),以及我们如何配置和操开云体育官方注册网址作它。开云体育官方注册网址Debezium通过连接MySQL并假装是一个副本来工作。MySQL将它的复制数据发送到Debezium,认为它实际上是在向另一个开云体育官方注册网址下游MySQL实例输送数据。开云体育官方注册网址然后Debezium获取数据,将模式从MySQL模式转换为Kafka连接结构,并将它们转发给卡夫卡。

添加新数据库开云体育电动老虎机

当一个带有CloudSQL数据库的新微服务上线时,我们希望将数据导入Kafka。开云体育电动老虎机该过程的第一步是将数据加载到Debezium MySQL集群中。开云体育官方注册网址这包括几个步骤:

  1. 在微服务数据库中转储MySQL数据。

  2. 暂停备用Debezium MySQL D开云体育官方注册网址B。

  3. 将MySQL转储加载到第二个Debezium MySQL DB中。开云体育官方注册网址

  4. 重置GTID_PURGED参数来包含新的DB转储中的GTID。

  5. 取消暂停从Debezium MySQL DB开云体育官方注册网址。

  6. 更新HA Proxy以指向辅助服务器,辅助服务器现在变成了主服务器。

  7. 对旧的主实例(现在是辅助实例)执行步骤2-5。

我们实际运行的命令是:

$ mydump——host=123.123.12开云体育电动老虎机3.123——port=3306——user=foo——password=********* -B log——trx-consistency-only——triggers——routines -o /mysqldata/new_db/ -c -L mydump .log #(2)停止从Debezium集群的所有复制。开云体育官方注册网址$ mysql> STOP SLAVE channel 'foo';$ mysql> STOP SLAVE channel 'bar';$ mysql> STOP SLAVE channel 'baz';#从MySQL中获取当前GTID清除的值。$ mysql>显示全局变量像'%gtid_purged%';#(3)将数据库转储加载到Debezium集群中。开云体育官方注册网址开云体育电动老虎机$ myloader -d /mysqldata/new_db/ -s new_db #清除现有的GTID_PURGED值,以便我们可以覆盖它,以包括新的转储文件中的GTID。$ mysql>重置主#设置新的GTID_PURGED值,包括MySQL转储文件中的GTID_PURGED值。 $ mysql> set global GTID_PURGED="f3a44d1a-11e6-44ba-bf12-040bab830af0:1-10752,c627b2bc-b36a-11e6-a886-42010af00790:1-9052,01261abc3-6ade-11e6-9647-42010af0044a:1-375342"; # (5) Start replication for the new DB. $ mysql> CHANGE MASTER TO MASTER_HOST='123.123.123.123', MASTER_USER='REPLICATION_USER', MASTER_PASSWORD='REPLICATION_PASSWORD',MASTER_AUTO_POSITION=1 for CHANNEL 'new_db'; $ mysql> START SLAVE for channel 'new_db'; # Start replication for the DBs that we paused. $ mysql> START SLAVE for channel 'foo'; $ mysql> START SLAVE for channel 'bar'; $ mysql> START SLAVE for channel 'baz'; # Repeat steps 2-5 on the old primary (now secondary).

在这些步骤的最后,主Debezium MySQL服务器和辅助Debezium MySQL服务器都有了新的数据库。开云体育官方注册网址开云体育电动老虎机一旦完成,我们就可以向Kafka连接集群中添加一个新的Debezium连接开云体育官方注册网址器。这个连接器的配置大致如下所示:

的名字开云体育官方注册网址debezium-connector-microservice1配置: {的名字开云体育官方注册网址debezium-connector-microservice1connector.classio.开云体育官方注册网址debezium.connector.mysql.MySqlConnectortasks.max1开云体育电动老虎机database.hostnamedbz-mysql01开云体育电动老虎机database.port3306开云体育电动老虎机database.user用户开云体育电动老虎机database.password* * * * * * *开云体育电动老虎机database.server.id101开云体育电动老虎机database.server.namedb.开云体育官方注册网址debezium.microservice1gtid.source.includesc34aeb9e - 89 -广告- 11 - e6 - 877 - b - 42010 - a93af2d开云体育电动老虎机database.whitelistmicroservice1_dbpoll.interval.ms2table.whitelistmicroservice1_db.table1, microservice1_db.table2column.truncate.to.1024.charsmicroservice1_db.table1.text_col开云体育电动老虎机database.history.kafka.bootstrap.serverskafka01:9093、kafka02:9093 kafka03:9093开云体育电动老虎机database.history.kafka.topic开云体育官方注册网址debezium.history.microservice1开云体育电动老虎机database.ssl.truststore确实的事情/信任库开云体育电动老虎机database.ssl.truststore.password* * * * * * *开云体育电动老虎机database.ssl.mode要求开云体育电动老虎机database.history.producer.security.protocolSSL开云体育电动老虎机database.history.producer.ssl.truststore.location确实的事情/信任库开云体育电动老虎机database.history.producer.ssl.truststore.password* * * * * * *开云体育电动老虎机database.history.consumer.security.protocolSSL开云体育电动老虎机database.history.consumer.ssl.truststore.location确实的事情/信任库开云体育电动老虎机database.history.consumer.ssl.truststore.password* * * * * * *,}}

可以找到这些配置字段的详细信息在这里

新的连接器将启动并开始快照数据库,开云体育电动老虎机因为这是第一次启动。开云体育官方注册网址Debezium的快照实现(参见DBZ-31)使用的方法非常类似于MySQL的mysqldump工具。一旦快照完成,Debezium将切换到使用MySQL的binl开云体育官方注册网址og来接收所有未来的数据库更新。开云体育电动老虎机

Kafka connect和Debe开云体育官方注册网址zium一起工作,定期提交Debezium在MySQL binlog中的位置MySQL全局事务ID(GTID)。当Debe开云体育官方注册网址zium重新启动时,Kafka connect将给它最后一个提交的MySQL GTID, Debezium将从那里开始。

注意,提交只会周期性地发生,所以Debezium可能会从日志中它接收到的最后一行之前的位置启动开云体育官方注册网址。在这种情况下,您将在Debezium Kafka topic中观察到重复的消息。开云体育官方注册网址开云体育官方注册网址Debezium在向Kafka写入消息时保证至少发送一次消息。

高可用性

当我们第一次开始使用Debezium时,我们面临的困难之一是如何使它能够容忍机器故障(包括上游的MySQL服务器和De开云体育官方注册网址bezium本身)。5.6版之前的MySQL使用(binlog文件名,文件偏移量)元组在其父binlog中建模副本的位置。这种方法的问题是,MySQL机器之间的binlog文件名不相同。这意味着从上游MySQL机器1读取的副本不能轻易地故障转移到MySQL机器2。有一个完整的工具生态系统(包括尼古拉斯)来尝试解决这个问题。

从MySQL 5.6开始,MySQL引入了全局事务id的概念。这些gtid标识MySQL binlog中的特定位置在机器.这意味着从一个MySQL服务器上的binlog读取数据的用户可以切换到另一个服务器,前提是两个服务器都有可用的数据。这就是我们运行系统的方式。CloudSQL实例和Debezium MySQL集群都在启用gtid的开云体育官方注册网址情况下运行。Debe开云体育官方注册网址zium MySQL服务器也启用了复制binlog,这样Debezium就可以读取binlog(默认情况下副本通常不启用binlog)。所有这些使得Debezium可以从主De开云体育官方注册网址bezium MySQL服务器进行消费,但是如果出现故障,则可以切换到辅助服务器(通过HA代理)。

如果Debezium本身运行的机器出现故开云体育官方注册网址障,那么Kafka连接框架就会将连接器转移到集群中的另一台机器上。当故障转移发生时,Debezium从Kafka co开云体育官方注册网址nnect接收它的最后一次提交偏移量(GTID),并从它停止的地方开始(与上面的警告相同:由于周期性提交频率,您可能会看到一些重复的消息)。

需要调用的一个重要配置是gtid.source.includes我们在上面设置的字段。当我们第一次设置架构部分中描述的拓扑时,我们发现不能从主Debezium DB故障转移到辅助DB,即使它们都复制完全相同的数据。开云体育官方注册网址这是因为,除了主计算机和辅助计算机正在复制的各种上游db的gtid之外,每台计算机都有自己的gtid自己的它的各种MySQL数据库的服务器UUID(例如information开云体育电动老虎机_schema)。这两个服务器中有不同的UUID,这一事实导致MySQL在触发故障转移时感到困惑,因为Debezium的GTID将包括主服务器的服务器UUID,而辅助服务器不知道。开云体育官方注册网址修复方法是从GTID中过滤掉我们不关心的所有uuid。每个Deb开云体育官方注册网址ezium连接器都会过滤掉所有服务器UUID,除了它所关心的微服务DB的UUID。这允许连接器从主连接器到辅助连接器发生故障而没有任何问题。此问题在上有详细的文档dbz - 129

模式

开云体育官方注册网址Debezium的消息格式包括一行的“之前”和“之后”版本。对于插入,“before”为空。对于删除,“after”为空。更新包含了“之前”和“之后”字段。消息还包括一些服务器信息,例如消息来自的服务器ID、消息的GTID、服务器时间戳,等等。

之前: {id1004first_name安妮last_nameKretchmar电子邮件annek@noanswer.org},: {id1004first_name安妮玛丽last_nameKretchmar电子邮件annek@noanswer.org},: {的名字mysql-server-1server_id223344ts_sec1465581gtid文件mysql-bin.000003pos4840快照},人事处uts_ms1465581029523

Debezium发送给Kafka的序列化格式是可配置的。开云体育官方注册网址我们更喜欢WePay的Avro,因为它体积小,模式DDL,性能好,生态系统丰富。我们已经配置Kafka连接使用ConfluentAvro编码器卡夫卡编解码器。这个编码器将消息序列化到Avro,但也将模式注册到Confluent的模式注册中心。

如果MySQL表的模式发生了变化,Debezium会通过更新事件消息的“be开云体育官方注册网址fore”和“after”部分的结构和模式来适应这种变化。在Avro编码器看来,这将是一个新的模式,在消息发送到Kafka之前,它将注册到模式注册表中。注册中心运行完整的兼容性检查,以确保下游使用者不会因模式演变而中断。

请注意,仍然有可能对MySQL模式本身进行不兼容的更改,这会破坏下游消费者。我们还没有将自动兼容性检查添加到MySQL表更改中。

未来的工作

庞大的数据库开云体育电动老虎机

除了微服务之外,我们还有一个比微服务数据库大得多的遗留单片数据库。开云体育电动老虎机我们正在升级这个集群,使其在启用gtid的情况下运行。一旦完成,我们计划用Debezium将这个集群复制到Kafka中。开云体育官方注册网址

大表快照

幸运的是,我们所有的微服务数据库都是相对易于管理的大小。开云体育电动老虎机我们的单片数据库有一些更大的表开云体育电动老虎机。我们还没有在非常大的表上测试Debezi开云体育官方注册网址um,所以还不清楚为了在初始Debezium负载时对这些表进行快照,是否需要任何调优或补丁。我们听到社区报告说,大表(60亿+行)可以工作,前提是在dbz - 152是集。这是我们马上要做的工作。

更多的监控

Kafka connect目前还不容易通过Kafka指标框架公开指标。因此,Kafka连接框架中可用的指标非常少。开云体育官方注册网址Debezium通过JMX公开指标(参见dbz - 134),但我们目前并没有将他们暴露在我们的指标系统中。我们确实监控系统,但当出现问题时,很难确定发生了什么。卡夫卡- 2376是开放的JIRA,旨在解决底层的Kafka连接问题。

多个数据库开云体育电动老虎机

当我们添加更多的微服务数据库时,我们将开始对现有的两台D开云体育电动老虎机ebezium MySQL服务器施加压力。开云体育官方注册网址最终,我们计划将现有的单个Debezium集群拆分为多个集群,其中一些微服务只复开云体育官方注册网址制到一个集群,而其余的则复制到其他集群。

统一兼容性检查

正如我在上面的模式一节中提到的,Confluent模式注册中心现在运行模式兼容性检查。这使得我们可以很容易地防止前后不兼容的更改进入Kafka。我们目前在MySQL层还没有相应的检查。这是一个问题,因为这意味着DBA可能在MySQL层进行不兼容的更改。开云体育官方注册网址Debezium在尝试将新消息生成到Kafka时将失败。我们需要通过在MySQL层添加相同的检查来确保这种情况不会发生。dbz - 70讨论更多。

自动主题配置

我们目前运行的Kafka主题自动创建,默认为6个分区,并基于时间/大小保留。这种配置对于Debezium主题没有太大意义。开云体育官方注册网址至少,他们应该使用日志压缩作为他们的保留。我们计划编写一个脚本,查找配置错误的Debezium主题,并将它们更新为适当的保留设置。开云体育官方注册网址

结论

在过去的8个月里,我们一直在生产中运行开云体育官方注册网址Debezium。最初,我们将其暗运行,然后为上面的架构图所示的实时BigQuery管道启用它。最近,我们开始在微服务和流处理系统中使用消息。我们期待添加更多的数据到管道中,并解决在会议中提出的一些问题未来的工作部分。

特别感谢兰德尔Hauch他在解决许多错误修复和功能请求方面发挥了不可估量的作用。

克里斯Riccomini

Chris是WePay的首席软件工程师。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题