在这篇文章中,我们将讨论规范化关系数据库MySQL(作为命令数据库)和非规范化NoSQL数据库MongoDB(作为查询数据库)之间的CDC-CQRS管道,从而通过Debezium和Kaf开云体育电动老虎机ka-Streams创建DDD聚合。开云体育官方注册网址
这个例子围绕三个微服务展开:order-write-service
,order-aggregation-service
而且order-read-service
.这些服务在Java中被实现为Spring-Boot应用程序。
的order-write-service
在MySQL数据库中,将两个REST端点持久化在各自的表中。开云体育电动老虎机开云体育官方注册网址Debezium跟踪MySQL bin日志以捕获这两个表中的任何事件,并将消息发布到Kafka主题。这些主题由order-aggregation-service
这是一个Kafka-Streams应用程序,它将来自这两个主题的数据连接起来,创建一个Order-Aggregate对象,然后发布到第三个主题。该主题由MongoDB Sink Connector使用,数据持久化在MongoDB中order-read-service
.
解决方案的整体架构如下图所示:
REST应用:order-write-service
触发工作流启动的第一个组件是order-write-service
.这已经被实现为Spring-Boot应用程序,并公开了两个REST端点:
职位:
api /运输详细信息
在MySQL数据库中持久化运输细节开云体育电动老虎机职位:
api /条目细节
在MySQL数据库中持久化项目详细信息开云体育电动老虎机
这两个端点都将它们的数据保存在MySQL数据库中各自的表中。开云体育电动老虎机
命令数据库:My开云体育电动老虎机SQL
上述REST端点的后端处理最终将数据持久化到MySQL中各自的表中。
运输详细信息存储在名为SHIPPING_DETAILS
.项详细信息存储在名为ITEM_DETAILS
.
的数据模型SHIPPING_DETAILS
表,列ORDER_ID
它的主键:
的数据模型ITEM_DETAILS
表,列ORDER_ID
+ITEM_ID
它的主键:
Kafka-Connect源连接器:MySQL CDC Debezium开云体育官方注册网址
变更数据捕获(CDC)是一种解决方案,它从数据库事务日志(在MySQL中称为BinLogs)捕获变更事件,并将这些事件转发给下游消费者(例如Kafka开云体育电动老虎机主题)。
开云体育官方注册网址Debezium是一个为变更数据捕获(CDC)提供低延迟数据流平台的平台,它构建在Apache Kafka之上。它允许将数据库行级更开云体育电动老虎机改作为事件捕获并发布到Apache Kafka主题。我们设置和配置Debezium来监视数据库,然开云体育官方注册网址后我们的应用程序为对数据库所开云体育电动老虎机做的每一个行级更改消耗事件。
在我们的例子中,我们将使用Debezium MySQL So开云体育官方注册网址urce连接器来捕获上述表中的任何新事件,并将它们中继到Apache Kafka。为了实现这一点,我们将通过post以下JSON请求到Kafka Connect的REST API来注册我们的连接器:
{"的名字":"app-mysql-db-connector","配置": {"connector.class":"io.开云体育官方注册网址debezium.connector.mysql.MySqlConnector","tasks.max":"1","开云体育电动老虎机database.hostname":"mysql_db_server","开云体育电动老虎机database.port":"3306","开云体育电动老虎机database.user":"custom_mysql_user","开云体育电动老虎机database.password":"custom_mysql_user_password","开云体育电动老虎机database.server.id":"184054","开云体育电动老虎机database.server.name":"app-mysql-server","开云体育电动老虎机database.whitelist":"app-mysql-db","table.whitelist":"app-mysql-db.shipping_details, app-mysql-db.item_details","开云体育电动老虎机database.history.kafka.bootstrap.servers":"kafka_server: 29092","开云体育电动老虎机database.history.kafka.topic":"dbhistory.app-mysql-db","include.schema.changes":"真正的","转换":"打开","transforms.unwrap.type":"io.开云体育官方注册网址debezium.transforms.ExtractNewRecordState"}}
上述配置基于Debezium 1.9.5.Final。开云体育官方注册网址请注意,如果尝试将演示版本与Debezium 2.0+一起使用,上面的许多配置属性将具有新的名称,并开云体育官方注册网址且配置将需要进行一些调整。 |
的实例io.开云体育官方注册网址debezium.connector.mysql.MySqlConnector
,从指定的MySQL实例中捕获更改。注意,通过表包含列表的方式,只能更改SHIPPING_DETAILS
而且ITEM_DETAILS
表被捕获。它还应用名为ExtractNewRecordState
它提取后
字段来自Kafka记录中的开云体育官方注册网址Debezium更改事件。SMT仅用它的更改事件替换原始的更改事件后
字段创建一个简单的Kafka记录。
默认情况下,Kafka主题名是“serverName.schemaName”。tableName”,根据我们的连接器配置翻译为:
app-mysql-server.app-mysql-db.item_details
app-mysql-server.app-mysql-db.shipping_details
Kafka-Streams应用:order-aggregation-service
即Kafka-Streams应用程序order-aggregation-service
,将处理来自两个Kafka cdc-topic的数据。这些主题根据在MySQL中找到的发货细节和项目细节关系接收CDC事件。
在此基础上,可以按照如下方式构建用于动态创建和维护DDD订单聚合的KStreams拓扑。
应用程序从shipping-details-cdc-topic中读取数据。由于Kafka主题记录是带有未包装信封的Debezium JSON格式,我开云体育官方注册网址们需要解析其中的order-id和shipping-details,以order-id为键,shipping-details为值创建一个KTable。
//读取运输细节KStream <字符串,字符串> shippingDetailsSourceInputKStream = streamsBuilder。流(shippingDetailsTopicName,消耗。(STRING_SERDE STRING_SERDE));//将消息的Json值更改为ShippingDetailsDtoKStream <字符串, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKStream = shippingDetailsSourceInputKStream .map((orderIdJson, shippingDetailsJson) -> .map新KeyValue < > (parseOrderId (orderIdJson) parseShippingDetails (shippingDetailsJson)));//将KStream转换为KTableKTable <字符串, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKTable = shippingDetailsDtoWithKeyAsOrderIdKStream。toTable(物化。<字符串, ShippingDetailsDto, KeyValueStore字节[]> > (SHIPPING_DETAILS_DTO_STATE_STORE) .withKeySerde (STRING_SERDE) .withValueSerde (SHIPPING_DETAILS_DTO_SERDE));
类似地,应用程序从item-details-cdc-topic中读取数据,并从每个单独的消息中解析order-id和item- by -一个列表中属于同一order-id的所有条目,然后将该列表聚合到一个KTable,其中order-id为键,属于该特定order-id的条目列表为值。
//项目详细信息KStream <字符串,字符串> itemDetailsSourceInputKStream = streamsBuilder。流(itemDetailsTopicName,消耗。(STRING_SERDE STRING_SERDE));//将消息的Key从ItemId + OrderId更改为只有OrderId,并将Json值解析为ItemDtoKStream <字符串, ItemDto> itemDtoWithKeyAsOrderIdKStream = itemDetailsSourceInputKStream .map((itemIdOrderIdJson, itemDetailsJson) -> .map新KeyValue < > (parseOrderId (itemIdOrderIdJson) parseItemDetails (itemDetailsJson)));//将每个OrderId的所有ItemDtos进行分组KGroupedStream <字符串, ItemDto> itemDtoWithKeyAsOrderIdKGroupedStream = itemDtoWithKeyAsOrderIdKStream.groupByKey(分组。(STRING_SERDE ITEM_DTO_SERDE));//在一个列表中聚合所有属于每个OrderId的ItemDtosKTable <字符串,ArrayList> itemDtoListWithKeyAsOrderIdKTable = itemDtoWithKeyAsOrderIdKGroupedStream。总(初始化<ArrayList< ItemDto > >)ArrayList::新, (orderId, itemDto, itemDtoList) -> addItemToList(itemDtoList, itemDto), Materialized.<字符串,ArrayList< ItemDto >, KeyValueStore <字节,字节[]> > (ITEM_DTO_STATE_STORE) .withKeySerde (STRING_SERDE) .withValueSerde (ITEM_DTO_ARRAYLIST_SERDE));
由于两个ktable都使用order-id作为键,因此很容易使用order-id来连接它们,从而创建一个名为Order-Aggregate的聚合。Order-Aggregate是一个复合对象,它通过吸收来自发货详细信息和项目详细信息的数据而创建。然后这个Order-Aggregate被写入一个Order-Aggregate Kafka主题。
//连接两个表:shippingDetailsDtoWithKeyAsOrderIdKTable和itemDtoListWithKeyAsOrderIdKTableValueJoiner < ShippingDetailsDto,ArrayList, OrderAggregate> shippingDetailsAndItemListJoiner = (shippingDetailsDto, itemDtoList) -> instantiateOrderAggregate(shippingDetailsDto, itemDtoList);KTable <字符串, OrderAggregate> orderAggregateKTable = shippingDetailsDtoWithKeyAsOrderIdKTable。加入(itemDtoListWithKeyAsOrderIdKTable shippingDetailsAndItemListJoiner);//输出到Kafka主题orderAggregateKTable.toStream()。(orderAggregateTopicName,生产。(STRING_SERDE ORDER_AGGREGATE_SERDE));
Kafka-Connect Sink Connector: MongoDB连接器
接收器连接器是一个Kafka Connect连接器,它从Apache Kafka读取数据并将数据写入某个数据存储。使用MongoDB接收器连接器,可以很容易地将DDD聚合写入MongoDB。它所需要的是一个配置,可以发布到Kafka Connect的REST API,以便运行连接器。
{"的名字":"app-mongo-sink-connector","配置": {"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector","主题":"order_aggregate","connection.uri":"mongodb: / / root_mongo_user: root_mongo_user_password@mongodb_server: 27017","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":假,"开云体育电动老虎机":"order_db","集合":"订单","document.id.strategy.overwrite.existing":"真正的","document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy","转换":"香港,高压","transforms.hk.type":"org.apache.kafka.connect.transforms.HoistField美元关键","transforms.hk.field":"_id","transforms.hv.type":"org.apache.kafka.connect.transforms.HoistField美元价值","transforms.hv.field":"订单"}}
查询数据库:开云体育电动老虎机MongoDB
DDD聚合被写入数据库开云体育电动老虎机order_db
在收藏中订单
在MongoDB。order-id变为_id
桌子和订单
列将订单聚合存储为JSON。
REST应用:order-read-service
MongoDB中持久化的Order Aggregate是通过REST端点提供服务的order-read-service
.
得到:
api /订单/{订单id}
从MongoDB数据库检索订单开云体育电动老虎机
执行指令
本文提供了完整的源代码在这里在Github。首先克隆此存储库并更改为cdc-cqrs-pipeline
目录中。该项目提供了一个Docker Compose文件,为所有组件提供服务:
MySQL
管理员(以前称为phpMinAdmin),通过浏览器管理MySQL
MongoDB
MongoDB Express,通过浏览器管理MongoDB
动物园管理员
汇合的卡夫卡
卡夫卡连接
所有服务启动后,通过执行命令注册Debezium MySQL连接器和MongoDB连接器的实例开云体育官方注册网址Create-MySQL-开云体育官方注册网址Debezium-Connector
而且Create-MongoDB-Sink-Connector
请求分别来自cdc-cqrs-pipeline.postman_collection.json
.执行请求Get-All-Connectors
来验证连接器是否已正确创建。
切换到单独的目录并启动三个Spring-Boot应用程序:
order-write-service
:在端口no上运行8070
order-aggregation-service
:在端口no上运行8071
order-read-service
:在端口no上运行8072
这样,我们的设置就完成了。
要测试应用程序,请执行请求Post-Shipping-Details
从邮差收集到插入运输细节和Post-Item-Details
为特定的订单id插入项目详细信息。
最后,执行Get-Order-By-Order-Id
请求,以检索完整的订单汇总。
总结
Apache Kafka充当了服务间消息传递的高度可伸缩和可靠的骨干。将Apache Kafka置于整个体系结构的中心也确保了相关服务的解耦。例如,如果解决方案的单个组件出现故障或在一段时间内不可用,则稍后将简单地处理事件:重新启动后,Debezium连接器将从之前停止的位置继续跟踪相关表。开云体育官方注册网址类似地,任何使用者都将继续处理来自其先前偏移量的主题。通过跟踪已经成功处理的消息,可以检测到重复的消息并将其排除在重复处理之外。
当然,不同服务之间的事件管道最终是一致的,即消费者(如订单读取服务)可能会落后于生产者(如订单写入服务)。不过,通常这是可以的,并且可以根据应用程序的业务逻辑进行处理。此外,整个解决方案的端到端延迟通常很低(秒级甚至次秒级范围),这要归功于基于日志的更改数据捕获,它允许以近乎实时的方式发射事件。
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源下Apache许可证,版本2.0.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium,在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题.