发件箱事件路由器
例子
为了理解这个SMT中使用的配置和术语,让我们研究一下它与给定的预期发件箱消息相关的部分:
# Kafka消息标题:“id=4d47e190-0402-4048-bc2c-89dd54343cdc”# Kafka消息时间戳:1556890294484 {"eventType": "OrderCreated", "payload": "{\"id\": 1, "lineItems\": [{\"id\": 1, "item\": "Debezium in Action\", "status\": "ENTERED\", "quantity\": 2, "totalPrice\": 39.98}, {\"id\"开云体育官方注册网址: 2, "item\": "Debezium for Dummies\", "status\": "ENTERED\", "quantity\": 1, "totalPrice\": 29.99}], \"orderDate\":\"2019-01-31T12:13:01\", \"customerId\": 123}"}
这个消息是通过转换Debezium原始消息生成的,它看起来像:开云体育官方注册网址
# Kafka消息键:“406c07f3-26f0-4eea-a50c-109940064b8f”# Kafka消息头:“”# Kafka消息时间戳:1556890294484{“id”:“406c07f3-26f0-4eea-a50c- 1099400b8f”,“聚合id”:“1”,“聚合类型”:“订单”,“有效负载”:“{\“id\”:1,\“lineItems\”:[{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Deb开云体育官方注册网址ezium for Dummies\”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:\“status\”:\“enter \”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:1, \“totalPrice \”:29.99}],\“orderDate \”:\“2019 - 01 - 31 - t12:13:01 \”,\“customerId \”:123}”、“时间戳”:1556890294344,“类型”:“OrderCreated”},“源”:{“版本”:“0.9.3。Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1556890294484}
配置
配置选项
财产 |
默认的 |
集团 |
描述 |
|
表格 |
在发件箱表中包含事件ID的列 |
|
|
表格 |
在发件箱表中包含事件键的列;设置后,该列的值将被用作Kafka消息键 |
|
|
表格 |
在发件箱表中包含事件类型的列 |
|
表格 |
您可以选择使用来自选定字段的值覆盖Kafka消息时间戳,否则它就是Debezium事件处理的时间戳。开云体育官方注册网址 |
||
|
表格 |
在发件箱表中包含事件有效负载的列 |
|
|
表格 |
在发件箱表中包含有效负载ID的列 |
|
表,信封 |
额外的字段可以作为事件信封或消息头的一部分添加;格式是一个以冒号分隔的对或三元的列表,当你想要别名时,例如。 |
||
表、模式 |
类型中的模式版本Kafka连接模式javadoc |
||
|
路由器 |
决定如何路由事件的列,该值将成为主题名称的一部分 |
|
|
路由器 |
在RegexRouter中使用的默认正则表达式,默认捕获将允许将路由字段替换为中定义的新主题名称 |
|
|
路由器 |
将在其中路由事件的主题的名称,替换 |
|
|
路由器 |
无论有无空或 |
|
|
开云体育官方注册网址 |
当Debez开云体育官方注册网址ium监视表时,它并不期望看到'update'行事件,一旦发生,这个转换可以记录为警告、错误或停止进程。选项是 |
默认表列
列| Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null Type | character varying(255) | not null payload | jsonb |
解释默认配置值
在观察了所有这些部分之后,我们可以看到默认配置是怎么做的:
表列 |
效果 |
|
的 |
|
是路由的默认字段,它被附加到主题名称(检查配置 |
|
成为Kafka消息键,这对于保持Kafka分区内的排序很重要 |
|
的 |
|
事件本身的JSON表示形式成为消息的任何一部分 |
使用Avro作为有效载荷格式
发件箱路由SMT支持任意有效负载格式,因为有效负载列值是透明传递的。如上所示,作为使用JSON的替代方法,也可以使用Avro。这对于消息格式治理和确保发件箱事件模式以向后兼容的方式演进是有益的。
源应用程序如何将Avro消息作为发件箱事件有效负载产生超出了本文档的范围。一种可能是利用KafkaAvroSerializer
类,并使用它来序列化GenericRecord
实例。为了确保Kafka消息值是精确的Avro二进制数据,对连接器应用以下配置:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional。位置=类型:标题:eventType value.converter = io.debezium.co开云体育官方注册网址nverters.ByteBufferConverter
这将移动eventType
值放入Kafka消息头,只留下有效载荷
列值(Avro数据)作为唯一的消息值。使用ByteBufferConverter
因为值转换器会将该值原样传播到Kafka消息值中。