选择性地应用转换

为连接器配置单个消息转换(SMT)时,可以为转换定义谓词。谓词指定如何有条件地将转换应用于连接器处理的消息子集。您可以将谓词分配给为源连接器(如Debezium)或接收器连接器配置的转换。开云体育官方注册网址

SMT谓词

开云体育官方注册网址Debezium提供了一些单消息转换(smt),您可以使用它们在Kafka Connect将记录保存到Kafka主题之前修改事件记录。默认情况下,当您为Debezium连接器配置其中一个smt时,Kafka Connect将该转换应用于连接器发开云体育官方注册网址出的每条记录。然而,在某些情况下,您可能希望选择性地应用转换,以便它只修改具有共同特征的变更事件消息的子集。

例如,对于Debezium连接器,您可开云体育官方注册网址能希望仅对来自特定表或包含特定头键的事件消息运行转换。在运行Apache Kafka 2.6或更高版本的环境中,您可以在转换中附加一个谓词语句,以指示Kafka Connect仅对某些记录应用SMT。在谓词中,指定Kafka Connect用来计算它处理的每条消息的条件。当Debezi开云体育官方注册网址um连接器发出更改事件消息时,Kafka Connect根据配置的谓词条件检查消息。如果事件消息的条件为真,Kafka Connect应用转换,然后将消息写入Kafka主题。不符合条件的消息将不加修改地发送到Kafka。

对于为接收器连接器SMT定义的谓词,情况与此类似。连接器从Kafka主题读取消息,Kafka Connect根据谓词条件计算消息。如果消息符合条件,Kafka Connect应用转换,然后将消息传递给接收器连接器。

定义谓词后,可以重用它并将其应用于多个转换。谓词还包括a否定选项,可用于反转谓词,以便谓词条件仅应用于执行反转操作的记录匹配谓词语句中定义的条件。您可以使用否定选项,将谓词与基于条件求反的其他转换配对。

谓词的元素

谓词包括以下元素:

  • 谓词前缀

  • 别名(例如,isOutboxTable

  • 类型(例如,org.apache.kafka.connect.transforms.predicates.TopicNameMatches).Kafka Connect提供了一组默认的谓词类型,您可以通过定义自己的自定义谓词来补充它。

  • 条件语句和任何附加的配置属性,取决于谓词的类型(例如,正则表达式命名模式)

默认谓词类型

默认情况下,以下谓词类型是可用的:

HasHeaderKey

在事件消息的头中指定一个你想要Kafka Connect计算的键名。对于包含具有指定名称的头键的任何记录,谓词的计算结果为true。

RecordIsTombstone

匹配卡夫卡墓碑上记录。谓词求值为真正的任何有价值。将此谓词与筛选器SMT结合使用可删除墓碑记录。该谓词没有配置参数。

在Kafka中,墓碑是一个0字节键的记录,有效载荷。当Debezi开云体育官方注册网址um连接器处理源数据库中的删除操作时,连接器为删除操作发出两个更改事件:开云体育电动老虎机

  • 删除操作("op": "d")事件,提供数据库记录的前一个值。开云体育电动老虎机

  • 具有相同键的墓碑事件,但是价值。

    墓碑表示该行的删除标记。当日志压实在压缩过程中,Kafka会删除所有与墓碑共享相同键的事件。日志压缩定期进行,压缩间隔由delete.retention.ms设置主题。

    尽管这是可能的配置Debezium开云体育官方注册网址,使它不发出墓碑事件,最好允许Debezium发出墓碑,以在日志开云体育官方注册网址压缩期间保持预期的行为。压制墓碑可以防止Kafka在日志压缩过程中删除已删除键的记录。如果您的环境包含不能处理墓碑的接收器连接器,则可以将接收器连接器配置为使用SMTRecordIsTombstone谓词过滤出墓碑记录。

TopicNameMatches

一个正则表达式,指定你想要Kafka Connect匹配的主题的名称。对于主题名称与指定正则表达式匹配的连接器记录,谓词为true。使用此谓词可根据源表的名称对记录应用SMT。

定义SMT谓词

配置Kafka Connect谓词类似于配置转换。您指定谓词别名,将别名与转换关联,然后定义谓词的类型和配置。

先决条件
  • Debe开云体育官方注册网址zium环境运行Apache Kafka 2.6或更高版本。

  • Debezium连接器配置了SMT。开云体育官方注册网址

过程
  1. 在Debezi开云体育官方注册网址um连接器配置中,为谓词参数,例如,IsOutboxTable

  2. 通过将谓词别名附加到连接器配置中的转换别名,将谓词别名与您想有条件地应用的转换关联起来:

    变换。< TRANSFORM_ALIAS >.predicate =< PREDICATE_ALIAS >

    例如:

    transforms.outbox.predicate = IsOutboxTable
  3. 通过指定谓词的类型并为配置参数提供值来配置谓词。

    1. 对于类型,指定Kafka Connect中可用的以下默认类型之一:

      • HasHeaderKey

      • RecordIsTombstone

      • TopicNameMatches

        例如:

        predicates.IsOutboxTable.type = org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. TopicNameMatch或HasHeaderKey谓词,为要匹配的主题或标题名称指定正则表达式。

      例如:

      predicates.IsOutboxTable.pattern = outbox.event。*
  4. 如果要对条件求反,请附加否定关键字设置为转换别名,并将其设置为真正的

    例如:

    transforms.outbox.negate = true

    上面的属性颠倒谓词匹配的记录集,这样Kafka Connect就可以对任何不匹配谓词中指定条件的记录应用转换。

示例:发件箱事件路由器转换的TopicNameMatch谓词

下面的例子展示了一个Debezium连接器配置,它只对Deb开云体育官方注册网址ezium发出到Kafka的消息应用发件箱事件路由器转换outbox.event.order的话题。

因为TopicNameMatch谓词求值为真正的仅针对来自发件箱表(outbox.event。*),则转换不会应用于来自数据库中其他表的消息。开云体育电动老虎机

转换transforms.outbox =发件箱。predicate=IsOutboxTable transforms.outbox.type=io.开云体育官方注册网址 debezum .transforms.outbox. eventrouter predicates=IsOutboxTable predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates。TopicNameMatches predicates.IsOutboxTable.pattern = outbox.event。*

忽略墓碑事件

您可以控制Debezium是否发出墓碑事件,以及开云体育官方注册网址Kafka保留它们多长时间。根据您的数据管道,您可能希望设置tombstones.on.delete属性,这样Debezium就不会发出墓碑事件。开云体育官方注册网址

是否启用Debezium发射墓碑取决于开云体育官方注册网址环境中如何消费主题以及接收器消费者的特征。一些接收器连接器依赖墓碑事件从下游数据存储中删除记录。在接收器连接器依赖墓碑记录来指示何时删除下游数据存储中的记录的情况下,可以配置Debezium来发出它们。开云体育官方注册网址

在配置Debezium以生成墓碑时,需开云体育官方注册网址要进一步配置以确保接收器连接器接收墓碑事件。必须设置主题的保留策略,以便在Kafka在日志压缩期间删除事件消息之前,连接器有时间读取事件消息。主题在压缩之前保留墓碑的时间长度由delete.retention.ms属性。

默认情况下,tombstones.on.delete属性设置为真正的这样连接器就可以在每次删除事件之后生成一个墓碑。如果将属性设置为为了防止Debeziu开云体育官方注册网址m将墓碑记录保存到Kafka主题,墓碑记录的缺失可能会导致意想不到的后果。Kafka在日志压缩过程中依赖墓碑来删除与已删除键相关的记录。

如果您需要支持接收器连接器或下游Kafka消费者,它们不能处理带有空值的记录,而不是阻止Debezium发出墓碑,那么可以考虑为连接器配置一个SMT,该连接器的谓词使用开云体育官方注册网址RecordIsTombstone谓词类型,以便在使用者读取墓碑消息之前删除它们。

过程
  • 要防止Debezium开云体育官方注册网址对已删除的数据库记录发出墓碑事件,请设置连接器选项开云体育电动老虎机tombstones.on.delete

    例如:

    “tombstones.on.delete”:“假”