发件箱事件路由器

发件箱模式是在多个(微)服务之间安全可靠地交换数据的一种方式。发件箱模式实现避免了服务的内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件中的状态之间的不一致。开云体育电动老虎机

要在Debezium应用程序中实现发件箱模式,请配置Debezium连接器开云体育官方注册网址:

  • 捕获发件箱表中的更改

  • 应用Debezium开云体育官方注册网址发件箱事件路由器单消息转换(SMT)

配置开云体育官方注册网址为应用发件箱SMT的Debezium连接器应该只捕获发件箱表中发生的更改。有关更多信息,请参见选择性地应用转换的选项

只有当每个发件箱表具有相同的结构时,连接器才能捕获多个发件箱表中的更改。

看到使用发件箱模式进行可靠的微服务数据交换了解发件箱模式为什么有用以及它是如何工作的。

有关可以运行的示例,请参见发件箱模式演示,它在Debezium示例库中。开云体育官方注册网址它包括一个如何配置Debezium连接器以运行发件箱事件路由器SMT的示例。开云体育官方注册网址

发件箱事件路由器SMT与MongoDB连接器不兼容。

MongoDB用户可以运行MongoDB发件箱事件路由器SMT

发件箱消息示例

要了解Debezium发件箱事件路由器SMT开云体育官方注册网址是如何配置的,请查看以下Debezium发件箱消息示例:

# Kafka消息标题:“id=4d47e190-0402-4048-bc2c-89dd54343cdc”# Kafka消息时间戳:1556890294484{“{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Debezium for Dummi开云体育官方注册网址es\”,\“status\”:\“ENTERED\”,\“quantity\”:1,\“totalPrice\”:29.99}],\“orderDate\”:\“2019-01-31T12:13:01\”,\“customerId\”:123}”}

De开云体育官方注册网址bezium连接器被配置为应用发件箱事件路由器SMT,通过转换Debezium原始消息生成上面的消息,就像这样:

# Kafka消息键:“406c07f3-26f0-4eea-a50c-109940064b8f”# Kafka消息头:“”# Kafka消息时间戳:1556890294484{“id”:“406c07f3-26f0-4eea-a50c- 1099400b8f”,“聚合id”:“1”,“聚合类型”:“订单”,“有效负载”:“{\“id\”:1,\“lineItems\”:[{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Deb开云体育官方注册网址ezium for Dummies\”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:\“status\”:\“enter \”,\“status\”:\“ENTERED\”,\“quantity\”:\“quantity\”:1, \“totalPrice \”:29.99}],\“orderDate \”:\“2019 - 01 - 31 - t12:13:01 \”,\“customerId \”:123}”、“时间戳”:1556890294344,“类型”:“OrderCreated”},“源”:{“版本”:“2.1.2。Final", "connector": "postgresql", "name": "dbserver1-bare", "db": "orderdb", "ts_usec": 1556890294448870, "txId": 584, "lsn": 24064704, "schema": "inventory", "table": "outboxevent", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1556890294484}

Debezium发件箱消息的示例是基开云体育官方注册网址于默认发件箱事件路由器配置,它假设一个发件箱表结构和基于聚合的事件路由。为了自定义行为,发件箱事件路由器SMT提供了许多配置选项

基本发件箱表

要应用默认的发件箱事件路由器SMT配置,假设您的发件箱表具有以下列:

列| Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null Type | character varying(255) | not null payload | jsonb |
表1。预期发件箱表列的描述
效果

id

包含事件的唯一ID。在发件箱消息中,此值是头。例如,您可以使用此ID删除重复的消息。

要从不同的发件箱表列获取事件的唯一ID,请设置table.field.event.id连接器配置中的SMT选项。

aggregatetype

包含SMT附加到连接器向其发出发件箱消息的主题名称的值。默认行为是用这个值替换默认值$ {routedByValue}中的变量route.topic.replacementSMT的选择。

例如,在默认配置中route.by.fieldSMT选项设置为aggregatetyperoute.topic.replacementSMT选项设置为outbox.event。$ {routedByValue}.假设您的应用程序向发件箱表添加了两条记录。的值aggregatetype列是客户.的值aggregatetype列是订单.连接器发出第一个记录到outbox.event.customers的话题。连接器将第二个记录发出到outbox.event.orders的话题。

若要从不同的发件箱表列获取此值,请设置route.by.field连接器配置中的SMT选项。

aggregateid

包含事件键,该键提供有效负载的ID。SMT将此值用作发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。

若要从不同的发件箱表列获取事件键,请设置table.field.event.keySMT选项在连接器配置中。

有效载荷

发件箱更改事件的表示形式。默认结构是JSON。默认情况下,Kafka消息值仅由有效载荷价值。然而,如果发件箱事件被配置为包含额外的字段,Kafka消息值包含一个信封,封装有效负载和额外的字段,每个字段都是单独表示的。有关更多信息,请参见发出带有附加字段的消息

要从不同的发件箱表列获取事件有效负载,请设置table.field.event.payload连接器配置中的SMT选项。

其他自定义列

可以删除发件箱表中的任何其他列添加到发件箱事件可以在有效负载部分中,也可以作为消息头。

列就是一个例子eventType它传递一个用户定义的值,该值有助于对事件进行分类或组织。

基本配置

要配置Debezium连接器以开云体育官方注册网址支持发件箱模式,请配置发件箱。EventRouterSMT。例如,a的基本配置. properties文件是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter
定制配置

连接器可能发出多种类型的事件消息(例如,心跳消息、墓碑消息或关于事务或模式更改的元数据消息)。若要仅将转换应用于发件箱表中产生的事件,请定义选择性地应用转换的SMT谓词语句只对那些事件。

选择性地应用转换的选项

除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一来配置连接器以选择性地应用SMT:

使用Avro作为有效载荷格式

发件箱事件路由器SMT支持任意有效负载格式。的有效载荷发件箱表中的列值被透明地传递。使用JSON的另一种选择是使用Avro。这对于消息格式治理和确保发件箱事件模式以向后兼容的方式演进是有益的。

源应用程序如何为发件箱消息有效负载生成Avro格式的内容超出了本文档的范围。一种可能是利用KafkaAvroSerializer要序列化的类GenericRecord实例。为了确保Kafka消息值是精确的Avro二进制数据,对连接器应用以下配置:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter value.converter = io.debezium.converters.BinaryDataConverter

默认情况下,有效载荷列值(Avro数据)是唯一的消息值。的配置BinaryDataConverter当值转换器传播有效载荷列值按原样转换为Kafka消息值。

Debe开云体育官方注册网址zium连接器可以配置为发出心跳、事务元数据或模式更改事件(支持程度因连接器而异)。属性不能序列化这些事件BinaryDataConverter因此,必须提供额外的配置,以便转换器知道如何序列化这些事件。作为一个例子,下面的配置说明了使用Apache KafkaJsonConverter没有模式:

变换=发件箱,…transforms.outbox.type=io.开云体育官方注册网址debezium.transforms.outbox.EventRouter value.converter=io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schema .enable=false

委托转换器方法指定实现delegate.converter.type选择。如果转换器需要任何额外的配置选项,也可以指定它们,例如使用禁用上面所示的模式schemas.enable = false

转换器io.开云体育官方注册网址debezium.converters.ByteBufferConverter自Debezium版本1.9以来已弃用,并在2.0中开云体育官方注册网址被移除。此外,当使用Kafka Connect时,连接器的配置必须在升级到Debezium 2.x之前更新开云体育官方注册网址

发出带有附加字段的消息

发件箱表可能包含要将其值添加到发出的发件箱消息中的列。例如,考虑一个值为的发件箱表定购单aggregatetype一列和另一列,eventType,其可能值为顺序而且order-shipped.可以使用该语法添加其他字段列:位置:别名

的允许值放置是:--信封-分区

发射eventType列的值,配置SMT像这样:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:标题:类型

结果将是Kafka消息中的一个报头类型作为它的键,而值eventType列作为其值。

发射eventType列值在发件箱消息信封,配置SMT如下:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:信封:类型

要控制在哪个分区上产生发件箱消息,可以这样配置SMT:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = partitionColumn:分区

注意,对于分区放置时,添加别名将没有任何效果。

将转义的JSON字符串扩展为JSON

您可能已经注意到Debezium发件箱消息包含开云体育官方注册网址有效载荷表示为字符串。所以当这个字符串实际上是JSON时,它在结果Kafka消息中显示为转义,如下所示:

# Kafka消息标题:“id=4d47e190-0402-4048-bc2c-89dd54343cdc”# Kafka消息时间戳:1556890294484{“{\“id\”:1,\“item\”:\“Debezium in Action\”,\“status\”:\“ENTERED\”,\“quantity\”:2,\“totalPrice\”:39.98},{\“id\”:2,\“item\”:\“Debezium for Dummi开云体育官方注册网址es\”,\“status\”:\“ENTERED\”,\“quantity\”:1,\“totalPrice\”:29.99}],\“orderDate\”:\“2019-01-31T12:13:01\”,\“customerId\”:123}”}

发件箱事件路由器允许您将此消息内容扩展为“真正的”JSON,并使用从JSON文档本身推导出的配套模式。这样一来,Kafka消息中的结果看起来就像:

# Kafka主题:outbox.event.order # Kafka消息键:"1" # Kafka消息头:"id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka消息时间戳:1556890294484 {"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "q开云体育官方注册网址uantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123}

要启用此转换,必须设置table.expand.json.payload以真而用之JsonConverter像下图:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json。有效载荷= true value.converter = org.apache.kafka.connect.json.JsonConverter

配置选项

下表描述了可以为发件箱事件路由器SMT指定的选项。在表中,集团列表示Kafka的配置选项分类。

表2。发件箱事件路由器SMT配置选项说明
选项 默认的 集团 描述

警告

表格

故障时确定SMT的行为更新对发件箱表的操作。可能的设置有:

  • 警告—SMT记录一个警告,并继续到下一个发件箱表记录。

  • 错误—SMT记录错误并继续到下一个发件箱表记录。

  • 致命的—SMT日志错误,连接器停止处理。

预计发件箱表中的所有更改都将被更改插入操作。也就是说,一个发件箱表作为一个队列;不允许更新发件箱表中的记录。SMT会自动过滤掉删除对发件箱表的操作。

id

表格

指定包含唯一事件ID的发件箱表列。属性下的事件头部中存储此IDid关键。

aggregateid

表格

指定包含事件键的发件箱表列。当此列包含值时,SMT将该值用作发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。

表格

默认情况下,发出的发件箱消息中的时间戳是Debezium事件时间戳。开云体育官方注册网址若要在发件箱邮件中使用不同的时间戳,请将此选项设置为一个发件箱表列,其中包含希望在发出的发件箱邮件中使用的时间戳。

有效载荷

表格

指定包含事件有效负载的发件箱表列。

表格

指定是否应该对String有效负载进行JSON扩展。如果没有发现内容或在解析错误的情况下,内容将保持“原样”。

有关详情,请参阅展开转义json部分。

忽略

表格

当启用JSON扩展属性时table.expand.json.payload,决定json有效负载的行为,包括值。可能的设置有:

  • 忽略—忽略空值。

  • optional_bytes—保留null值,将null作为connect的可选字节。

表,信封

指定要添加到发件箱邮件标题或信封中的一个或多个发件箱表列。指定以逗号分隔的对列表。在每一对中,指定一个列的名称,以及希望该值在标题中还是信封中。用冒号分隔对中的值,例如:

id:标题,我的领域:信封

要为列指定别名,请指定一个以别名为第三个值的三元组,例如:

id:标题,我的领域:信封:my-alias

第二个值是位置,而且必须始终如此信封

表、模式

方法中所描述的模式版本使用此值Kafka连接模式Javadoc。

aggregatetype

路由器

指定发件箱表中列的名称。默认行为是,此列中的值成为连接器向其发出发件箱消息的主题名称的一部分。一个例子是预期发件箱表的描述

(? < routedByValue >。*)

路由器

指定发件箱SMT在RegexRouter中应用到发件箱表记录的正则表达式。属性的设置的一部分route.topic.replacementSMT的选择。

默认行为是SMT替换默认值$ {routedByValue}变量的设置中route.topic.replacement属性设置的SMT选项route.by.field发件箱SMT选项。

outbox.event。$ {routedByValue}

路由器

指定连接器向其发出发件箱消息的主题的名称。默认的主题名称为outbox.event。其次是aggregatetype发件箱表记录中的列值。例如,如果aggregatetype值是客户,主题名称为outbox.event.customers

要更改主题名称,您可以:

路由器

指示是空的还是有效负载使连接器发出一个墓碑事件。

tracingspancontext

跟踪

包含跟踪跨度上下文的字段的名称。

开云体育官方注册网址debezium-read

跟踪

表示Debezium处理范围的操作名称。开云体育官方注册网址

跟踪

真正的只有具有序列化上下文字段的事件才应该被跟踪。

分布式跟踪

发件箱事件路由SMT支持分布式跟踪。看到跟踪文档欲知详情。