基于微服务的架构可以被认为是一种行业趋势,因此最近经常出现在企业应用程序中。在多个服务及其备份数据存储之间保持数据同步的一种可能方法是采用一种称为变更数据捕获简称CDC。

从本质上讲,CDC允许监听数据流一端(即数据源)发生的任何修改,并将其作为更改事件传递给其他相关方或将其存储到数据接收器中。建议在数据源和数据接收器之间解耦事件流,而不是以点到点的方式进行此操作。该场景的实现可以基于开云体育官方注册网址而且Apache卡夫卡相对轻松且有效地无需编码。

例如,考虑以下基于微服务的订单管理系统架构:

该系统包括三项服务,订单而且股票.如果订单服务接收到订单请求后,它将需要来自其他两个服务的信息,例如商品定义或特定商品的库存数量。CDC可用于为管理的数据设置更改事件流,而不是对这些服务进行同步调用以获取此信息而且股票服务。的订单服务可以订阅这些事件流,并在自己的数据库中保留相关项目和库存数据的本地副本。开云体育电动老虎机这种方法有助于解耦服务(例如,不受服务中断的直接影响),也有利于整体性能,因为每个服务都可以保留它感兴趣的其他服务所拥有的数据项的优化视图。

如何处理聚合对象?

然而,在一些用例中,事情有点棘手。有时,通过所谓的聚合(由领域驱动设计(DDD)定义的概念/模式)在服务和数据存储之间共享信息是有用的。一般来说,DDD总用于传输可以由多个不同的域对象组成的状态,这些域对象一起被视为单个信息单元。

具体例子有:

  • 客户及地址哪些被表示为客户记录存储客户和地址列表

  • 订单和相应的行项目它们被表示为一个订单记录存储订单及其所有行项

支持这些DDD聚合的相关域对象的数据很可能存储在RDBMS的单独关系中。当使用目前在Debezium中发现的CDC功能时,所有对域对象的更改都将被独立捕获,默认情况下最终反映在单独的Kafk开云体育官方注册网址a主题中,每个RDBMS关系一个主题。虽然这种行为对很多用例非常有帮助,但对其他用例却有很大的限制,比如上面描述的DDD聚合场景。因此,本文将探讨如何基于Debezium CDC事件构建DDD聚合,使用开云体育官方注册网址Kafka Streams API

从数据源捕获变更事件

本文的完整源代码在Debezium中提供开云体育官方注册网址实例库在GitHub上。首先克隆此存储库并更改为kstreams目录:

Git克隆https://github.com/debez开云体育官方注册网址ium/debezium-examples.git CD kstreams

该项目提供了一个Docker Compose文件,其中包含所有组件的服务,您可能已经从开云体育官方注册网址Debezium教程

此外,它还声明了以下服务:

  • MongoDB哪个将被用作数据接收器

  • 另一个Kafka Connect实例,它将托管MongoDB接收器连接器

  • 用于运行我们将在下面构建的DDD聚合流程的服务

我们稍后会讲到这三点,现在让我们先准备管道的源端:

export 开云体育官方注册网址DEBEZIUM_VERSION=0.7 docker-compose mysql zookeeper kafka connect_source

一旦所有服务启动,通过提交以下JSON文档注册一个Debezium MySQL连接器实例:开云体育官方注册网址

的名字mysql-source配置: {connector.classio.开云体育官方注册网址debezium.connector.mysql.MySqlConnectortasks.max1开云体育电动老虎机database.hostnamemysql开云体育电动老虎机database.port3306开云体育电动老虎机database.user开云体育官方注册网址开云体育电动老虎机database.passworddbz开云体育电动老虎机database.server.id184054开云体育电动老虎机database.server.namedbserver1table.whitelistinventory.customers, inventory.addresses开云体育电动老虎机database.history.kafka.bootstrap.servers卡夫卡:9092开云体育电动老虎机database.history.kafka.topicschema-changes.inventory转换打开transforms.unwrap.typeio.开云体育官方注册网址debezium.transforms.UnwrapFromEnvelopetransforms.unwrap.drop.tombstones}}

为此,执行以下curl命令:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql-source.json

这将使用给定的凭据为指定的数据库设置连接器。开云体育电动老虎机对于我们的目的,我们只对改变感兴趣客户而且地址表,因此table.whitelist属性仅用于选择这两个表。另一个值得注意的东西是应用的“unwrap”转换。默认情况下,Debezi开云体育官方注册网址um的CDC事件将包含更改行的旧状态和新状态,以及关于更改源的一些附加元数据。通过应用UnwrapFromEnvelopeSMT(单消息转换),只有新的状态才会传播到相应的Kafka主题中。

我们可以在连接器部署完成并完成两个捕获表的初始快照后查看它们:

Docker-compose exec kafka/ kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——from-beginning \——属性打印。Key =true \——topic dbserver1.inventory。客户# or dbserver1.inventory.addresses

例如,您应该看到以下输出

(为了可读性,格式化并省略了模式信息)对于客户更改的主题:

{"schema":{…}, "payload": {"id": 1001}} {"schema":{…“有效载荷”}:{" id ": 1001年,“first_name”:“莎莉”,“last_name”:“托马斯”、“电子邮件”:“sally.thomas@acme.com”}}…

构建DDD聚合

KStreams应用程序将处理来自两个Kafka主题的数据。这些主题根据MySQL中的客户和地址关系接收CDC事件,每个主题都有相应的jackson注释的POJO(客户和地址),通过保存CDC事件类型的字段(即UPSERT/DELETE)来丰富。

由于Kafka主题记录是在Debezium JSON格式的未包装信封,一个开云体育官方注册网址特殊的SerDe是为了能够分别使用它们的POJO或Debezium事件表示来读写这些记录。开云体育官方注册网址序列化器只是使用Jackson将pojo转换为JSON,而反序列化器则是一种“混合型”,能够从Debezium CDC事件或jsonified pojo反序列化。开云体育官方注册网址

在此基础上,可以按照如下方式构建用于动态创建和维护DDD聚合的KStreams拓扑:

客户主题(“父”)

所有的客户记录都简单地从客户主题读入KTable根据记录密钥自动维护每个客户的最新状态(即客户的PK)

KTable customerTable = builder。表(parentTopic Consumed.with (defaultIdSerde customerSerde));

地址主题(“儿童”)

对于地址记录,处理稍微复杂一些,需要几个步骤。首先,将所有地址记录读入KStream

KStream addressStream = builder。流(childrenTopic,消耗。(defaultIdSerde addressSerde));

其次,根据这些地址记录的键(关系中的原始主键)对这些地址记录进行“伪”分组。在这一步中,维护与相应客户记录的关系。这有效地允许跟踪哪个地址记录属于哪个客户记录,即使在地址记录删除的情况下也是如此。要做到这一点,还需要额外的努力LatestAddress引入POJO,允许存储最新已知的PK <→FK关系地址记录本身。

KTable tempTable = addressStream . groupbykey (Serialized. groupbykey)with(defaultIdSerde, addressSerde)) .aggregate(() ->LatestAddress(), (DefaultId addressId,地址地址,LatestAddress最新)->{最新。update(address, addressId,DefaultId (address.getCustomer_id ()));返回最新的;},物化。< DefaultId、LatestAddress KeyValueStore <字节,字节[]> > (childrenTopic +_table_temp) .withKeySerde(defaultIdSerde) .withValueSerde(latestAddressSerde));

三、中间环节KTable再次转换为KStream.的LatestAddress记录被转换为使用客户id (FK关系)作为它们的新密钥,以便根据每个客户对它们进行分组。在分组步骤中,更新客户特定的地址,这可能导致添加或删除地址记录。出于这个目的,另一个POJO打来电话地址,其中包含相应更新的地址记录映射。结果是KTable持有最新的地址每个客户id。

KTable addressTable = tempTable.toStream() .map((addressId, latestAddress) -> . addressKeyValue<>(latestAddress. getcustomerid (),latestAddress)) .groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde)) .aggregate(() -> . getcustomerid ()) .groupByKey(Serialized.with(defaultIdSerde,latestAddressSerde)) .aggregate(() -> . getcustomerid ()Addresses(), (customerId, latestAddress, Addresses) -> {Addresses .update(latestAddress);返回地址;},物化。< DefaultId、地址、KeyValueStore <字节,字节[]> > (childrenTopic +_table_aggregate) .withKeySerde(defaultIdSerde) .withValueSerde(addressesSerde));

客户与地址组合

最后,它很容易将客户和地址结合在一起将客户KTable与地址KTable连接起来从而构建DDD聚合,DDD聚合由CustomerAddressAggregatePOJO。最后,KTable的更改被写入KStream, KStream又被保存到kafka主题中。这允许以多种方式使用产生的DDD聚合。

KTable dddAggregate = customerTable。join(addressTable, (customer, addresses) -> customer. get_eventtype () == EventType。删除吗?CustomerAddressAggregate(客户,addresses.getEntries ()));dddAggregate.toStream (), (final_ddd_aggregatesProduced.with (defaultIdSerde (Serde) aggregateSerde));

客户KTable中的记录可能会收到CDC删除事件。如果是这样,可以通过检查客户POJO的事件类型字段来检测,例如返回'null'而不是DDD聚合。当消费方也需要对删除采取相应的行动时,这样的公约是有用的。_

运行聚合管道

实现了聚合管道之后,是时候对其进行测试运行了。为此,构建poc-ddd-aggregates包含完整实现的Maven项目:

MVN清洁包-f pocc -ddd- totals /pom.xml

然后运行聚合器服务,该服务接收这个项目构建的JAR,并使用java-jboss-openjdk8-jdk基地图片:

Docker-compose up -d聚合器

一旦聚合管道开始运行,我们可以使用控制台消费者查看聚合事件:

Docker-compose exec kafka/ kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——from-beginning \——属性打印。Key =true \——topic final_ddd_aggregate . Key =true \——topic final_ddd_aggregate

将DDD聚合体传输到数据接收器

我们最初构建这些DDD聚合是为了在数据源(在本例中是MySQL表)和方便的数据接收器之间传输数据和同步更改。根据定义,DDD聚合体通常是复杂的数据结构,因此将它们写入提供灵活的方式和方法来查询和/或索引它们的数据存储是非常有意义的。谈到NoSQL数据库,文档存储似乎是最自开云体育电动老虎机然的选择MongoDB成为此类用例的领先数据库。开云体育电动老虎机

多亏了卡夫卡连接还有很多交钥匙准备好了连接器做到这一点几乎毫不费力。使用一个MongoDB接收器连接器从开源社区,可以很容易地将DDD聚合写入MongoDB。它所需要的是一个适当的配置,可以张贴到REST API为了运行连接器。

所以让我们启动MongoDb和另一个Kafka Connect实例来托管接收器连接器:

Docker-compose up -d mongodb connect_sink

如果DDD聚合应该不加修改地写入MongoDB,配置可能看起来像下面这样简单:

的名字mongodb-sink配置: {connector.classat.grahsl.kafka.connect.mongodb.MongoDbSinkConnectortasks.max1主题final_ddd_aggregatesmongodb.connection.urimongodb: / / mongodb: 27017 /库存吗?w = 1杂志= truemongodb.collectioncustomers_with_addressesmongodb.document.id.strategyat.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategymongodb.delete.on.null.values真正的}}

与源连接器一样,使用curl部署连接器:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8084/connectors/ -d @mongodb-sink.json

这个连接器将使用来自Kafka主题“final_ddd_totals”的消息,并将它们写入MongoDB文档进入“customers_with_addresses”集合。

你可以通过启动一个Mongo shell并查询集合的内容来查看:

Docker-compose exec mongodb bash -c ' mongodb inventory' > db.customers_with_address .find().pretty()
_id: {id1001},地址: [{邮政编码76036_eventType插入城市Euless摩尔大道3183号id10状态德州customer_id1001类型航运}, {邮政编码17116_eventType插入城市哈里斯堡隐谷路2389号id11状态宾西法尼亚customer_id1001类型计费}),客户: {_eventType插入last_name托马斯。id1001first_name莎莉电子邮件sally.thomas@acme.com}}

由于数据组合在一个文档中,有些部分是不需要的或多余的。为了摆脱任何不需要的数据(例如_eventType,每个地址子文档的customer_id),也可以调整配置,以便将所述字段列入黑名单。

最后,在MySQL源数据库中更新一些客户或地址数据:开云体育电动老虎机

MYSQL_USER -p$MYSQL_PASSWORD库存'mysql >更新客户的first_name= "Sarah" where id = 1001;

此后不久,您将看到MongoDB中相应的聚合文档已相应地更新。

缺点和限制

虽然从基于表的CDC事件创建DDD聚合的第一个版本基本可以工作,但了解其当前的局限性非常重要:

  • 不一般适用,因此需要为pojo和中间类型定制代码

  • 不能跨多个实例进行扩展,这是由于在处理之前缺少但必要的数据重新分区

  • 仅限于基于1:N关系之间的单个JOIN构建聚合

  • 由此产生的DDD聚合最终是一致的,这意味着它们有可能在收敛之前暂时表现出中间状态

前几个问题可以通过在KStreams应用程序上进行合理的工作来解决。最后一个问题,处理最终产生的DDD聚合的一致性性质要困难得多,需要在Debezium自己的CDC机制上做一些努力。开云体育官方注册网址

前景

在这篇文章中,我们描述了从Debezium的CDC事件创建聚合事件的方法。开云体育官方注册网址在后续的博客文章中,我们可能会更深入地讨论如何通过运行多个KStreams聚合器实例来水平扩展DDD创建。为此,在运行拓扑之前需要对数据进行适当的重新分区。此外,研究一个更通用的版本可能会很有趣,它只需要自定义类来描述涉及的两个主要pojo。

我们还考虑提供一个即用的组件,它可以以通用的方式工作(基于Connect记录,即不绑定到特定的序列化格式,如JSON),并且可以设置为运行给定聚合的可配置的独立进程。

同样在处理最终一致性的话题上,我们得到了一些想法,但这些肯定需要更多的探索和调查。请继续关注!

我们很乐意听到您对事件聚合主题的反馈。如果你对这个主题有任何想法或想法,请在下方发表评论或发送消息给我们邮件列表

汉斯Grahsl

Hans-Peter是NETCONOMY的技术培训师,同时也是Java web开发和现代数据架构的个人顾问。此外,他还担任软件工程的副讲师。他住在奥地利的格拉茨。

贡纳Morling

Gunnar是Decodable的软件工程师,也是一名不折不扣的开源爱好者。多年来,他一直是Debezium的项目负责人。开云体育官方注册网址Gunnar创建了kcctl、JfrUnit和MapStruct等开源项目,并且是Bean验证2.0 (JSR 380)的规范负责人。他在德国汉堡工作。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题