大多数情况下,Debezium被用于开云体育官方注册网址流数据更改Apache卡夫卡.如果你使用的是另一个流媒体平台,比如Apache脉冲星或者基于云的解决方案,例如亚马逊运动Azure事件中心诸如此类?你还能从Debezium强大的变更数据捕获(CDC)功开云体育官方注册网址能中获益,并从数据库(如MySQL、Postgres、SQL Server等)中摄取变更吗?开云体育电动老虎机

事实证明,只需一点胶水代码,就可以!在接下来的文章中,我们将讨论如何使用Debezium捕获MySQL数据库中的更改,并开云体育官方注册网址将更改事件流到Kinesis中,Kinesis是Amazo开云体育电动老虎机n云上提供的一个完全托管的数据流服务。

介绍Debezium嵌入式引擎开云体育官方注册网址

开云体育官方注册网址Debezium是作为Kafka的一组连接器实现的,因此通常通过卡夫卡连接.但是Debezium中有一颗小宝石还不为人所知,那就是开云体育官方注册网址嵌入式引擎

当使用这个引擎时,Debezium连接器不是在Kafka开云体育官方注册网址 Connect中执行的,而是作为一个库嵌入到你自己的Java应用程序中。为此目的,开云体育官方注册网址debezium-embedded模块提供了一个小的运行环境来执行原本由Kafka Connect框架处理的任务:从连接器请求更改记录,提交偏移量等。连接器产生的每个更改记录都被传递给一个配置的事件处理程序方法,在我们的例子中,该方法将记录转换为它的JSON表示,并使用Kinesis Java API将其提交给Kinesis流。

整体架构如下所示:

开云体育官方注册网址Debezium嵌入式引擎流到亚马逊Kinesis">
           </div>
          </div>
          <div class=

现在让我们浏览一下所需代码的相关部分。完整的可执行示例可以在开云体育官方注册网址debezium-examples在GitHub上回购。

设置

为了使用Debezium的嵌入式开云体育官方注册网址引擎,需要添加开云体育官方注册网址debezium-embedded依赖项以及您选择的Debezium连接器到您的项目开云体育官方注册网址pom.xml.下面我们将使用MySQL连接器。类型的依赖项也需要添加Kinesis客户端API,所以这些是需要的依赖项:

...< >的依赖< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-embedded< / artifactId ><版本>0.8.3.Final> < /版本< / >的依赖< >的依赖< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-connector-mysql< / artifactId ><版本>0.8.3.Final> < /版本< / >的依赖< >的依赖< groupId >com.amazonaws< / groupId >< artifactId >amazon-kinesis-client< / artifactId ><版本>1.9.0> < /版本< / >的依赖...

配置嵌入式引擎

的实例配开云体育官方注册网址置Debezium嵌入式引擎io.开云体育官方注册网址debezium.config.Configuration.该类可以从系统属性或给定的配置文件中获取值,但为了示例起见,我们将简单地通过其fluent构建器API传递所有所需的值:

配置配置=配置共创();(EmbeddedEngine.CONNECTOR_CLASS,io.开云体育官方注册网址debezium.connector.mysql.MySqlConnector) (EmbeddedEngine.ENGINE_NAME运动) (MySqlConnectorConfig.SERVER_NAME运动) (MySqlConnectorConfig.SERVER_ID8192) (MySqlConnectorConfig.HOSTNAME本地主机) (MySqlConnectorConfig.PORT3306) (MySqlConnectorConfig.USER开云体育官方注册网址) (MySqlConnectorConfig.PASSWORDdbz) (MySqlConnectorConfig.DATAB开云体育电动老虎机ASE_WHITELIST库存) (MySqlConnectorConfig.TABLE_WHITELISTinventory.customers) (EmbeddedEngine.OFFSET_STORAGEorg.apache.kafka.connect.storage.MemoryOffsetBackingStore) (MySqlConnectorConfig。开云体育电动老虎机DATABASE_HISTORY,MemoryDatabaseHistory.class.getName()) .with(schemas.enable) .build ();

如果您曾经在Kafka Connect中设置过Deb开云体育官方注册网址ezium MySQL连接器,那么大多数属性对您来说都很熟悉。

我们来讨论一下OFFSET_STORAGE而且开云体育电动老虎机DATABASE_HISTORY更详细的选项。它们处理如何持久化连接器偏移量和数据库历史。开云体育电动老虎机当通过Kafka Connect运行连接器时,两者通常都存储在特定的Kafka主题中。但这不是一个选择,所以需要一个替代方案。对于本例,我们只是将偏移量和数据库历史保存在内存中。开云体育电动老虎机例如,如果引擎重新启动,该信息将丢失,连接器将从头开始,例如使用一个新的初始快照。

的替代实现并不太难,但这超出了本文的范围OffsetBackingStore而且开云体育电动老虎机DatabaseHistory合同,分别。例如,如果您完全使用AWS云服务,您可以考虑将偏移量和数据库历史存储在DynamoDB NoSQL存储中。开云体育电动老虎机注意,与Kafka不同,Kinesis流不适合存储数据库历史。开云体育电动老虎机原因是,Kinesis数据流的最大保留期是7天,而数据库历史必须在连接器的整个生命周期内保持。开云体育电动老虎机另一种替代方法是使用现有的基于文件系统的实现FileOffsetBackingStore而且File开云体育电动老虎机DatabaseHistory,分别。

下一步是构建一个EmbeddedEngine实例。同样,这是使用一个流畅的API完成的:

EmbeddedEngine引擎= EmbeddedEngine.create() .using(config) .using(.getClass().getClassLoader()) .using(Clock.SYSTEM) .notifying(:: sendRecord) .build ();

这里最有趣的部分是通知调用。这里传递的方法是引擎将为每个发出的数据更改记录调用的方法。让我们来看看这个方法的实现。

向Kinesis发送变更记录

sendRecord ()方法是魔法发生的地方。我们将转换输入SourceRecord转换成等效的JSON表示,并将其传播到Kinesis流。

因此,理解Apache Kafka和Kinesis之间的一些概念差异是很重要的。具体来说,Kafka中的消息有一个关键和一个价值(它们都是任意的字节数组)。在Debezium中,开云体育官方注册网址数据更改事件的键表示受影响记录的主键,值是由新旧行状态以及一些附加元数据组成的结构。

另一方面,在Kinesis中,消息包含一个blob数据(同样是任意字节序列)和分区键.Kinesis流可以分成多个分片,分区键用于确定给定的消息应该进入哪个分片。

现在可以考虑将Debezium的更改数据事件中的键映射到Kinesis分区键,但是分区键开云体育官方注册网址的长度被限制为256字节。根据捕获的表中主键列的长度,这可能还不够。因此,更安全的选择是从更改消息键创建一个散列值,并将其用作分区键。这反过来意味着更改消息键结构应该添加到Kinesis消息的数据blob的实际值旁边。虽然键列值本身也是值结构的一部分,但使用者不知道哪些列组成了主键。

考虑到这一点,让我们来看看sendRecord ()实现:

私人无效sendRecord(SourceRecord记录){//我们只对数据事件感兴趣,而不是模式更改事件如果(record.topic () .equals (运动)) {返回;}//创建一个包含键*和值*的容器模式模式schema = SchemaBuilder.struct() .field(关键, record.keySchema()) .field(价值, record.valueSchema()) .build();结构体消息=结构体(模式);message.put (关键record.key ());message.put (价值record.value ());//通过哈希记录的键来创建分区键字符串partitionKey =字符串.valueOf(record.key() !=?record.key().hashCode(): -1);//使用Kafka Connect's创建代表容器的数据blob// JSON转换器最后字节[]有效载荷= valueConverter.fromConnectData(, schema, message);//组装放置记录请求…PutRecordRequest putRecord =PutRecordRequest ();putRecord.setStreamName (record.topic ());putRecord.setPartitionKey (partitionKey);putRecord.setData (ByteBuffer.wrap(载荷));/ /……然后执行kinesisClient.putRecord (putRecord);}

代码非常简单;如上所述,它首先创建一个包含key的容器结构而且传入源记录的值。然后使用Kafka Connect提供的JSON转换器将该结构转换为二进制表示JsonConverter).然后一个PutRecordRequest由blob、分区键和更改记录的主题名组装而成,最后发送到Kinesis。

Kinesis客户端对象可以重用,设置一次,如下所示:

//使用本地“默认”AWS配置文件中的凭据AWSCredentialsProvider credentialsProvider =ProfileCredentialsProvider (默认的);.kinesisClient = AmazonKinesisClientBuilder.standard() .withCredentials(credentialsProvider) .withRegion(eu-central-1//使用您的AWS区域.build ();

这样,我们就建立了Debezium的一个实例开云体育官方注册网址EmbeddedEngine它运行已配置的MySQL连接器,并将每个发出的更改事件传递给Amazon Kinesis。最后一个缺少的步骤是实际运行引擎。这是在一个单独的线程上使用遗嘱执行人例如:像这样:

ExecutorService遗嘱执行人=执行人.newSingleThreadExecutor ();executor.execute(引擎);

注意,你还应该确保正确关闭引擎最终。怎样才能做到呢显示的是在附带的例子中开云体育官方注册网址debezium-examples回购。

运行示例

最后,让我们看看如何运行完整的示例并从Kinesis流中消费Debezium CDC事件。开云体育官方注册网址首先克隆示例存储库并转到运动目录:

Git克隆https://github.com/debez开云体育官方注册网址ium/debezium-examples.git CD debezium-examples/kinesis

确保你已经遇到了先决条件在示例中描述README.md;最重要的是,你应该有一个本地Docker安装,你需要设置一个AWS帐户,并安装AWS客户端工具。请注意,在AWS注册时,Kinesis不是免费层的一部分,即在执行示例时,您将支付(少量)钱。不要忘记删除您设置的流,我们不会为您支付AWS账单:)

现在运行Debe开云体育官方注册网址zium的MySQL示例数据库来处理一些数据:开云体育电动老虎机

docker run -it——rm——name mysql -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=de开云体育官方注册网址bezium \ -e MYSQL_USER=mysqluser \ -e MYSQL_PASSWORD=mysqlpw \ debezium/example-mysql:0.8

为变更事件创建一个Kinesis流客户表:

Aws kinesis create-stream——stream-name kinesis.inventory.customers \——碎片计数

执行运行Debezium嵌入式引擎的Java应用程序开云体育官方注册网址kinesis.region财产pom.xml先去你自己的地区):

mvn exec: java

这将启动引擎和MySQL连接器,它对捕获的数据库进行初始快照。开云体育电动老虎机

为了查看Kinesis流中的CDC事件,可以使用AWS CLI(通常,您需要实现Kinesis Streams应用程序来使用这些事件)。为此,设置一个碎片迭代器第一:

ITERATOR=$(aws kinesis get-分片- ITERATOR——stream-name kinesis.inventory.customers——分片-id 0——分片- ITERATOR -type TRIM_HORIZON | jq '.ShardIterator')

注意金桥实用工具用于从Kinesis API返回的JSON结构中获取迭代器生成的id。接下来,迭代器可以用来检查流:

aws kinesis get-records—分片迭代器$ITERATOR

你应该收到一个这样的记录数组:

记录: [{SequenceNumber49587760482547027816046765529422807492446419903410339842ApproximateArrivalTimestamp1535551896.475数据4 oti3mzn9 eyJiZWZvcm……PartitionKeyeyJpZCI6MTAwMX0 =},]}

数据元素是消息数据blob的base64编码表示形式。再一次金桥很方便:我们可以用它来提取数据并解码Base64表示(确保使用jq 1.6或更新版本):

aws kinesis get-records——fragment - ITERATOR $ITERATOR | \ jq -r '. records[]。数据| @base64d' | jq。

现在你应该看到JSON格式的更改事件,每个事件都有键和值:

关键: {id1001},价值: {之前: {id1001first_name莎莉last_name托马斯。电子邮件sally.thomas@acme.com},: {版本0.8.1.Final的名字运动server_id0ts_sec0gtid文件mysql-bin.000003pos1540快照真正的线程db库存表格客户查询},人事处cts_ms1535555325628}}

接下来,让我们尝试更新MySQL中的记录:

#启动MySQL CLI客户端docker命令-it——rm——name mysqlterm——link MySQL——rm MySQL:5.7 \ sh -c 'exec MySQL -h"$MYSQL_PORT_3306_TCP_ADDR" \ -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -P"$ MYSQL_ENV_MYSQL_ROOT_PASSWORD"' #在MySQL客户端使用目录;set first_name = 'Trudy' where id = 1001;

如果你现在再次获取迭代器,你应该看到一个表示更新的数据更改事件:

关键: {id1001},价值: {之前: {id1001first_name莎莉last_name托马斯。电子邮件sally.thomas@acme.com},: {id1001first_name特鲁迪last_name托马斯。电子邮件sally.thomas@acme.com},: {版本0.8.1.Final的名字运动server_id223344ts_sec1535627629gtid文件mysql-bin.000003pos3640快照线程10db库存表格客户查询},人事处uts_ms1535627622546}}

完成后,通过按Ctrl + C停止嵌入式引擎应用程序,通过运行停止MySQL服务器停止mysql并删除kinesis.inventory.customers流在运动。

总结与展望

在这篇博客文章中,我们已经演示了Debezium不仅可以用于将数据更改流到Apach开云体育官方注册网址e Kafka中,还可以用于其他流媒体平台,如Amazon Kinesis。利用它的嵌入式引擎并通过实现一些粘合代码,您可以从中受益所有CDC连接器以及它们的功能,并将它们开云体育官方注册网址连接到您所选择的流媒体解决方案。

我们正在考虑进一步简化Debezium的使用。开云体育官方注册网址我们不要求您实现自己的应用程序来调用嵌入式引擎API,而是考虑提供一个小型的自包含Debezium运行时,您可以简单地执行它。开云体育官方注册网址它将配置为源连接器,以运行并使用带有Kinesis、Apache Pulsar等现成实现的出站插件SPI。当然,这样的运行时也会提供合适的实现来安全地持久化偏移量和数据库历史,并且它还会提供监控、健康检查等手段。开云体育电动老虎机这意味着您可以以健壮可靠的方式将Debezium源连接器与首开云体育官方注册网址选的流媒体平台连接起来,而不需要任何手动编码!

如果你喜欢这个想法,那么请查看JIRA问题dbz - 651让我们知道你的想法,例如,在下面的评论区或在我们的网站上留言邮件列表

贡纳Morling

Gunnar是Decodable的软件工程师,也是一名不折不扣的开源爱好者。多年来,他一直是Debezium的项目负责人。开云体育官方注册网址Gunnar创建了kcctl、JfrUnit和MapStruct等开源项目,并且是Bean验证2.0 (JSR 380)的规范负责人。他在德国汉堡工作。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题