新记录状态提取
这种单一消息转换(SMT)仅支持SQL数据库连接器。开云体育电动老虎机有关MongoDB连接器,请参见MongoDB的文档相当于这个SMT. |
De开云体育官方注册网址bezium数据更改事件具有复杂的结构,可提供丰富的信息。传递Debezium变更事件的Kafka记录包含了所开云体育官方注册网址有这些信息。然而,Kafka生态系统的一部分可能希望Kafka记录提供字段名和值的扁平结构。为了提供这种记录,Debezium提供了事件扁平化单消息转换(S开云体育官方注册网址MT)。当消费者需要格式比包含Debezium更改事件的Kafka记录更简单的Kafka记录时,请配置此转换。开云体育官方注册网址
事件扁平化变换是Kafka连接SMT.
改变事件结构
开云体育官方注册网址Debezium生成的数据更改事件具有复杂的结构。每个活动由三个部分组成:
元数据,包括但不限于:
进行更改的操作
源信息,例如进行更改的数据库和表的名称开云体育电动老虎机
进行更改的时间戳
可选事务信息
更改前的行数据
修改后的行数据
例如,结构的一部分更新
更改事件看起来像这样:
{"op": "u", "source":{…}, " ts_ms " : "...", " 前”:{“field1”:“oldvalue1”、“field2”:“oldvalue2”},“后”:{“field1”:“newvalue1”、“field2”:“newvalue2}}
中提供了有关更改事件结构的更多详细信息每个连接器的文档.
这种复杂的格式提供了关于系统中发生的更改的大部分信息。然而,其他连接器或Kafka生态系统的其他部分通常期望数据是这样的简单格式:
{"field1": "newvalue1", "field2": "newvalue2"}
要为消费者提供所需的Kafka记录格式,请配置事件扁平化SMT。
行为
事件扁平化SMT提取后
字段来自Kafka记录中的开云体育官方注册网址Debezium更改事件。SMT仅用它的更改事件替换原始的更改事件后
字段创建一个简单的Kafka记录。
您可以为Debezium连接器或消耗Debezium连接器发出的消息的接收器连接器配置事件扁平化S开云体育官方注册网址MT。为接收器连接器配置事件扁平化的好处是存储在Apache Kafka中的记录包含了整个Debezium更改事件。开云体育官方注册网址将SMT应用于源连接器或接收器连接器的决定取决于您的特定用例。
您可以配置转换以执行以下任何操作:
将变更事件的元数据添加到简化的Kafka记录中。默认行为是SMT不添加元数据。
保存包含更改事件的Kafka记录
删除
流中的操作。默认的行为是SMT丢弃Kafka记录删除
操作更改事件,因为大多数使用者还不能处理它们。
一个开云体育电动老虎机数据库删除
操作导致Debezium生成两条K开云体育官方注册网址afka记录:
包含以下内容的记录
“人事处”:“d”,
的之前
行数据和一些其他字段。与已删除行具有相同键且值为的墓碑记录
零
.这个记录是Apache Kafka的一个标记。它表明日志压实可以删除具有此键的所有记录。
而不是删除包含之前
行数据,您可以配置事件扁平化SMT执行以下操作之一:
将记录保存在流中,并将其编辑为仅具有
“价值”:“零”
字段。将记录保存在流中,并对其进行编辑以使其具有
价值
属性中的键/值对之前
字段中添加了“__deleted”:“真正的”
条目。
类似地,您可以配置事件扁平化SMT以在流中保留墓碑记录,而不是删除墓碑记录。
配置
通过将SMT配置细节添加到连开云体育官方注册网址接器的配置中,在Kafka Connect源或接收器连接器中配置Debezium事件扁平化SMT。若要获取默认行为,请在. properties
文件,您将指定如下内容:
变换=打开……transforms.unwrap.type = i开云体育官方注册网址o.debezium.transforms.ExtractNewRecordState
与任何Kafka Connect连接器配置一样,您可以设置变换=
到多个逗号分隔的SMT别名,按照您想要Kafka Connect应用SMT的顺序。
以下. properties
示例设置几个事件扁平化SMT选项:
变换=打开……transforms.unwrap.type = i开云体育官方注册网址o.debezium.transforms.ExtractNewRecordState transforms.unwrap.drop。来mbstones=false transforms.unwrap.delete.handling.mode=rewrite transforms.unwrap.add.fields=table,lsn
-
drop.tombstones = false
-
保存墓碑上的记录
删除
事件流中的操作。 -
delete.handling.mode =重写
-
为
删除
操作,编辑卡夫卡记录通过扁平化价值
在变化事件中的字段。的价值
字段中直接包含的键/值对之前
字段。SMT补充道__deleted
并将其设置为真正的
,例如:“价值”:{“pk”:2,“可乐”:空,“__deleted”:“true”}
-
add.fields =表,lsn
-
属性的更改事件元数据
表格
而且lsn
字段转换为简化的Kafka记录。
连接器可能发出多种类型的事件消息(心跳消息、墓碑消息或关于事务或模式更改的元数据消息)。若要将转换应用于事件的子集,可以定义选择性地应用转换的SMT谓词语句仅用于特定事件。
添加元数据
事件扁平化SMT可以向简化的Kafka记录中添加原始的、更改的事件元数据。例如,您可能希望简化记录的头或值包含以下任何内容:
进行更改的操作类型
已更改的数据库或表的名称开云体育电动老虎机
特定于连接器的字段,例如Postgres LSN字段
有关可用的更多信息,请参阅每个连接器的文档.
要向简化的Kafka记录头添加元数据,请指定add.headers
选择。要向简化的Kafka记录的值添加元数据,请指定add.fields
选择。每个选项都接受一个以逗号分隔的变更事件字段名列表。不要指定空格。当存在重复的字段名时,要为其中一个字段添加元数据,请指定结构体和字段。例如:
变换=打开……transforms.unwrap.type = i开云体育官方注册网址o.debezium.transforms.ExtractNewRecordState transforms.unwrap.add.fields = op,表,lsn、源。ts_ms transforms.unwrap.add。头= db transforms.unwrap.delete.handling.mode =重写
有了这样的配置,一个简化的Kafka记录将包含如下内容:
{…“__op”:“c”,“__table”:“MY_TABLE”、“__lsn”:“123456789”,“__source_ts_ms”:“123456789”,……}
此外,简化的Kafka记录将有一个__db
头。
在简化的Kafka记录中,SMT用双下划线作为元数据字段名的前缀。指定结构时,SMT还在结构名和字段名之间插入下划线。
将元数据添加到简化的Kafka记录中删除
操作时,还必须配置delete.handling.mode =重写
.
选择性地应用事件扁平化转换的选项
除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。
有关如何有选择地应用SMT的更多信息,请参见为转换配置SMT谓词.
配置选项
下表描述了可以指定用于配置事件扁平化SMT的选项。
选项 | 默认的 | 描述 |
---|---|---|
|
开云体育官方注册网址Debezium为每一个生成一个墓碑记录 |
|
|
开云体育官方注册网址Debezium为每个事件生成一个变更事件记录 |
|
若要使用行数据确定要将记录路由到的主题,请将此选项设置为 |
||
__(双下划线) |
将此可选字符串设置为字段的前缀。 |
|
将该选项设置为一个以逗号分隔,不带空格的元数据字段列表,添加到简化的Kafka记录的值中。例如,当存在重复的字段名时,要为其中一个字段添加元数据,请指定结构体和字段 |
||
__(双下划线) |
将此可选字符串设置为标题的前缀。 |
|
将该选项设置为一个逗号分隔的元数据字段列表,不带空格,添加到简化Kafka记录的头部。例如,当存在重复的字段名时,要为其中一个字段添加元数据,请指定结构体和字段 |