基于内容的路由

默认情况下,Debezi开云体育官方注册网址um将从表中读取的所有更改事件流式传输到单个静态主题。但是,在某些情况下,您可能希望根据事件内容将所选事件重新路由到其他主题。信息中描述了基于消息内容路由消息的过程基于内容的路由消息传递模式。要在Debezium中应用此模式,需要使用基于内容开云体育官方注册网址的路由单个消息转换(SMT)来编写为每个事件求值的表达式。根据事件的计算方式,SMT可以将事件消息路由到原始目标主题,也可以将其重新路由到表达式中指定的主题。

基于内容的路由SMT正在积极开发中。发出的消息的结构或其他细节可能会随着开发的进展而改变。

虽然可以使用Java创建自定义SMT来编码路由逻辑,但使用自定义编码的SMT有其缺点。例如:

  • 有必要预先编译转换并将其部署到Kafka Connect中。

  • 每次更改都需要代码重新编译和重新部署,导致操作不灵活。

基于内容的路由SMT支持集成的脚本语言JSR 223(用于Java™平台的脚本)。

开云体育官方注册网址Debezium没有提供任何JSR 223 API的实现。要在Debezium中使用表达式语言,必须下载该语言的JSR 223脚开云体育官方注册网址本引擎实现。例如,对于Groovy 3,可以从以下网站下载它的JSR 223实现https://groovy-lang.org/.GraalVM JavaScript的JSR223实现可在https://github.com/graalvm/graaljs.获得脚本引擎文件后,将它们添加到Debezium连接器插件目录中,以及该语言实现使用的任何其他JAR文件。开云体育官方注册网址

设置

出于安全原因,Debezium连接器存档中没有包含基于内容的路由SMT。开云体育官方注册网址相反,它是在一个单独的工件中提供的,开云体育官方注册网址debezium-scripting-2.1.2.Final.tar.gz

要使用基于内容的路由SMT和Debezium连接器插件,必须显式地将SMT工件添加到Ka开云体育官方注册网址fka Connect环境中。重要提示:在Kafka Connect实例中出现路由SMT后,任何被允许向实例添加连接器的用户都可以运行脚本表达式。为了确保脚本表达式只能由授权用户运行,在添加路由SMT之前,请确保Kafka Connect实例及其配置接口的安全。

动物园管理员卡夫卡卡夫卡连接,并且安装了一个或多个Debeziu开云体育官方注册网址m连接器,安装过滤器SMT的剩余任务是:

  1. 下载脚本SMT存档

  2. 将存档的内容提取到Kafka Connect环境的Debezium插件目录中。开云体育官方注册网址

  3. 获取一个JSR-223脚本引擎实现,并将其内容添加到Kafka Connect环境的Debezium插件目录中。开云体育官方注册网址

  4. 重新启动Kafka Connect进程以获取新的JAR文件。

Groovy语言在类路径上需要以下库:

  • groovy

  • groovy-json(可选)

  • groovy-jsr223

JavaScript语言在类路径上需要以下库:

  • graalvm.js

  • graalvm.js.scriptengine

举例:基本配置

要根据事件内容配置Debezi开云体育官方注册网址um连接器来路由更改事件记录,可以配置ContentBasedRouterSMT在Kafka Connect配置为连接器。

配置基于内容的路由SMT需要指定一个定义过滤条件的正则表达式。在配置中,您将创建一个定义路由条件的正则表达式。表达式定义了计算事件记录的模式。它还指定目标主题的名称,其中路由与模式匹配的事件。您指定的模式可以指定事件类型,例如表插入、更新或删除操作。您还可以定义与特定列或行中的值匹配的模式。

例如,要重新路由所有更新(u)记录至更新主题,您可以将以下配置添加到连接器配置:

...transforms=route transforms.route.type=io.开云体育官方注册网址 debezu .transforms. contentbasedrouter transforms.route.language=jsr223。groovy transforms.route.topic.expression =价值。Op == 'u' ?'updates': null…

属性的使用Groovy表达式语言。

不匹配模式的记录被路由到默认主题。

定制配置

前面的示例显示了一个简单的SMT配置,该配置被设计为仅处理DML事件,其中包含人事处字段。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或关于事务或模式更改的元数据消息)不包含此字段。为了避免处理失败,您可以定义选择性地应用转换的SMT谓词语句仅用于特定事件。

用于基于内容的路由表达式的变量

开云体育官方注册网址Debezium将某些变量绑定到SMT的计算上下文中。当您创建表达式以指定控制路由目的地的条件时,SMT可以查找和解释这些变量的值,以计算表达式中的条件。

下表列出了Debezium绑定到基于内容的路由SMT的计算上下文中的变量:开云体育官方注册网址

表1。基于内容的路由表达式变量
的名字 描述 类型

关键

信息的关键。

org.apache.kafka.connect.data.Struct

价值

消息的值。

org.apache.kafka.connect.data.Struct

keySchema

消息键的模式。

org.apache.kafka.connect.data.Schema

valueSchema

消息值的模式。

org.apache.kafka.connect.data.Schema

主题

目标主题的名称。

字符串

消息头的Java映射。关键字段是头名称。的变量暴露了以下属性:

  • 价值(类型的对象

  • 模式(类型的org.apache.kafka.connect.data.Schema

java.util。Map < String, i开云体育官方注册网址o.debezium.transforms.scripting.RecordHeader >

表达式可以对其变量调用任意方法。表达式应该解析为一个布尔值,该值决定SMT如何处理消息。当表达式中的路由条件计算为时真正的,该消息将被保留。当路由条件计算为时,消息被删除。

表达式不应该导致任何副作用。也就是说,它们不应该修改传递的任何变量。

选择性地应用转换的选项

除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一来配置连接器以选择性地应用SMT:

语言细节

表达基于内容的路由条件的方式取决于所使用的脚本语言。例如,如基本配置示例,当你使用Groovy作为表达式语言,下面的表达式重新路由所有更新(u)向更新主题,同时将其他记录路由到默认主题:

价值。人事处== 'u' ? 'updates' : null

其他语言用不同的方法来表达相同的条件。

Debe开云体育官方注册网址zium MongoDB连接器发出而且补丁字段是序列化的JSON文档,而不是结构。
要与MongoDB连接器一起使用ContentBasedRouting SMT,必须首先将JSON中的数组字段展开到单独的文档中。
可以通过应用MongoDBExtractNewDocumentStateSMT。

还可以采用在表达式中使用JSON解析器的方法,为每个数组项生成单独的输出文档。
例如,如果使用Groovy作为表达式语言,则添加groovy-json工件添加到类路径中,然后添加表达式,如(新groovy.json.JsonSlurper ()) .parseText (value.after)。last_name == 'Kretchmar'

Javascript

当使用JavaScript作为表达式语言时,可以调用结构体# get ()方法指定基于内容的路由条件,示例如下:

Value.get ('op') == 'u' ?'updates': null
Javascript与Graal.js

当您使用JavaScript和Graal.js创建基于内容的路由条件时,您使用的方法与Groovy使用的方法类似。例如:

价值。人事处== 'u' ? 'updates' : null

配置选项

财产

默认的

描述

可选正则表达式,计算事件的目标主题名称,以确定是否应用条件逻辑。中的值匹配topic.regex,转换在将事件传递给主题之前应用条件逻辑。中的值不匹配topic.regex, SMT将事件未经修改地传递给主题。

表达式所使用的语言。必须从jsr223。,例如,jsr223.groovy,或jsr223.graal.js.开云体育官方注册网址Debezium支持通过JSR 223 API(“Java™平台脚本”)只有。

为每条消息求值的表达式。必须计算为a字符串值,其中非空的结果将消息重路由到新主题,而值将消息路由到默认主题。

保持

指定转换如何处理(墓碑)消息。您可以指定以下选项之一:

保持

(默认值)传递消息。

下降

完全删除消息。

评估

将条件逻辑应用到消息。