发件箱事件路由器
发件箱模式是在多个(微)服务之间安全可靠地交换数据的一种方式。发件箱模式实现避免了服务的内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件中的状态之间的不一致。开云体育电动老虎机
要在Debezium应用程序中实现发件箱模式,请配置Debezium连接器开云体育官方注册网址:
捕获发件箱表中的更改
应用Debezium开云体育官方注册网址发件箱事件路由器单消息转换(SMT)
配置开云体育官方注册网址为应用发件箱SMT的Debezium连接器应该只捕获发件箱表中发生的更改。有关更多信息,请参见选择性地应用转换的选项.
只有当每个发件箱表具有相同的结构时,连接器才能捕获多个发件箱表中的更改。
看到使用发件箱模式进行可靠的微服务数据交换了解发件箱模式为什么有用以及它是如何工作的。
有关可以运行的示例,请参见发件箱模式演示,它在Debezium示例库中。开云体育官方注册网址它包括一个如何配置Debezium连接器以运行发件箱事件路由器SMT的示例。开云体育官方注册网址
发件箱事件路由器SMT与MongoDB连接器不兼容。 MongoDB用户可以运行MongoDB发件箱事件路由器SMT. |
发件箱消息示例
要了解Debezium发件箱事件路由器SMT开云体育官方注册网址是如何配置的,请查看以下Debezium发件箱消息示例:
# Kafka消息标题:“id=4d47e190-0402-4048-bc2c-89dd54343cdc”# Kafka消息时间戳:1556890294484{“{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Debezium for Dummi开云体育官方注册网址es\”,\“status\”:\“ENTERED\”,\“quantity\”:1,\“totalPrice\”:29.99}],\“orderDate\”:\“2019-01-31T12:13:01\”,\“customerId\”:123}”}
De开云体育官方注册网址bezium连接器被配置为应用发件箱事件路由器SMT,通过转换Debezium原始消息生成上面的消息,就像这样:
# Kafka消息键:“406c07f3-26f0-4eea-a50c-109940064b8f”# Kafka消息头:“”# Kafka消息时间戳:1556890294484{“id”:“406c07f3-26f0-4eea-a50c- 1099400b8f”,“聚合id”:“1”,“聚合类型”:“订单”,“有效负载”:“{\“id\”:1,\“lineItems\”:[{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Deb开云体育官方注册网址ezium for Dummies\”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:\“status\”:\“enter \”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:1, \“totalPrice \”:29.99}],\“orderDate \”:\“2019 - 01 - 31 - t12:13:01 \”,\“customerId \”:123}”、“时间戳”:1556890294344,“类型”:“OrderCreated”},“源”:{“版本”:“2.1.2。Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1556890294484}
Debezium发件箱消息的示例是基开云体育官方注册网址于默认发件箱事件路由器配置,它假设一个发件箱表结构和基于聚合的事件路由。为了自定义行为,发件箱事件路由器SMT提供了许多配置选项.
基本发件箱表
要应用默认的发件箱事件路由器SMT配置,假设您的发件箱表具有以下列:
列| Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null Type | character varying(255) | not null payload | jsonb |
列 | 效果 |
---|---|
|
包含事件的唯一ID。在发件箱消息中,此值是头。例如,您可以使用此ID删除重复的消息。 |
包含SMT附加到连接器向其发出发件箱消息的主题名称的值。默认行为是用这个值替换默认值 |
|
|
包含事件键,该键提供有效负载的ID。SMT将此值用作发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。 |
|
发件箱更改事件的表示形式。默认结构是JSON。默认情况下,Kafka消息值仅由 |
其他自定义列 |
可以删除发件箱表中的任何其他列添加到发件箱事件可以在有效负载部分中,也可以作为消息头。 |
基本配置
要配置Debezium连接器以开云体育官方注册网址支持发件箱模式,请配置发件箱。EventRouter
SMT。例如,a的基本配置. properties
文件是这样的:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter
连接器可能发出多种类型的事件消息(例如,心跳消息、墓碑消息或关于事务或模式更改的元数据消息)。若要仅将转换应用于发件箱表中产生的事件,请定义选择性地应用转换的SMT谓词语句只对那些事件。
选择性地应用转换的选项
除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一来配置连接器以选择性地应用SMT:
使用
route.topic.regex
SMT配置选项。
使用Avro作为有效载荷格式
发件箱事件路由器SMT支持任意有效负载格式。的有效载荷
发件箱表中的列值被透明地传递。使用JSON的另一种选择是使用Avro。这对于消息格式治理和确保发件箱事件模式以向后兼容的方式演进是有益的。
源应用程序如何为发件箱消息有效负载生成Avro格式的内容超出了本文档的范围。一种可能是利用KafkaAvroSerializer
要序列化的类GenericRecord
实例。为了确保Kafka消息值是精确的Avro二进制数据,对连接器应用以下配置:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter value.converter = io.debezium.converters.BinaryDataConverter
默认情况下,有效载荷
列值(Avro数据)是唯一的消息值。的配置BinaryDataConverter
当值转换器传播有效载荷
列值按原样转换为Kafka消息值。
Debe开云体育官方注册网址zium连接器可以配置为发出心跳、事务元数据或模式更改事件(支持程度因连接器而异)。属性不能序列化这些事件BinaryDataConverter
因此,必须提供额外的配置,以便转换器知道如何序列化这些事件。作为一个例子,下面的配置说明了使用Apache KafkaJsonConverter
没有模式:
变换=发件箱,…transforms.outbox.type=io.开云体育官方注册网址debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schema .enable=false
委托转换器
方法指定实现delegate.converter.type
选择。如果转换器需要任何额外的配置选项,也可以指定它们,例如使用禁用上面所示的模式schemas.enable = false
.
转换器 |
发出带有附加字段的消息
发件箱表可能包含要将其值添加到发出的发件箱消息中的列。例如,考虑一个值为的发件箱表定购单
在aggregatetype
一列和另一列,eventType
,其可能值为顺序
而且order-shipped
.可以使用该语法添加其他字段列:位置:别名
.
的允许值放置
是:-头
-信封
-分区
发射eventType
列的值,配置SMT像这样:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:标题:类型
结果将是Kafka消息中的一个报头类型
作为它的键,而值eventType
列作为其值。
发射eventType
列值在发件箱消息信封,配置SMT如下:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:信封:类型
要控制在哪个分区上产生发件箱消息,可以这样配置SMT:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = partitionColumn:分区
注意,对于分区
放置时,添加别名将没有任何效果。
将转义的JSON字符串扩展为JSON
您可能已经注意到Debezium发件箱消息包含开云体育官方注册网址有效载荷
表示为字符串。所以当这个字符串实际上是JSON时,它在结果Kafka消息中显示为转义,如下所示:
# Kafka消息标题:“id=4d47e190-0402-4048-bc2c-89dd54343cdc”# Kafka消息时间戳:1556890294484{“{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Debezium for Dummi开云体育官方注册网址es\”,\“status\”:\“ENTERED\”,\“quantity\”:1,\“totalPrice\”:29.99}],\“orderDate\”:\“2019-01-31T12:13:01\”,\“customerId\”:123}”}
发件箱事件路由器允许您将此消息内容扩展为“真正的”JSON,并使用从JSON文档本身推导出的配套模式。这样一来,Kafka消息中的结果看起来就像:
# Kafka主题:outbox.event.order # Kafka消息键:"1" # Kafka消息头:"id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka消息时间戳:1556890294484 {"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "q开云体育官方注册网址uantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123}
要启用此转换,必须设置table.expand.json.payload
以真而用之JsonConverter
像下图:
变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json。有效载荷= true value.converter = org.apache.kafka.connect.json.JsonConverter
配置选项
下表描述了可以为发件箱事件路由器SMT指定的选项。在表中,集团列表示Kafka的配置选项分类。
选项 | 默认的 | 集团 | 描述 |
---|---|---|---|
|
表格 |
故障时确定SMT的行为
预计发件箱表中的所有更改都将被更改 |
|
|
表格 |
指定包含唯一事件ID的发件箱表列。属性下的事件头部中存储此ID |
|
|
表格 |
指定包含事件键的发件箱表列。当此列包含值时,SMT将该值用作发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。 |
|
表格 |
默认情况下,发出的发件箱消息中的时间戳是Debezium事件时间戳。开云体育官方注册网址若要在发件箱邮件中使用不同的时间戳,请将此选项设置为一个发件箱表列,其中包含希望在发出的发件箱邮件中使用的时间戳。 |
||
|
表格 |
指定包含事件有效负载的发件箱表列。 |
|
|
表格 |
指定是否应该对String有效负载进行JSON扩展。如果没有发现内容或在解析错误的情况下,内容将保持“原样”。 |
|
|
表格 |
当启用JSON扩展属性时
|
|
表,信封 |
指定要添加到发件箱邮件标题或信封中的一个或多个发件箱表列。指定以逗号分隔的对列表。在每一对中,指定一个列的名称,以及希望该值在标题中还是信封中。用冒号分隔对中的值,例如:
要为列指定别名,请指定一个以别名为第三个值的三元组,例如:
第二个值是位置,而且必须始终如此 |
||
表、模式 |
方法中所描述的模式版本使用此值Kafka连接模式Javadoc。 |
||
|
路由器 |
指定发件箱表中列的名称。默认行为是,此列中的值成为连接器向其发出发件箱消息的主题名称的一部分。一个例子是预期发件箱表的描述. |
|
|
路由器 |
指定发件箱SMT在RegexRouter中应用到发件箱表记录的正则表达式。属性的设置的一部分 |
|
|
路由器 |
指定连接器向其发出发件箱消息的主题的名称。默认的主题名称为
|
|
|
路由器 |
指示是空的还是 |
|
|
跟踪 |
包含跟踪跨度上下文的字段的名称。 |
|
|
跟踪 |
表示Debezium处理范围的操作名称。开云体育官方注册网址 |
|
|
跟踪 |
当 |
分布式跟踪
发件箱事件路由SMT支持分布式跟踪。看到跟踪文档欲知详情。