信息过滤

默认情况下,Debezi开云体育官方注册网址um将接收到的每个数据更改事件都传递给Kafka代理。但是,在许多情况下,您可能只对生成器发出的事件的一个子集感兴趣。为了使您能够只处理与您相关的记录,Debezium提供了开云体育官方注册网址过滤器单个消息转换(SMT)。

过滤器SMT正在积极发展中。发出的消息的结构或其他细节可能会随着开发的进展而改变。

虽然可以使用Java创建自定义SMT来编码过滤逻辑,但使用自定义编码的SMT有其缺点。例如:

  • 有必要预先编译转换并将其部署到Kafka Connect中。

  • 每次更改都需要代码重新编译和重新部署,导致操作不灵活。

筛选器SMT支持集成的脚本语言JSR 223(用于Java™平台的脚本)。

开云体育官方注册网址Debezium没有提供任何JSR 223 API的实现。要在Debezium中使用表达式语言,必须下载该语言的JSR 223脚开云体育官方注册网址本引擎实现。例如,对于Groovy 3,可以从以下网站下载它的JSR 223实现https://groovy-lang.org/.GraalVM JavaScript的JSR223实现可在https://github.com/graalvm/graaljs.获得脚本引擎文件后,将它们添加到Debezium连接器插件目录中,以及该语言实现使用的任何其他JAR文件。开云体育官方注册网址

设置

出于安全原因,筛选器SMT没有包含在Debezium连接器存档中。开云体育官方注册网址相反,它是在一个单独的工件中提供的,开云体育官方注册网址debezium-scripting-2.1.2.Final.tar.gz

要使用基于内容的路由SMT和Debezium连接器插件,必须显式地将SMT工件添加到Ka开云体育官方注册网址fka Connect环境中。重要提示:在Kafka Connect实例中出现筛选器SMT之后,任何被允许向实例添加连接器的用户都可以运行脚本表达式。为了确保脚本表达式只能由授权用户运行,在添加筛选器SMT之前,请确保Kafka Connect实例及其配置接口的安全。

动物园管理员卡夫卡卡夫卡连接,并且安装了一个或多个Debeziu开云体育官方注册网址m连接器,安装过滤器SMT的剩余任务是:

  1. 下载脚本SMT存档

  2. 将存档的内容提取到Kafka Connect环境的Debezium插件目录中。开云体育官方注册网址

  3. 获取一个JSR-223脚本引擎实现,并将其内容添加到Kafka Connect环境的Debezium插件目录中。开云体育官方注册网址

  4. 重新启动Kafka Connect进程以获取新的JAR文件。

Groovy语言在类路径上需要以下库:

  • groovy

  • groovy-json(可选)

  • groovy-jsr223

JavaScript语言在类路径上需要以下库:

  • graalvm.js

  • graalvm.js.scriptengine

举例:基本配置

在Debezium连接器的Kafka Connect配置中配置过滤器转换。开云体育官方注册网址在配置中,您可以通过定义基于业务规则的筛选条件来指定感兴趣的事件。当筛选器SMT处理事件流时,它根据配置的筛选条件评估每个事件。只有满足筛选条件条件的事件才被传递给代理。

要配置Debezium连接器以开云体育官方注册网址过滤更改事件记录,请配置过滤器SMT在Kafka Connect配置的Debezium连接器。开云体育官方注册网址筛选器SMT的配置要求您指定一个定义筛选条件的正则表达式。

例如,您可以在连接器配置中添加以下配置。

...transforms=filter transforms.filter.type=io.开云体育官方注册网址 debezu .transforms. filter.language=jsr223。groovy transforms.filter.condition =价值。Op == 'u' && value.before.id == 2…

属性的使用Groovy表达式语言。正则表达式价值。人事处== 'u' && value.before.id == 2删除所有消息,除了表示更新(u)记录与id等于的值2

定制配置

前面的示例显示了一个简单的SMT配置,该配置被设计为仅处理DML事件,其中包含人事处字段。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或关于模式更改和事务的元数据消息)不包含此字段。为了避免处理失败,您可以定义选择性地应用转换的SMT谓词语句仅用于特定事件。

过滤器表达式中使用的变量

开云体育官方注册网址Debezium将某些变量绑定到过滤器SMT的计算上下文中。当您创建表达式来指定筛选条件时,您可以使用Debezium绑定到计算上下文中的变量。开云体育官方注册网址通过绑定变量,Debezium使SMT能够在开云体育官方注册网址计算表达式中的条件时查找和解释变量的值。

下表列出了Debezium绑定到过滤器SMT的计算上下文中的变量:开云体育官方注册网址

表1。筛选表达式变量
的名字 描述 类型

关键

信息的关键。

org.apache.kafka.connect.data.Struct

价值

消息的值。

org.apache.kafka.connect.data.Struct

keySchema

消息键的模式。

org.apache.kafka.connect.data.Schema

valueSchema

消息值的模式。

org.apache.kafka.connect.data.Schema

主题

目标主题的名称。

字符串

消息头的Java映射。关键字段是头名称。的变量暴露了以下属性:

  • 价值(类型的对象

  • 模式(类型的org.apache.kafka.connect.data.Schema

java.util。Map < String, i开云体育官方注册网址o.debezium.transforms.scripting.RecordHeader >

表达式可以对其变量调用任意方法。表达式应该解析为一个布尔值,该值决定SMT如何处理消息。当表达式中的筛选条件计算为时真正的,该消息将被保留。当筛选条件计算为时,消息被删除。

表达式不应该导致任何副作用。也就是说,它们不应该修改传递的任何变量。

选择性地应用转换的选项

除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一来配置连接器以选择性地应用SMT:

语言细节

表达过滤条件的方式取决于所使用的脚本语言。

例如,如基本配置示例,当你使用Groovy作为表达式语言,下面的表达式删除所有消息,除了已删除的更新记录id设置为2

价值。人事处== 'u' && value.before.id == 2

其他语言用不同的方法来表达相同的条件。

Debe开云体育官方注册网址zium MongoDB连接器发出而且补丁字段是序列化的JSON文档,而不是结构。
要将筛选器SMT与MongoDB连接器一起使用,必须首先将JSON中的数组字段展开到单独的文档中。
可以通过应用MongoDBExtractNewDocumentStateSMT。

还可以采用在表达式中使用JSON解析器的方法,为每个数组项生成单独的输出文档。
例如,如果使用Groovy作为表达式语言,则添加groovy-json工件添加到类路径中,然后添加表达式,如(新groovy.json.JsonSlurper ()) .parseText (value.after)。last_name == 'Kretchmar'

Javascript

如果使用JavaScript作为表达式语言,则可以调用结构体# get ()方法指定过滤条件,示例如下:

value.get (op) = = ' u ' & & value.get(之前). get (id) = = 2
Javascript与Graal.js

如果您使用JavaScript和Graal.js来定义过滤条件,那么您使用的方法与Groovy使用的方法类似。例如:

价值。人事处== 'u' && value.before.id == 2

配置选项

下表列出了可以与筛选器SMT一起使用的配置选项。

表2。filter SMT配置选项

财产

默认的

描述

可选正则表达式,用于计算事件的目标主题名称,以确定是否应用筛选逻辑。中的值匹配topic.regex,转换在将事件传递给主题之前应用筛选逻辑。中的值不匹配topic.regex, SMT将事件未经修改地传递给主题。

表达式所使用的语言。必须从jsr223。,例如,jsr223.groovy,或jsr223.graal.js.开云体育官方注册网址Debezium支持通过JSR 223 API(“Java™平台脚本”)只有。

为每条消息求值的表达式。必须计算为布尔值,其中的结果是真正的保持消息,并有结果删除它。

保持

指定转换如何处理(墓碑)消息。您可以指定以下选项之一:

保持

(默认值)传递消息。

下降

完全删除消息。

评估

对消息应用筛选条件。