典型的Debezium用例之一是使用变开云体育官方注册网址更数据捕获来将遗留系统与组织中的其他系统集成。实现这一目标有多种方法

  • 使用Debezium将数据写入Kafka,并结合Ka开云体育官方注册网址fka Streams管道和Kafka Connect连接器将更改交付给其他系统

  • 使用开云体育官方注册网址Debezium嵌入式引擎在Java独立应用程序中,使用纯Java编写集成代码;它通常用于将更改事件发送到其他消息传递基础设施,如Amazon Kinesis,谷歌Pub/Sub等。

  • 使用现有的集成框架或服务总线来表达管道逻辑

本文主要讨论第三种选择——专用集成框架。

Apache骆驼

骆驼是一个开源的集成框架,使开发人员能够从不同的系统和服务中读取、转换、路由和写入数据。它提供大量的现成产品组件提供到第三方系统的接口或提供的实现企业集成模式

这种组合允许开发人员轻松地连接到目标系统,并使用声明性DSL表示集成管道。

Camel和Debe开云体育官方注册网址zium

骆驼3已经于2019年底发布,除了主要的重新架构之外,新的Debezium组件已经添加到代码库中。开云体育官方注册网址它还使Camel能够用作一个连接器在Kafka连接运行时。

这篇文章只关注Debezium组件的使用,后一种选择将在开云体育官方注册网址以后的文章中讨论。

正如你所看到的non-incubating开云体育官方注册网址Debezium连接器由其专用组件表示。这种解决方案的优点是完全隔离依赖项和连接器实例的类型安全配置。

该组件在内部公开了一个Debezium开云体育官方注册网址端点事件驱动的骆驼消费者类的实例开云体育官方注册网址Debezium嵌入式引擎

一个例子

作为一个例子,我们构建了一个简单的问答(Q & a)应用程序,灵感来自StackOverflow等。REST API允许发布新的问题以及现有问题的答案,这些问题存储在数据库中。开云体育电动老虎机

应用程序生成的任何数据更改(例如,如果创建了一个新的问题或答案)都通过Debezium捕获并传递给Camel管道,后者通过SMTP服务器发送电子邮件,并在提供的Twitter帐户上发布相应的tweet。开云体育官方注册网址

你可以找到完整的源代码这个例子在GitHub上。

拓扑结构

解决方案拓扑中有多个组件:

图1。部署拓扑

  • Q & A应用程序使用Quarkus堆栈,并公开一个REST API来创建问题和答案

  • 应用程序将其数据存储在PostgreSQL数据库中开云体育电动老虎机

  • Camel路由作为一个普通的Java应用程序运行,该应用程序使用嵌入式Infinispan存储以保持其状态(用于构建一个将问题与答案链接起来的聚合对象),并通过电子邮件和关联的Twitter帐户向“关于已回答的问题”发送消息

  • 一个MailHog运行在容器中的SMTP服务器,用于发送电子邮件

问答应用

源应用程序是一个基于Quarkus的简单REST服务。它管理两个实体,问题而且回答,以1: n关系存储在PostgreSQL数据库中。开云体育电动老虎机

图2。后端服务实体关系图

实体是使用REST API创建的,它们之间的关联是自动建立的。

骆驼管道

Camel管道是以下业务规则的表达式:

  • 对于每个创建或更新的问题,发送电子邮件给问题创建者

  • 对于每个创建或更新的答案,发送电子邮件给问题和答案创建者

  • 当一个问题达到三个答案时,在专门的推特账户上发布一条推文

业务需求被转换为下面EIP图所描述的管道:

图4。骆驼管道

代码走查

要使用Debe开云体育官方注册网址zium Camel组件,我们至少需要将以下依赖项添加到pom.xml文件

< dependencyManagement >< >的依赖关系<依赖>< groupId >org.apache.camel< / groupId >< artifactId >camel-bom< / artifactId ><版本>$ {version.camel}> < /版本<类型>砰的一声> < /类型< >范围进口< / >范围< / >的依赖<!——使用所需的Debezium版开云体育官方注册网址本——><依赖>< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-connector-postgres< / artifactId ><版本>$ {version开云体育官方注册网址.debezium}> < /版本< / >的依赖<依赖>< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-embedded< / artifactId ><版本>$ {version开云体育官方注册网址.debezium}> < /版本< / >的依赖<依赖>< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-core< / artifactId ><版本>$ {version开云体育官方注册网址.debezium}> < /版本< / >的依赖< / >的依赖关系< / dependencyManagement >< >的依赖关系<依赖>< groupId >org.apache.camel< / groupId >< artifactId >camel-core< / artifactId >< / >的依赖<依赖>< groupId >org.apache.camel< / groupId >< artifactId >camel-开云体育官方注册网址debezium-postgres< / artifactId >< / >的依赖< / >的依赖关系

管道逻辑本身定义在Qa开云体育电动老虎机DatabaseUserNotifier类。它的主要路线如下:

公共Qa开云体育电动老虎机DatabaseUserNotifier扩展RouteBuilder {@Override公共无效配置()抛出异常{(开云体育官方注册网址debezium-postgres: localhost ?+开云体育电动老虎机databaseHostname = {{database.hostname}}+&开云体育电动老虎机databasePort = {{database.port}}+&开云体育电动老虎机databaseUser = {{database.user}}+&开云体育电动老虎机databasePassword = {{database.password}}+&开云体育电动老虎机databaseDbname = postgres+&开云体育电动老虎机databaseServerName = qa+&schemaWhitelist = 开云体育电动老虎机{{database.schema}}+&tableWhitelist = 开云体育电动老虎机{{database.schema}} .question, {{database.schema}} .answer+&offsetStorage = org.apache.kafka.connect.storage.MemoryOffsetBackingStore) .routeId(Qa开云体育电动老虎机DatabaseUserNotifier.class.getName() +.开云体育电动老虎机DatabaseReader(1). log (LoggingLevel.DEBUG传入消息${body}带有标题${headers}) .choice ()(2)当(isQuestionEvent) .filter (isCreateOrUpdateEvent)(3).convertBodyTo (Question.class)(4). log (LoggingLevel.TRACE转换为逻辑类${body}) .bean(商店,readFromStoreAndUpdateIfNeeded(5)文(ROUTE_MAIL_QUESTION_CREATE)(6).endChoice() .when(isAnswerEvent) .filter(isCreateOrUpdateEvent) .convertBodyTo(Answer.class) .log(LoggingLevel.TRACE,转换为逻辑类${body}) .bean(商店,readFromStoreAndAddAnswer) .to(ROUTE_MAIL_ANSWER_CHANGE) .filter(hasManyAnswers)(7).setBody () .simple (问题“$ {exchangeProperty(聚合)。文本}”+许多答案(生成于+ Instant.now() +) .to(TWITTER_SERVER) .end() .endChoice() . else () .log(LoggingLevel.WARN,未知类型${headers[+ 开云体育官方注册网址DebeziumConstants。HEADER_IDENTIFIER +]}) .endParent ();从(ROUTE_MAIL_QUESTION_CREATE)(6).routeId (Q开云体育电动老虎机aDatabaseUserNotifier.class.getName () +.QuestionNotifier) .setHeader () .simple ($ {body.email}) .setHeader (主题) .simple (创建/编辑的问题) .setBody () .simple (“${身体问题。文本}' was created or edited), (SMTP_SERVER);}@Converter公共静态转换器@Converter公共静态问题questionFromStruct (结构体结构体){(4)返回问题(struct.getInt64 (id), struct.getString (文本), struct.getString (电子邮件));}@Converter公共静态回答answerFromStruct (结构体结构体){(4)返回答案(struct.getInt64 (id), struct.getString (文本), struct.getString (电子邮件), struct.getInt64 (question_id));}}}
1 是Debezi开云体育官方注册网址um源端点。URI部分直接映射到连接器配置选项。
2 管道逻辑根据变更事件类型被分割。认识是基于Camel开云体育官方注册网址DebeziumIdentifier头,其中包含标识符(< server_name >, < schema_name >。< table_name >)的源表。
3. 管道现在只能处理更新和删除。认识是基于Camel开云体育官方注册网址DebeziumOperation头文件,包含人事处消息字段信封
4 卡夫卡连接结构体类型转换为管道中使用的逻辑类型。转换由自定义Camel转换器执行。可以使用开箱即用开云体育官方注册网址DebeziumTypeConverter转换结构体成一个地图但是这将管道逻辑紧密地耦合到表结构中。
5 调用与路由通信的补充路由消息存储基于Infinispan缓存构建消息聚合。消息存储将检查是否已经存储了问题。如果没有,则创建并存储一个新的聚合,否则将使用新数据更新存储的聚合。
6 调用补充路由格式化邮件消息,并通过SMTP端点将其交付给问题创建者。
7 与答案消息类型相关的路由部分非常相似(答案被添加到问题聚合中)。主要的区别是当聚合包含三个答案时发布Twitter消息。

另外,为了简单起见,该示例目前使用易失性内存存储Debezium偏移量。开云体育官方注册网址对于持久存储,您可以使用基于文件的偏移存储,也可以基于Infinispan创建自定义偏移存储实现,将偏移存储委托给底层缓存。

演示

为了运行演示,您需要有一个Twitter开发者账户与适当的API密钥和秘密。

进入应用程序目录并构建所有组件:

$ MVN清洁安装

启动服务(提供你自己的Twitter API凭证):

$ env TWITTER_CONSUMER_KEY=<…> TWITTER_CONSUMER_SECRET = <…> TWITTER_ACCESS_TOKEN = <…> TWITTER_ACCESS_TOKEN_SECRET = <…docker-compose up

在另一个终端中创建一个问题和三个答案:

$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/ -d @src/test/resources/messages/create-question。json$ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer1.json $ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer2.json $ curl -v -X POST -H 'Content-Type: application/json' http://0.0.0.0:8080/question/1/answer -d @src/test/resources/messages/create-answer3.json

该推特账户应该包含一条新的推文,标题为“问题‘狗有几条腿?’有很多答案(生成于2020-02-17T08:02:33.744Z)”。也MailHog服务器界面应该显示如下消息:

图4。MailHog的信息

结论

Apache Camel是实现系统集成场景的一个非常有趣的选项。

在不需要任何外部消息传递基础设施的情况下,使用Debezium组件部署一个独立的Camel路由是非常容易的,可以捕获数据更改,并对其执行复杂的路由和转换操作。开云体育官方注册网址Camel为开发人员提供了完整的企业集成模式实现库,以及用于不同系统的超过100个连接器,这些连接器可以包含在复杂的服务编制中。

完整示例的源代码可用GitHub上

雅罗西克Pechanec

Jiri是Red Hat的软件开发人员(前质量工程师)。他职业生涯的大部分时间都花在Java和系统集成项目和任务上。他住在捷克共和国布尔诺附近。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题