
更新外部全文搜索索引(例如:<一个href="https://www.elastic.co/products/elasticsearch">Elasticsearch一个>)是变更数据捕获(CDC)的一个非常流行的用例。
正如我们在<一个href="//www.coraltyphoon.com/blog/2018/01/17/streaming-to-elasticsearch/">博客一个>一段时间前,Debezium的CDC源连接器和Confluent开云体育官方注册网址的结合<一个href="https://docs.confluent.io/current/connect/connect-elasticsearch/docs/index.html">用于Elasticsearch的下沉连接器一个>可以直接捕捉MySQL、Postgres等中的数据变化,并将其近实时地推向Elasticsearch。这将导致源数据库中的表和Elasticsearch中相应的搜索索引之间的1:1关系,这对于许多用例来说是非常好的。开云体育电动老虎机
但是,如果您想将整个聚合放入单个索引中,则会变得更具挑战性。一个例子可以是一个客户和他们所有的地址;它们通常存储在RDBMS中两个独立的表中,通过外键链接,而Elasticsearch中只需要一个索引,包含嵌入地址的客户文档,允许您根据地址有效地搜索客户。
跟进<一个href="//www.coraltyphoon.com/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/">KStreams-based解决方案一个>对于我们最近所描述的,我们想在这篇文章中提出一种由应用层驱动的聚合视图的替代方案。
概述
其思想是在原始数据被更改时,将视图物化到源数据库中的单独表中。开云体育电动老虎机
聚合被序列化为JSON结构(自然可以表示任何嵌套的对象结构)并存储在特定的表中。这是在更改数据的实际事务中完成的,这意味着聚合视图始终与主数据一致。特别是,这种方法不容易像上面链接的文章中讨论的基于kstreams的解决方案那样暴露中间聚合。
整体架构如下图所示:

在这里,通过对的一个小扩展来实现聚合视图<一个href="http://hibernate.org/orm/">Hibernate ORM一个>,它将JSON聚合存储在源数据库中(注意,“聚合视图”在概念上可以被认为与来自不同RDBMS的“物化视图”开云体育电动老虎机相同,因为它们物化了“连接”操作的结果,但从技术上讲,我们不是使用后者来存储聚合视图,而是使用常规表)。然后Debezium捕获对该聚合表的更改,并将其流到每个聚合类型的一个主题。开云体育官方注册网址Elasticsearch接收器连接器可以订阅这些主题并更新相应的全文索引。
你可以在我们的文章中找到这个想法的概念验证实现(Hibernate扩展和相关代码)<一个href="https://github.com/debezium/debezium-examples/tree/main/jpa-aggregations">实例库一个>.当然,一般的思想并不局限于Hibernate ORM或JPA,您可以使用任何其他用于访问数据的API来实现类似的东西。
通过Hibernate ORM创建聚合视图
对于下面的内容,让我们假设我们正在持久化一个简单的领域模型(包含一个客户
实体和一些相关的,比如地址
(客户)类别
等等)。开云体育电动老虎机使用Hibernate可以使聚合的创建对实际应用程序代码完全透明<一个href="http://docs.jboss.org/hibernate/orm/current/userguide/html_single/Hibernate_User_Guide.html">Hibernate事件监听器一个>.由于其可扩展的体系结构,我们可以将这样的侦听器插入Hibernate,只需将它添加到类路径中,当引导实体管理器/会话工厂时,它将从类路径中自动提取。
我们的示例侦听器响应一个注释,@MaterializeAggregate
,它标记了那些应该是物化聚合的根的实体类型。
@ entity年代p一个n><年代p一个n类="annotation">@MaterializeAggregate年代p一个n>(aggregateName =<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户所有年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>)<年代p一个n类="directive">公共年代p一个n><年代p一个n类="type">类年代p一个n><年代p一个n类="class">客户年代p一个n>{<年代p一个n类="annotation">@ id年代p一个n><年代p一个n类="directive">私人年代p一个n><年代p一个n类="type">长年代p一个n>我d;<年代p一个n类="directive">私人年代p一个n><年代p一个n类="predefined-type">字符串年代p一个n>firstName;<年代p一个n类="annotation">@OneToMany年代p一个n>(mappedBy =<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>, fetch = FetchType。EAGER, cascade = CascadeType.ALL)<年代p一个n类="directive">私人年代p一个n><年代p一个n类="predefined-type">集年代p一个n><地址>地址;<年代p一个n类="annotation">@ManyToOne年代p一个n><年代p一个n类="directive">私人年代p一个n>分类类别;...}
现在,如果任何实体注释@MaterializeAggregate
通过Hibernate插入,更新或删除,侦听器将启动并物化聚合根(客户)及其关联实体(地址,类别)的JSON视图。
在引擎盖下<一个href="https://github.com/FasterXML/jackson">杰克逊API一个>用于将模型序列化为JSON。这意味着您可以使用它的任何注释来定制JSON输出,例如。@JsonIgnore
排除逆关系地址
来客户
:
@ entity年代p一个n><年代p一个n类="directive">公共年代p一个n><年代p一个n类="type">类年代p一个n><年代p一个n类="class">地址年代p一个n>{<年代p一个n类="annotation">@ id年代p一个n><年代p一个n类="directive">私人年代p一个n><年代p一个n类="type">长年代p一个n>我d;<年代p一个n类="annotation">@ManyToOne年代p一个n><年代p一个n类="annotation">@JoinColumn年代p一个n>(name =<年代p一个n类="string">"年代p一个n><年代p一个n类="content">customer_id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>)<年代p一个n类="annotation">@JsonIgnore年代p一个n><年代p一个n类="directive">私人年代p一个n>客户的客户;<年代p一个n类="directive">私人年代p一个n><年代p一个n类="predefined-type">字符串年代p一个n>街;<年代p一个n类="directive">私人年代p一个n><年代p一个n类="predefined-type">字符串年代p一个n>城市;...}
请注意,地址
本身没有标记@MaterializeAggregate
也就是说,它本身不会物化到一个聚合视图中。
在使用JPA的Ent我tyManager
为了插入或更新一些客户,让我们看一下聚合
由侦听器填充的表(为简洁起见,省略了值模式):
><年代p一个n类="class">选择年代p一个n>*<年代p一个n类="keyword">从年代p一个n>骨料;| rootType | keySchema | rootId | materialization | valueSchema | | customers-complete | {<年代p一个n类="string">"年代p一个n><年代p一个n类="content">模式年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="string">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">结构体年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">字段年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="string">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">int64年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">可选年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="predefined-constant">假年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">场年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}),<年代p一个n类="string">"年代p一个n><年代p一个n类="content">可选年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="predefined-constant">假年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户所有。关键年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}} | {<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1004年代p一个n>}|{<年代p一个n类="string">"年代p一个n><年代p一个n类="content">模式年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>{…}} | {<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1004年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">firstName年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">安妮年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">姓年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Kretchmar年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">电子邮件年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">annek@noanswer.org年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">标签年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:【<年代p一个n类="string">"年代p一个n><年代p一个n类="content">长期年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">贵宾年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>),<年代p一个n类="string">"年代p一个n><年代p一个n类="content">生日年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">5098年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">类别年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">100001年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">零售年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>},<年代p一个n类="string">"年代p一个n><年代p一个n类="content">地址年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">16年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">街年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">大学山路1289号年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">城市年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Canehill年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">状态年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">阿肯色州年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">邮政编码年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">72717年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">航运年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}]} |
该表包含这些列:
rootType
类中给出的聚合的名称@MaterializeAggregate
注释rootId
:聚合的id为序列化的JSON物质化
:聚合本身为序列化的JSON;在这种情况下,一个客户和他们的地址,类别等。keySchema
:行键的Kafka Connect模式valueSchema
:物化的Kafka Connect模式
让我们稍微讨论一下这两个模式列。就其支持的数据类型而言,JSON本身是非常有限的。例如,我们会丢失关于数值字段的值范围(int vs. long等)的信息,而没有任何其他信息。因此,侦听器从实体模型中获取键和聚合视图的相应模式信息,并将其存储在聚合记录中。
现在Jackson本身只支持JSON Schema,这对于我们的目的来说有点太有限了。因此,示例实现为Jackson的模式系统提供了自定义序列化器,它允许我们发出Kafka Connect的模式表示(带有更精确的类型信息),而不是普通的JSON模式。这将在接下来的时候派上用场,当我们想将基于字符串的JSON表示的键和值扩展为正确类型的Kafka Connect记录时。
捕获对聚合表的更改
现在,我们有了一种机制,每当通过Hibernate更改应用程序数据时,它可以透明地将聚合持久化到源数据库中的一个单独的表中。开云体育电动老虎机请注意,这发生在源事务的边界内,因此如果由于某种原因将回滚相同的事务,那么聚合视图也不会更新。
Hibernate监听器在编写聚合视图时使用插入或更新语义,也就是说,对于给定的聚合根,在聚合表中总是只有一个对应的条目反映了它的当前状态。如果删除了聚合根实体,侦听器也将从聚合表中删除该条目。
现在让我们设置Debezium来开云体育官方注册网址捕获对聚合
表:
c年代p一个n><年代p一个n类="error">u年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">我年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">X年代p一个n><年代p一个n类="error">P年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">T年代p一个n><年代p一个n类="error">\年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">H年代p一个n><年代p一个n类="string">"年代p一个n><年代p一个n类="content">接受:application / json年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>\年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">H年代p一个n><年代p一个n类="string">"年代p一个n><年代p一个n类="content">内容类型:application / json年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>\年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">p年代p一个n>:<年代p一个n类="error">/年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">一个年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">t年代p一个n>:<年代p一个n类="integer">8083年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">n年代p一个n><年代p一个n类="error">n年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">d年代p一个n><年代p一个n类="error">@年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error"><年代p一个n><年代p一个n类="error"><年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">E年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">F年代p一个n>{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">inventory-connector年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">配置年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">connector.class年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">io.开云体育官方注册网址debezium.connector.mysql.MySqlConnector年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">tasks.max年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">1年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.hostname年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">mysql年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.port年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">3306年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.user年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">开云体育官方注册网址"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.password年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">dbz年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.server.id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">184054年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.server.name年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">dbserver1年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.whitelist年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">库存年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">table.whitelist年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">*总量。年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.history.kafka.bootstrap.servers年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">卡夫卡:9092年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">开云体育电动老虎机database.history.kafka.topic年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">schema-changes.inventory年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}}<年代p一个n类="error">E年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">F年代p一个n>
这将MySQL连接器注册到“库存”数据库(我们使用的是扩展版本的模式开云体育电动老虎机<一个href="//www.coraltyphoon.com/docs/tutorial/">开云体育官方注册网址Debezium教程一个>),捕获对“聚合”表的任何更改。
扩大JSON
如果我们现在要浏览相应的Kafka主题,我们将看到已知的Debezium格式的所有数据更改事件开云体育官方注册网址聚合
表格
但是,带有记录“after”状态的“materialization”字段仍然是包含JSON字符串的单个字段。我们更希望有一个强类型的Kafka Connect记录,它的模式准确地描述了聚合结构和字段的类型。为此,示例项目提供了一个SMT(单个消息转换),它接受JSON物化和相应的信息valueSchema
并将其转换为完整的Kafka Connect记录。对于键也是如此。DELETE事件被重写为墓碑事件。最后,SMT将每个记录重新路由到以聚合根命名的主题,允许使用者只订阅对特定聚合类型的更改。
所以让我们在注册Debezium CDC连接器时添加SMT:开云体育官方注册网址
.年代p一个n><年代p一个n类="error">.年代p一个n><年代p一个n类="error">.年代p一个n><年代p一个n类="key">"年代p一个n><年代p一个n类="content">转换年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">expandjson年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">transforms.expandjson.type年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">io.开云体育官方注册网址debezium.aggregation.smt.ExpandJsonSmt年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="error">.年代p一个n><年代p一个n类="error">.年代p一个n><年代p一个n类="error">.年代p一个n>
当现在浏览“客户完成”主题时,我们将看到我们所期望的强类型Kafka Connect记录:
{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">模式年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">结构体年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">字段年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">int64年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">可选年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">假年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">场年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">可选年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">假年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户所有。关键年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">有效载荷年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1004年代p一个n>}}{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">模式年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">结构体年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">字段年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:【<年代p一个n类="error">.年代p一个n><年代p一个n类="error">.年代p一个n><年代p一个n类="error">.年代p一个n>),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">可选年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">真正的年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">urn: jsonschema: com:例如:域:客户年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">有效载荷年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1004年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">firstName年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">安妮年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">姓年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Kretchmar年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">电子邮件年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">annek@noanswer.org年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">活跃的年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">真正的年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">标签年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:【<年代p一个n类="string">"年代p一个n><年代p一个n类="content">长期年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">贵宾年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">生日年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">5098年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类别年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">100001年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">零售年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">地址年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">16年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">街年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">大学山路1289号年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">城市年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Canehill年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">状态年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">阿肯色州年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">邮政编码年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">72717年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">生活年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}]}}
为了确认这些是真正的Kafka连接记录,而不仅仅是一个JSON字符串字段,你可以使用<一个href="//www.coraltyphoon.com/docs/configuration/avro/">Avro消息转换器一个>并检查模式注册中心中的消息模式。
将聚合消息放入Elasticsearch
最后一个缺失的步骤是注册Confluent Elasticsearch接收器连接器,将其与“customers-complete”主题连接起来,并让它将任何更改推到相应的索引:
c年代p一个n><年代p一个n类="error">u年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">我年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">X年代p一个n><年代p一个n类="error">P年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">T年代p一个n><年代p一个n类="error">\年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">H年代p一个n><年代p一个n类="string">"年代p一个n><年代p一个n类="content">接受:application / json年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>\年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">H年代p一个n><年代p一个n类="string">"年代p一个n><年代p一个n类="content">内容类型:application / json年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>\年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">p年代p一个n>:<年代p一个n类="error">/年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">一个年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">t年代p一个n>:<年代p一个n类="integer">8083年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">n年代p一个n><年代p一个n类="error">n年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">d年代p一个n><年代p一个n类="error">@年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error"><年代p一个n><年代p一个n类="error"><年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">E年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">F年代p一个n>{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">es-customers年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">配置年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">connector.class年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">io.confluent.connect.elasticsearch.ElasticsearchSinkConnector年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">tasks.max年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">1年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">主题年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户所有年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">connection.url年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">http://elastic:9200年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">key.ignore年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">假年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">schema.ignore年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">假年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">behavior.on.null.values年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">删除年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">type.name年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">customer-with-addresses年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">转换年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">关键年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">transforms.key.type年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">org.apache.kafka.connect.transforms.ExtractField美元关键年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">transforms.key.field年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}}<年代p一个n类="error">E年代p一个n><年代p一个n类="error">O年代p一个n><年代p一个n类="error">F年代p一个n>
使用Connect的ExtractField
转换,从键结构中获得实际的id值,并将其用作对应Elasticsearch文档的键。指定“behavior.on.null”。选项将允许连接器在遇到墓碑消息(即有键但没有值的消息)时从索引中删除相应的文档。
最后,我们可以使用Elasticsearch REST API来浏览索引,当然还可以使用其强大的全文查询语言通过地址或嵌入到聚合结构中的任何其他属性来查找客户:
>年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">u年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">X年代p一个n><年代p一个n类="error">G年代p一个n><年代p一个n类="error">E年代p一个n><年代p一个n类="error">T年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">H年代p一个n><年代p一个n类="string">"年代p一个n><年代p一个n类="content">接受:application / json年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>\年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">p年代p一个n>:<年代p一个n类="error">/年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">一个年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">t年代p一个n>:<年代p一个n类="integer">9200年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">u年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">米年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">-年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">o年代p一个n><年代p一个n类="error">米年代p一个n><年代p一个n类="error">p年代p一个n><年代p一个n类="error">l年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">/年代p一个n><年代p一个n类="error">_年代p一个n><年代p一个n类="error">年代年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">一个年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">c年代p一个n><年代p一个n类="error">h年代p一个n><年代p一个n类="error">?年代p一个n><年代p一个n类="error">p年代p一个n><年代p一个n类="error">r年代p一个n><年代p一个n类="error">e年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">t年代p一个n><年代p一个n类="error">y年代p一个n>{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_shards年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">失败的年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">0年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">成功的年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">5年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">总计年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">5年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">支安打年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">支安打年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">1004年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_index年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">客户所有年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_score年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="float">1.0年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_source年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">活跃的年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">真正的年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">地址年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [{<年代p一个n类="key">"年代p一个n><年代p一个n类="content">城市年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Canehill年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">16年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">状态年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">阿肯色州年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">街年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">大学山路1289号年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类型年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">生活年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">邮政编码年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">72717年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">标签年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:【<年代p一个n类="string">"年代p一个n><年代p一个n类="content">长期年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="string">"年代p一个n><年代p一个n类="content">贵宾年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">生日年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">5098年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">类别年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: {<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">100001年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">的名字年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">零售年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">电子邮件年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">annek@noanswer.org年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">firstName年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">安妮年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">id年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1004年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">姓年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">Kretchmar年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">分数年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: [],<年代p一个n类="key">"年代p一个n><年代p一个n类="content">someBlob年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">零年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">标签年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>: []},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">_type年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="string">"年代p一个n><年代p一个n类="content">customer-with-addresses年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>}),<年代p一个n类="key">"年代p一个n><年代p一个n类="content">max_score年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="float">1.0年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">总计年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">1年代p一个n>},<年代p一个n类="key">"年代p一个n><年代p一个n类="content">timed_out年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="value">假年代p一个n>,<年代p一个n类="key">"年代p一个n><年代p一个n类="content">花了年代p一个n><年代p一个n类="delimiter">"年代p一个n>年代p一个n>:<年代p一个n类="integer">11年代p一个n>}
这样你就有了:客户的完整数据,包括他们的地址、类别、标签等,都被物化到Elasticsearch中的一个文档中。如果您正在使用JPA更新客户,您将看到索引中的数据以近乎实时的方式相应地更新。
利与弊
那么,这种从多个源表物化聚合的方法的优点和缺点是什么<一个href="//www.coraltyphoon.com/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/">KStreams-based方法一个>?
最大的优势是一致性和事务边界意识,而基于kstreams的解决方案在其建议的形式中容易暴露中介聚合。例如,如果存储一个客户和三个地址,可能会发生这样的情况:流查询首先创建客户和先插入的两个地址的聚合,然后不久再创建包含所有三个地址的完整聚合。这里讨论的方法不是这样的,因为你只会把完整的聚合流到Kafka。此外,这种方法感觉有点“轻量级”,即一个简单的标记注释(以及一些用于微调发出的JSON结构的Jackson注释)就足够从您的域模型中实现聚合,而使用KStreams解决方案需要更多的努力来设置所需的流、临时表等。
通过应用层驱动聚合的缺点是,它并不完全与您访问主数据的方式无关。如果您绕过应用程序,例如直接在数据库中修补数据,自然会错过这些更新,需要刷新受影响的聚合。开云体育电动老虎机尽管这同样可以通过变更数据捕获和Debezium来完成:对源表的变更事件可以由应用程序本身捕获和使用,从而允许它在外部数据更改后重新开云体育官方注册网址实现聚合。您还可能认为,在源事务中运行JSON序列化并在源数据库中存储聚合会带来一些开销。开云体育电动老虎机不过,这通常是可以接受的。
另一个要问的问题是,与简单地向Elasticsearch发布REST请求相比,在中间聚合表上使用变更数据捕获有什么优势。答案是高度增强的健壮性和容错性。如果Elasticsearch集群由于某种原因无法访问,Kafka和Kafka Connect的机制将确保任何更改事件最终都会传播,一旦sink再次启动。此外,Elasticsearch以外的其他消费者也可以订阅聚合主题,日志可以从头播放等等。
注意,虽然我们主要谈论的是使用Elasticsearch作为数据接收器,但也有其他数据存储和连接器支持复杂结构的记录。一个例子是MongoDB和<一个href="https://github.com/hpgrahsl/kafka-connect-mongodb">水槽连接器一个>由Hans-Peter Grahsl维护,人们可以使用它将客户聚合到MongoDB中,例如,通过一个主键查找就可以有效地检索客户及其所有相关数据。
前景
Hibernate ORM扩展以及本文中讨论的SMT可以在我们的<一个href="https://github.com/debezium/debezium-examples/tree/main/jpa-aggregations">实例库一个>.它们目前应该被认为处于“概念验证”级别。
也就是说,我们正在考虑将其作为一个合适的Debezium组件,允许您在基于hibernate的应用程开云体育官方注册网址序中使用这种聚合方法,只需引入这个新组件。不过,要做到这一点,我们首先得改进一些东西。最重要的是,需要一个API,它可以让你(重新)按需创建聚合,例如,为现有数据或通过Criteria API / JPQL批量更新的数据(侦听器将错过)。此外,如果任何引用的实体发生变化,聚合也应该自动重新创建(对于当前PoC,只有对客户实例本身的更改才会触发其聚合视图的重新构建,但对其地址之一的更改则不会)。
如果你喜欢这个想法,那么让我们知道,这样我们就可以衡量对它的普遍兴趣。此外,如果您有兴趣为Debezium项目做出贡献,这将是一个很好的项目。开云体育官方注册网址期待您的回复,例如在下面的评论区或在我们的<一个href="https://groups.google.com/forum/">邮件列表一个>.
非常感谢Hans-Peter Grahsl对这篇文章早期版本的反馈!
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在<一个href="http://kafka.apache.org/">卡夫卡一个>并提供<一个href="http://kafka.apache.org/documentation.html">卡夫卡连接一个>监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是<一个href="//www.coraltyphoon.com/license/">开源一个>下<一个href="http://www.apache.org/licenses/LICENSE-2.0.html">Apache许可证,版本2.0一个>.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们<一个href="https://twitter.com/debezium">@开云体育官方注册网址debezium一个>,<一个href="https://debezium.zulipchat.com/">在Zulip上和我们聊天一个>,或加入我们的<一个href="https://groups.google.com/forum/">邮件列表一个>与社区对话。所有的代码都是开源的<一个href="https://github.com/debezium/">GitHub上一个>,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址<一个href="https://issues.redhat.com/projects/DBZ/issues/">记录问题一个>.