卡夫卡流gydF4y2Ba是一个基于Apache Kafka开发流处理应用程序的库。引用它的文档,“Kafka Streams应用程序通过拓扑实时地记录流,连续地、并发地、逐条记录地处理数据”。Kafka Streams DSL提供了一系列流处理操作,如映射、过滤、连接和聚合。gydF4y2Ba

Kafka流中的非键连接gydF4y2Ba

开云体育官方注册网址Debezium的CDC源连接器可以很容易地捕获数据库中的数据变化,并将它们近实时地推向Elasticsearch等接收器系统。开云体育电动老虎机默认情况下,这将导致源数据库中的表、对应的Kafka主题和接收器端的数据表示(例如Elasticsearch中的搜索索引)之间的1:1关系。开云体育电动老虎机gydF4y2Ba

在1:n关系的情况下,比如在一个客户表和一个地址表之间,消费者通常对数据的视图感兴趣,它是一个嵌套的数据结构,例如,一个Elasticsearch文档表示一个客户和他们所有的地址。gydF4y2Ba

这就是gydF4y2Bakip - 213gydF4y2Ba(“Kafka改进方案”)和它的外键连接能力进来了:它在gydF4y2BaApache卡夫卡gydF4y2Ba2.4“缩小流中的KTables和关系数据库中的表之间的语义差距”。开云体育电动老虎机在KIP-213之前,为了连接来自两个Debezium更改事件主题的消息,通常必须手动重新输入至少一个开云体育官方注册网址主题的密钥,以确保在连接的两端使用相同的密钥。gydF4y2Ba

多亏了KIP-213,这不再需要了,因为它允许在从Kafka消息值中提取的字段中连接两个Kafka主题,以完全透明的方式自动处理所需的重键。比较,gydF4y2Ba以前的方法gydF4y2Ba,这大大减少了从Debezium的CDC事件创建聚合事件的工作量。开云体育官方注册网址gydF4y2Ba

非键连接gydF4y2Ba外键连接gydF4y2Ba类似于SQL中的连接,如下所示:gydF4y2Ba

选择gydF4y2Ba*gydF4y2Ba从gydF4y2Ba客户gydF4y2Ba加入gydF4y2Ba地址gydF4y2Ba在gydF4y2Ba客户。Id =地址。CUSTOMER_IDgydF4y2Ba

在Kafka流术语中,这种连接的输出是newgydF4y2BaKTablegydF4y2Ba包含连接结果。gydF4y2Ba

开云体育电动老虎机数据库概述gydF4y2Ba

继续前面关于客户和地址的例子,让我们考虑一个具有以下数据模型的应用程序:gydF4y2Ba

开云体育电动老虎机数据库概述"></div>
          <div class=

客户和地址这两个实体共享一个从地址到客户的外键关系,即一个客户可以有多个地址。如上所述,默认情况下Debezium将针对不同主题的每个开云体育官方注册网址表发出事件。使用Kafka流,两个表的更改事件主题将被装入两个表中gydF4y2BaKTablegydF4y2BaS,它们在客户id上连接。Kafka Streams应用程序将处理来自两个Kafka主题的数据。只要任意一个主题上有新的CDC事件(由插入、更新或删除记录触发),就会重新执行连接。gydF4y2Ba

作为Kafka Streams应用程序的运行时,我们将使用gydF4y2BaQuarkusgydF4y2Ba,一个用于构建云原生微服务的堆栈,它(以及许多其他)还提供了一个gydF4y2Ba扩展gydF4y2Ba卡夫卡流。虽然它一般可以通过一个平原运行Kafka流拓扑gydF4y2Bamain ()gydF4y2Ba方法,使用Quarkus和这个扩展作为基础有许多优点:gydF4y2Ba

  • 拓扑的管理(例如,等待所有输入主题被创建)gydF4y2Ba

  • 可通过环境变量、系统属性等进行配置。gydF4y2Ba

  • 暴露运行状况检查gydF4y2Ba

  • 暴露指标gydF4y2Ba

  • 开发模式gydF4y2Ba,一种在代码更改后自动热代码替换流拓扑的工作方式gydF4y2Ba

  • 支持将Kafka Streams管道作为原生二进制通道执行gydF4y2BaGraalVMgydF4y2Ba,从而显著减少内存消耗和启动时间gydF4y2Ba

变更事件概述"></div>
          <div class=

这张图片显示了我们的解决方案的概述。gydF4y2Ba

使用Quarkus Kafka Streams扩展创建应用程序gydF4y2Ba

使用Kafka Streams扩展创建一个新的Quarkus项目,运行以下命令:gydF4y2Ba

mvn io.quarkus: quarkus-maven-plugin: 1.12.2。Final:create \ -DprojectGroupId=org。一个c米e\-DprojectArtifactId=customer-addresses-aggregator \ -Dextensions="kafka-streams" cd customer-addresses-aggregator

了解流处理拓扑gydF4y2Ba

我们有一个聚合器应用程序,它将读取两个Kafka主题,并在流管道中处理它们:gydF4y2Ba

  • 这两个主题在客户id上连接gydF4y2Ba

  • 每个客户都有丰富的地址gydF4y2Ba

  • 这些聚合数据被写入第三个主题,gydF4y2BacustomersWithAddressesTopicgydF4y2Ba

当在Kafka流中使用Quarkus扩展时,我们所需要做的就是声明一个gydF4y2BaCDI生产者法gydF4y2Ba,它返回流处理应用程序的拓扑结构。此方法必须用gydF4y2Ba与@gydF4y2Ba,它必须返回一个gydF4y2Ba拓扑结构gydF4y2Ba实例。Quarkus扩展负责配置、启动和停止Kafka Streams引擎。现在让我们看一下实际的流查询实现本身。gydF4y2Ba

@ApplicationScopedgydF4y2Ba公共gydF4y2Ba类gydF4y2BaTopologyProducergydF4y2Ba{gydF4y2Ba@ConfigPropertygydF4y2Ba(name =gydF4y2Ba"gydF4y2Bacustomers.topicgydF4y2Ba"gydF4y2Ba)gydF4y2Ba(1)gydF4y2Ba字符串gydF4y2BacustomersTopic;gydF4y2Ba@ConfigPropertygydF4y2Ba(name =gydF4y2Ba"gydF4y2Baaddresses.topicgydF4y2Ba"gydF4y2Ba)gydF4y2Ba字符串gydF4y2BaaddressesTopic;gydF4y2Ba@ConfigPropertygydF4y2Ba(name =gydF4y2Ba"gydF4y2Bacustomers.with.addresses.topicgydF4y2Ba"gydF4y2Ba)gydF4y2Ba字符串gydF4y2BacustomersWithAddressesTopic;gydF4y2Ba与@gydF4y2Ba公共gydF4y2Ba拓扑buildTopology() {StreamsBuilder构建器=gydF4y2Ba新gydF4y2BaStreamsBuilder ();gydF4y2Ba(2)gydF4y2BaSerde 长gydF4y2Ba> adressKeySerde = 开云体育官方注册网址DebeziumSerdes.payloadJson(gydF4y2Ba长gydF4y2Ba. class);adressKeySerde.configure (gydF4y2Ba集合gydF4y2Ba.emptyMap (),gydF4y2Ba真正的gydF4y2Ba);Serde
addressSerde = 开云体育官方注册网址DebeziumSerdes.payloadJson(Address.class);addressSerde.configure (gydF4y2Ba集合gydF4y2Ba.singletonMap (gydF4y2Ba"gydF4y2Bafrom.fieldgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba后gydF4y2Ba"gydF4y2Ba),gydF4y2Ba假gydF4y2Ba);Serde 整数gydF4y2Ba> customerkeyserde = D开云体育官方注册网址ebeziumSerdes.payloadJson(gydF4y2Ba整数gydF4y2Ba. class);customersKeySerde.configure (gydF4y2Ba集合gydF4y2Ba.emptyMap (),gydF4y2Ba真正的gydF4y2Ba);Serde customerserde = D开云体育官方注册网址ebeziumSerdes.payloadJson(Customer.class);customersSerde.configure (gydF4y2Ba集合gydF4y2Ba.singletonMap (gydF4y2Ba"gydF4y2Bafrom.fieldgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba后gydF4y2Ba"gydF4y2Ba),gydF4y2Ba假gydF4y2Ba);JsonbSerde addressAndCustomerSerde =gydF4y2Ba新gydF4y2BaJsonbSerde < > (AddressAndCustomer.class);gydF4y2Ba(3)gydF4y2BaJsonbSerde customerWithAddressesSerde =gydF4y2Ba新gydF4y2BaJsonbSerde < > (CustomerWithAddresses.class);KTable 长gydF4y2Ba,地址>地址= builder.table(gydF4y2Ba(4)gydF4y2BaaddressesTopic,消耗。w我th(adressKeySerde, addressSerde) ); KTable<整数gydF4y2Ba,客户>客户=建设者。table(customersTopic,已消费。w我th(customersKeySerde, customersSerde) ); KTable<整数gydF4y2Ba, CustomerWithAddresses = addresses.join(gydF4y2Ba(5)gydF4y2Ba客户,地址->地址。customer_id,AddressAndCustomer::新gydF4y2Ba, Materialized.with(Serdes.Long(), addressAndCustomerSerde)) .groupBy(gydF4y2Ba(6)gydF4y2Ba(addressId, addressAndCustomer) -> KeyValue。一对(addressAndCustomer.customer。id,一个ddre年代年代AndCustomer), Grouped.with(Serdes.Integer(), addressAndCustomerSerde) ) .aggregate((7)gydF4y2BaCustomerWithAddresses::gydF4y2Ba新gydF4y2Ba, (customerId, addressAndCustomer, aggregate) -> aggregate。一个ddAddress( addressAndCustomer), (customerId, addressAndCustomer, aggregate) -> aggregate.removeAddress( addressAndCustomer), Materialized.with(Serdes.Integer(), customerWithAddressesSerde) ); customersWithAddresses.toStream()(8)gydF4y2Ba.to(customerwithaddressestopic, Produced.with(Serdes.Integer(), customerWithAddressesSerde));gydF4y2Ba返回gydF4y2Babuilder.build ();}}gydF4y2Ba
1gydF4y2Ba 类注入主题名称gydF4y2Ba微配置接口gydF4y2Ba,这些值是在夸克中提供的gydF4y2Baapplication.propertiesgydF4y2Ba配置文件(例如,可以使用环境变量覆盖它们)gydF4y2Ba
2gydF4y2Ba 创建的实例gydF4y2BaStreamsBuildergydF4y2Ba,这有助于我们构建拓扑结构gydF4y2Ba
3.gydF4y2Ba 为了将流管道中使用的Java类型序列化和反序列化到JSON中,Quarkus提供了gydF4y2Ba类io.quarkus.kafka.client.serialization.JsonbSerdegydF4y2Ba;Serde的实现基于gydF4y2BaJSON-BgydF4y2Ba
4gydF4y2Ba 的gydF4y2BaKTablegydF4y2Ba-gydF4y2BaKTablegydF4y2Ba外键连接功能用于提取gydF4y2Ba客户id号gydF4y2Ba并执行连接;gydF4y2BaStreamsBuilder #表()gydF4y2Ba是用来读取两个Kafka主题到KTablegydF4y2Ba地址gydF4y2Ba而且gydF4y2Ba客户gydF4y2Ba分别gydF4y2Ba
5gydF4y2Ba 来自gydF4y2Ba地址gydF4y2Ba主题与对应的连接gydF4y2Ba客户gydF4y2Ba主题;连接结果包含客户的数据及其其中一个地址gydF4y2Ba
6gydF4y2Ba groupBy ()gydF4y2Ba操作将根据这些记录进行分组gydF4y2Ba客户id号gydF4y2Ba
7gydF4y2Ba 要生成一个客户及其所有地址的嵌套结构,可以使用gydF4y2Ba总()gydF4y2Ba操作应用于每组记录(客户-地址元组),更新一个gydF4y2BaCustomerWithAddressesgydF4y2Ba每一个客户gydF4y2Ba
8gydF4y2Ba 管道的结果被写入gydF4y2BacustomersWithAddressesTopicgydF4y2Ba主题gydF4y2Ba

的gydF4y2BaCustomerWithAddressesgydF4y2Ba类在流管道中处理事件时跟踪聚合的值。gydF4y2Ba

公共gydF4y2Ba类gydF4y2BaCustomerWithAddressesgydF4y2Ba{gydF4y2Ba公共gydF4y2Ba客户的客户;gydF4y2Ba公共gydF4y2Ba列表gydF4y2Ba
addresses =gydF4y2Ba新gydF4y2BaArrayListgydF4y2Ba< > ();gydF4y2Ba公共gydF4y2BaCustomerWithAddresses addAddress(AddressAndCustomer AddressAndCustomer) {customer = AddressAndCustomer .customer;addresses.add (addressAndCustomer.address);gydF4y2Ba返回gydF4y2Ba这gydF4y2Ba;}gydF4y2Ba公共gydF4y2BaCustomerWithAddresses removeAddress(AddressAndCustomer AddressAndCustomer) {gydF4y2Ba迭代器gydF4y2Ba
it = addresses.iterator();gydF4y2Ba而gydF4y2Ba(it.hasNext()){地址a = it.next();gydF4y2Ba如果gydF4y2Ba(a.d id == addressAndCustomer.address.id) {it.remove();gydF4y2Ba打破gydF4y2Ba;}}gydF4y2Ba返回gydF4y2Ba这gydF4y2Ba;}}gydF4y2Ba

Kafka Streams扩展是通过Quarkus配置文件配置的gydF4y2Baapplication.propertiesgydF4y2Ba.除了主题名称,这个文件还包含了Kafka引导服务器和几个流选项的信息:gydF4y2Ba

customers.topic = dbserver1.inventory。客户addresses.topic=db年代erver1.我nventory.一个ddresses customers.with.addresses.topic=customers-with-addresses quarkus.kafka-streams.bootstrap-servers=localhost:9092 quarkus.kafka-streams.application-id=kstreams-fkjoin-aggregator quarkus.kafka-streams.application-server=${hostname}:8080 quarkus.kafka-streams.topics=${customers.topic},${addresses.topic} # streams options kafka-streams.cache.max.bytes.buffering=10240 kafka-streams.commit.interval.ms=1000 kafka-streams.metadata.max.age.ms=500 kafka-streams.auto.offset.reset=earliest kafka-streams.metrics.recording.level=DEBUG kafka-streams.consumer.session.timeout.ms=150 kafka-streams.consumer.heartbeat.interval.ms=100

构建和运行应用程序gydF4y2Ba

你现在可以像这样构建应用程序:gydF4y2Ba

MVN清洁包gydF4y2Ba

为了运行应用程序和所有相关组件(Kafka, Kafka Connect with Debezium, Postgres数据库),我们创建了一个开云体育官方注册网址开云体育电动老虎机gydF4y2BaDocker撰写文件gydF4y2Ba,你可以在gydF4y2Ba开云体育官方注册网址debezium-examplesgydF4y2Ba回购。要启动所有容器,也要构建聚合器容器映像,请运行以下命令:gydF4y2Ba

export 开云体育官方注册网址DEBEZIUM_VERSION=1.4 docker-compose up—buildgydF4y2Ba

为了在Kafka Connect开云体育官方注册网址上注册Debezium连接器,你需要指定一些配置属性,比如连接器的名称、数据库主机名、用户名、密码、端口、数据库名等等。开云体育电动老虎机创建文件gydF4y2Baregister-postgres.jsongydF4y2Ba其内容如下:gydF4y2Ba

{gydF4y2Ba"gydF4y2Baconnector.classgydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Baio.开云体育官方注册网址debezium.connector.postgresql.PostgresConnectorgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Batasks.maxgydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba1gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.hostnamegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2BapostgresgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.portgydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba5432gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.usergydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2BapostgresgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.passwordgydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2BapostgresgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.dbnamegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2BapostgresgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba开云体育电动老虎机database.server.namegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Badbserver1gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Baschema.includegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba库存gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Badecimal.handling.modegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba字符串gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bakey.convertergydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Baorg.apache.kafka.connect.json.JsonConvertergydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bakey.converter.schemas.enablegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba假gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bavalue.convertergydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Baorg.apache.kafka.connect.json.JsonConvertergydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bavalue.converter.schemas.enablegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba假gydF4y2Ba"gydF4y2Ba}gydF4y2Ba

配置Debezium连接器:开云体育官方注册网址gydF4y2Ba

http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.jsongydF4y2Ba

的实例gydF4y2Ba开云体育官方注册网址debezium /工具gydF4y2Ba集装箱图片:gydF4y2Ba

Docker运行——tty——rm \——network kstreams-fk-join-network \ debez开云体育官方注册网址ium/ tools:1.1 \gydF4y2Ba

此映像提供了一些有用的工具,例如gydF4y2BakafkacatgydF4y2Ba.在工具容器中,运行kafkacat来检查流管道的结果:gydF4y2Ba

kafkacat -b kafka:9092 -C -o beginning -q \ -t customers-with-addresses | jq。gydF4y2Ba

您应该看到如下所示的记录,每个记录包含一个客户的所有数据及其所有地址:gydF4y2Ba

{gydF4y2Ba"gydF4y2Ba地址gydF4y2Ba"gydF4y2Ba: [{gydF4y2Ba"gydF4y2Ba城市gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba汉堡gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba国家gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba加拿大gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bacustomer_idgydF4y2Ba"gydF4y2Ba:gydF4y2Ba1001gydF4y2Ba,gydF4y2Ba"gydF4y2BaidgydF4y2Ba"gydF4y2Ba:gydF4y2Ba100001gydF4y2Ba,gydF4y2Ba"gydF4y2Ba街gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba主街42号gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2BazipcodegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba90210gydF4y2Ba"gydF4y2Ba}, {gydF4y2Ba"gydF4y2Ba城市gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba柏林gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba国家gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba加拿大gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bacustomer_idgydF4y2Ba"gydF4y2Ba:gydF4y2Ba1001gydF4y2Ba,gydF4y2Ba"gydF4y2BaidgydF4y2Ba"gydF4y2Ba:gydF4y2Ba100002gydF4y2Ba,gydF4y2Ba"gydF4y2Ba街gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba11博士后。gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2BazipcodegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba90211gydF4y2Ba"gydF4y2Ba}),gydF4y2Ba"gydF4y2Ba客户gydF4y2Ba"gydF4y2Ba: {gydF4y2Ba"gydF4y2Ba电子邮件gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Basally.thomas@acme.comgydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Bafirst_namegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba莎莉gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2BaidgydF4y2Ba"gydF4y2Ba:gydF4y2Ba1001gydF4y2Ba,gydF4y2Ba"gydF4y2Balast_namegydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba托马斯。gydF4y2Ba"gydF4y2Ba}}gydF4y2Ba

获取数据库的shell,插入、更新或删除开云体育电动老虎机一些记录,连接将被自动重新处理:gydF4y2Ba

$gydF4y2BadgydF4y2BaogydF4y2BacgydF4y2BakgydF4y2BaegydF4y2BargydF4y2BargydF4y2BaugydF4y2BangydF4y2Ba-gydF4y2Ba-gydF4y2BatgydF4y2BatgydF4y2BaygydF4y2Ba-gydF4y2Ba-gydF4y2BargydF4y2Ba米gydF4y2Ba-gydF4y2Ba我gydF4y2Ba\gydF4y2Ba-gydF4y2Ba-gydF4y2BangydF4y2BaegydF4y2BatgydF4y2BawgydF4y2BaogydF4y2BargydF4y2BakgydF4y2BakgydF4y2Ba年代gydF4y2BatgydF4y2BargydF4y2BaegydF4y2Ba一个gydF4y2Ba米gydF4y2Ba年代gydF4y2Ba-gydF4y2BafgydF4y2BakgydF4y2Ba-gydF4y2BajgydF4y2BaogydF4y2Ba我gydF4y2BangydF4y2Ba-gydF4y2BangydF4y2BaegydF4y2BatgydF4y2BawgydF4y2BaogydF4y2BargydF4y2BakgydF4y2Ba\gydF4y2BadgydF4y2BaegydF4y2BabgydF4y2BaegydF4y2BazgydF4y2Ba我gydF4y2BaugydF4y2Ba米gydF4y2Ba/gydF4y2BatgydF4y2BaogydF4y2BaogydF4y2BalgydF4y2Ba我gydF4y2BangydF4y2BaggydF4y2Ba:gydF4y2Ba1.1gydF4y2Ba\gydF4y2BabgydF4y2Ba一个gydF4y2Ba年代gydF4y2BahgydF4y2Ba-gydF4y2BacgydF4y2Ba'gydF4y2BapgydF4y2BaggydF4y2BacgydF4y2BalgydF4y2Ba我gydF4y2BapgydF4y2BaogydF4y2Ba年代gydF4y2BatgydF4y2BaggydF4y2BargydF4y2BaegydF4y2Ba年代gydF4y2Ba问gydF4y2BalgydF4y2Ba:gydF4y2Ba/gydF4y2Ba/gydF4y2BapgydF4y2BaogydF4y2Ba年代gydF4y2BatgydF4y2BaggydF4y2BargydF4y2BaegydF4y2Ba年代gydF4y2Ba:gydF4y2BapgydF4y2BaogydF4y2Ba年代gydF4y2BatgydF4y2BaggydF4y2BargydF4y2BaegydF4y2Ba年代gydF4y2Ba@gydF4y2BapgydF4y2BaogydF4y2Ba年代gydF4y2BatgydF4y2BaggydF4y2BargydF4y2BaegydF4y2Ba年代gydF4y2Ba:gydF4y2Ba5432gydF4y2Ba/gydF4y2BapgydF4y2BaogydF4y2Ba年代gydF4y2BatgydF4y2BaggydF4y2BargydF4y2BaegydF4y2Ba年代gydF4y2Ba'gydF4y2Ba#gydF4y2Ba我gydF4y2BangydF4y2BapgydF4y2BaggydF4y2BacgydF4y2BalgydF4y2Ba我gydF4y2Ba,gydF4y2BaegydF4y2Ba.gydF4y2BaggydF4y2Ba.gydF4y2BatgydF4y2BaogydF4y2BaugydF4y2BapgydF4y2BadgydF4y2Ba一个gydF4y2BatgydF4y2BaegydF4y2Ba一个gydF4y2BacgydF4y2BaugydF4y2Ba年代gydF4y2BatgydF4y2BaogydF4y2Ba米gydF4y2BaegydF4y2BargydF4y2BargydF4y2BaegydF4y2BacgydF4y2BaogydF4y2BargydF4y2BadgydF4y2Ba:gydF4y2Ba>gydF4y2BaugydF4y2BapgydF4y2BadgydF4y2Ba一个gydF4y2BatgydF4y2BaegydF4y2Ba我gydF4y2BangydF4y2BavgydF4y2BaegydF4y2BangydF4y2BatgydF4y2BaogydF4y2BargydF4y2BaygydF4y2Ba.gydF4y2BacgydF4y2BaugydF4y2Ba年代gydF4y2BatgydF4y2BaogydF4y2Ba米gydF4y2BaegydF4y2BargydF4y2Ba年代gydF4y2Ba年代gydF4y2BaegydF4y2BatgydF4y2BafgydF4y2Ba我gydF4y2BargydF4y2Ba年代gydF4y2BatgydF4y2Ba_gydF4y2BangydF4y2Ba一个gydF4y2Ba米gydF4y2BaegydF4y2Ba=gydF4y2Ba'gydF4y2Ba年代gydF4y2Ba一个gydF4y2BargydF4y2Ba一个gydF4y2BahgydF4y2Ba'gydF4y2BawgydF4y2BahgydF4y2BaegydF4y2BargydF4y2BaegydF4y2Ba我gydF4y2BadgydF4y2Ba=gydF4y2Ba1001gydF4y2Ba;gydF4y2Ba

本机运行gydF4y2Ba

Kafka Streams应用程序可以很容易地向外扩展,即负载将在应用程序的多个实例之间共享,每个实例处理输入主题分区的子集。当Quarkus应用程序通过GraalVM编译成本机代码时,它占用的内存大大减少,启动时间也非常快。不用担心内存管理,你可以并行启动Kafka Streams管道的多个实例。gydF4y2Ba

如果你想运行这个应用程序gydF4y2Ba本地的gydF4y2Ba模式,设置gydF4y2BaQUARKUS_MODEgydF4y2Ba作为gydF4y2Ba本地的gydF4y2Ba并运行以下命令(确保安装了所需的GraalVM工具):gydF4y2Ba

mvn清洁包-PnativegydF4y2Ba

要了解更多关于以原生二进制文件运行Kafka Streams应用程序的信息,请参阅gydF4y2Ba参考指南gydF4y2Ba.gydF4y2Ba

更多关于Kafka流扩展的见解gydF4y2Ba

在为流处理构建微服务时,Quarkus扩展还可以帮助您解决一些常见的需求。为了在生产环境中运行Kafka Streams应用程序,例如,您可以轻松地为数据管道添加健康检查和指标。gydF4y2Ba

测微计指标gydF4y2Ba提供关于Quarkus应用程序的丰富指标,即通过监视应用程序内部发生了什么,以及它的性能特征是什么。Quarkus允许您使用JSON格式或OpenMetrics格式通过HTTP公开这些指标。从那里,它们可以被诸如gydF4y2Ba普罗米修斯gydF4y2Ba并储存起来进行分析和可视化。gydF4y2Ba

一旦应用程序启动,指标将在gydF4y2Baq /指标gydF4y2Ba,默认以OpenMetrics格式返回数据:gydF4y2Ba

# HELP kafka_producer_node_request_total发送的请求总数# TYPE kafka_producer_node_request_total counter kafka_producer_node_request_total{client_id="kstreams-fkjoin-aggregator-b4ac1384-0e0a-4f19-8d52- 8cc1ee4c6dfs - streamthread -1-producer",kafka_version="2.5.0",node_id="node——1",status="up",} 83.0 # HELP kafka_producer_record_send_rate平均每秒发送的记录数。#式kafka_producer_record_send_rate计kafka_producer_record_send_rate {client_id =“kstreams-fkjoin-aggregator-b4ac1384-0e0a-4f19-8d52-8cc1ee4c6dfe-StreamThread-1-producer kafka_version =“2.5.0”状态=“向上”,}0.0 #帮助jvm_gc_memory_allocated_bytes_total递增的增加(年轻的)堆内存池的大小在一个GC之前下一个#类型jvm_gc_memory_allocated_bytes_total计数器jvm_gc_memory_allocated_bytes_total 1.1534336 e8 #……# HELP http_requests_total # TYPE http_requests_total counter http_requests_total{status="up",uri="/api/customers",} 0.0 #…gydF4y2Ba

如果您不使用Prometheus,您有一些选项,如Datadog、Stackdriver等。有关详细指南,请参阅gydF4y2BaQuarkiverse扩展gydF4y2Ba.gydF4y2Ba

另一方面,我们有gydF4y2Ba微表面卫生gydF4y2BaSpec,它提供了关于应用程序的活动状态的信息,即表明应用程序是否正在运行,以及应用程序是否能够处理请求。要监视现有Quarkus应用程序的运行状况状态,可以添加gydF4y2Basmallrye-healthgydF4y2Ba扩展:gydF4y2Ba

mvn quarkus:add-extension -Dextensions="smallrye-health"gydF4y2Ba

Quarkus将通过HTTP下公开所有健康检查gydF4y2Baq /健康gydF4y2Ba,在我们的例子中,它显示了管道的状态和任何缺失的主题:gydF4y2Ba

{gydF4y2Ba"gydF4y2Ba状态gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba下来gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba检查gydF4y2Ba"gydF4y2Ba: [{gydF4y2Ba"gydF4y2Ba的名字gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2BaKafka流主题运行状况检查gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba状态gydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Ba下来gydF4y2Ba"gydF4y2Ba,gydF4y2Ba"gydF4y2Ba数据gydF4y2Ba"gydF4y2Ba: {gydF4y2Ba"gydF4y2Bamissing_topicsgydF4y2Ba"gydF4y2Ba:gydF4y2Ba"gydF4y2Badbserver1.inventory.customers, dbserver1.inventory.addressesgydF4y2Ba"gydF4y2Ba}}]}gydF4y2Ba

总结gydF4y2Ba

Kafka流的Quarkus扩展提供了在JVM上运行流处理管道所需的一切,以及在本机模式下,以及执行健康检查、度量等额外奖励。例如,你可以很容易地使用Quarkus REST支持为交互式查询公开REST api,潜在地从其他扩展的Kafka Streams应用程序实例中检索数据gydF4y2BaMicroProfile REST客户端APIgydF4y2Ba.gydF4y2Ba

在本文中,我们讨论了Kafka Streams中外键连接的流处理拓扑结构,以及如何使用Quarkus Kafka Streams扩展在JVM模式下运行和构建应用程序。你可以找到完整的gydF4y2Ba源代码gydF4y2Ba在Debezium示例repo中实现。开云体育官方注册网址如果你有任何问题或反馈,请在下面的评论中告诉我们。我们期待您的建议!gydF4y2Ba

莫汉蒂AnishagydF4y2Ba

Anisha是红帽公司的副软件工程师。目前与Debezium团队合作。开云体育官方注册网址她住在印度班加罗尔。gydF4y2Ba


关于Debe开云体育官方注册网址ziumgydF4y2Ba

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在gydF4y2Ba卡夫卡gydF4y2Ba并提供gydF4y2Ba卡夫卡连接gydF4y2Ba监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是gydF4y2Ba开源gydF4y2Ba下gydF4y2BaApache许可证,版本2.0gydF4y2Ba.gydF4y2Ba

参与gydF4y2Ba

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们gydF4y2Ba@开云体育官方注册网址debeziumgydF4y2Ba,gydF4y2Ba在Zulip上和我们聊天gydF4y2Ba,或加入我们的gydF4y2Ba邮件列表gydF4y2Ba与社区对话。所有的代码都是开源的gydF4y2BaGitHub上gydF4y2Ba,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址gydF4y2Ba记录问题gydF4y2Ba.gydF4y2Ba