MongoDB发件箱事件路由器

此SMT仅用于Debezium MongoDB连接器。开云体育官方注册网址有关为关系数据库使用发件箱事件路由器SMT的信息,请参见开云体育电动老虎机发件箱事件路由器

发件箱模式是在多个(微)服务之间安全可靠地交换数据的一种方式。发件箱模式实现避免了服务的内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件中的状态之间的不一致。开云体育电动老虎机

要在Debezium应用程序中实现发件箱模式,请配置Debezium连接器开云体育官方注册网址:

  • 捕获发件箱集合中的更改

  • 应用Debezium开云体育官方注册网址 MongoDB发件箱事件路由器单消息转换(SMT)

配置开云体育官方注册网址为应用MongoDB发件箱SMT的Debezium连接器应该只捕获发件箱集合中发生的更改。有关更多信息,请参见选择性地应用转换的选项

只有当每个发件箱集合具有相同的结构时,连接器才能捕获多个发件箱集合中的更改。

要使用此SMT,对实际业务集合的操作和插入到发件箱集合的操作必须作为多文档事务的一部分进行,这从MongoDB 4.0开始支持,以防止业务集合和发件箱集合之间潜在的数据不一致。对于未来的更新,为了在没有多文档事务的情况下更新现有数据并在ACID事务中插入发件箱事件,我们计划支持额外的配置,以现有集合的子文档的形式存储发件箱事件,而不是独立的发件箱集合。

有关发件箱模式的详细信息,请参见使用发件箱模式进行可靠的微服务数据交换

发件箱消息示例

要了解如何配置Debezium MongoDB发件箱事件路由器SMT,开云体育官方注册网址请考虑以下Debezium发件箱消息的示例:

# Kafka消息标题:"id=596e275826f08b2730779e1f" # Kafka消息时间戳:1556890294484 {"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": 开云体育官方注册网址\"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\":\"2019-01-31T12:13:01\", \"customerId\": 123}"}

De开云体育官方注册网址bezium连接器被配置为应用MongoDB发件箱事件路由器SMT,通过转换原始Debezium更改事件消息生成上述消息,示例如下:

#卡夫卡消息键:{" id ": "{\“美元oid \”:\“596 e275826f08b2730779e1f \“}”}#卡夫卡消息标头:”、“#卡夫卡消息的时间戳:1556890294484{“补丁”:空,“后”:“{\“_id \”:{\“美元oid \”:\}“596 e275826f08b2730779e1f \”,\“aggregateid \”:{\“美元oid \”:\}“b2730779e1f596e275826f08 \”,\“aggregatetype \”,\“\”,\“\”,\“OrderCreated \”,\“载荷\”:{\“_id \”:{\“美元oid \”:\}“da8d6de63b7745ff8f4457db \”,\“lineitem \”:[{\ \“id”:1,\“项\”:\“Debezium行动\”,\“\”:开云体育官方注册网址\“进入\”,\“数量\”:2,\“totalPrice \”:39.98},{\ \“id”:2 \ \“项”:\“Debezium假人\”,\“\”,\开云体育官方注册网址“进入\”,\“数量\”:1,\“totalPrice \”:29.99}],\“orderDate \”:\“2019 - 01 - 31 - t12:13:01 \”,\“customerId \”:123}}”、“源”:{“版本”:“2.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "c", "ts_ms": 1556890294484}

Debezium发件箱消息的示例是基开云体育官方注册网址于默认发件箱事件路由器配置,它假设一个发件箱集合结构和基于聚合的事件路由。为了自定义行为,发件箱事件路由器SMT提供了许多配置选项

基本发件箱收集

要应用默认的MongoDB发件箱事件路由器SMT配置,您的发件箱集合假设具有以下字段:

{"_id": "objectId", "aggregatetype": "string", "aggregateid": "objectId", "type": "string", "payload": "object"}
表1。预期发件箱收集字段的描述
效果

id

包含事件的唯一ID。在发件箱消息中,此值是头。例如,您可以使用此ID删除重复的消息。

若要从不同的发件箱收集字段获取事件的唯一ID,请设置collection.field.event.id连接器配置中的SMT选项。

aggregatetype

包含SMT附加到连接器向其发出发件箱消息的主题名称的值。默认行为是用这个值替换默认值$ {routedByValue}中的变量route.topic.replacementSMT的选择。

例如,在默认配置中route.by.fieldSMT选项设置为aggregatetyperoute.topic.replacementSMT选项设置为outbox.event。$ {routedByValue}.假设您的应用程序向发件箱集合添加了两个文档。中的值aggregatetype字段是客户.中的值aggregatetype字段是订单.连接器将第一个文档发出到outbox.event.customers的话题。连接器将第二个文档发出到outbox.event.orders的话题。

若要从不同的发件箱集合字段获取此值,请设置route.by.field连接器配置中的SMT选项。

aggregateid

包含事件键,该键提供有效负载的ID。SMT将此值用作发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。

若要从不同的发件箱收集字段获取事件密钥,请设置collection.field.event.key连接器配置中的SMT选项。

有效载荷

发件箱更改事件的表示形式。默认结构是JSON。默认情况下,Kafka消息值仅由有效载荷价值。然而,如果发件箱事件被配置为包含额外的字段,Kafka消息值包含一个信封,封装有效负载和额外的字段,每个字段都是单独表示的。有关更多信息,请参见发出带有附加字段的消息

要从不同的发件箱收集字段获取事件有效负载,请设置collection.field.event.payload连接器配置中的SMT选项。

其他自定义字段

发件箱集合中的任何其他字段都可以添加到发件箱事件可以在有效负载部分中,也可以作为消息头。

字段就是一个例子eventType它传递一个用户定义的值,该值有助于对事件进行分类或组织。

基本配置

要配置Debezium连接器以开云体育官方注册网址支持发件箱模式,请配置发件箱。EventRouterSMT。a . SMT的基本配置示例如下. properties文件:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
定制配置

连接器可能发出多种类型的事件消息(例如,心跳消息、墓碑消息或关于事务的元数据消息)。若要将转换仅应用于起源于发件箱集合的事件,请定义选择性地应用转换的SMT谓词语句只对那些事件。

选择性地应用转换的选项

除了发生数据库更改时Debezium连接器发出的更改事件消息外,连接器还发出其他类型的消息,包括开云体育官方注册网址心跳消息和关于模式更改和事务的元数据消息。开云体育电动老虎机因为这些其他消息的结构与SMT设计用来处理的更改事件消息的结构不同,所以最好将连接器配置为选择性地应用SMT,以便它只处理预期的数据更改消息。您可以使用以下方法之一来配置连接器以选择性地应用SMT:

使用Avro作为有效载荷格式

MongoDB发件箱事件路由器SMT支持任意有效负载格式。的有效载荷发件箱集合中的字段值透明地传递。使用JSON的另一种选择是使用Avro。这对于消息格式治理和确保发件箱事件模式以向后兼容的方式演进是有益的。

源应用程序如何为发件箱消息有效负载生成Avro格式的内容超出了本文档的范围。一种可能是利用KafkaAvroSerializer要序列化的类GenericRecord实例。为了确保Kafka消息值是精确的Avro二进制数据,对连接器应用以下配置:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter = io.debezium.converters.ByteArrayConverter

默认情况下,有效载荷字段值(Avro数据)是唯一的消息值。的配置ByteArrayConverter当值转换器传播有效载荷字段值按原样转换为Kafka消息值。

注意,这与BinaryDataConverter建议用于其他smt。这是因为MongoDB内部存储字节数组的方法不同。

Debe开云体育官方注册网址zium连接器可以配置为发出心跳、事务元数据或模式更改事件(支持程度因连接器而异)。属性不能序列化这些事件ByteArrayConverter因此,必须提供额外的配置,以便转换器知道如何序列化这些事件。作为一个例子,下面的配置说明了使用Apache KafkaJsonConverter没有模式:

变换=发件箱,…transforms.outbox.type=io.开云体育官方注册网址 debezum .connector.mongodb.transforms.outbox. mongoeventrouter value.converter=io. debezum .converters. bytearrayconverter value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schema .enable=false

委托转换器方法指定实现delegate.converter.type选择。如果转换器需要任何额外的配置选项,也可以指定它们,例如使用禁用上面所示的模式schemas.enable = false

发出带有附加字段的消息

发件箱集合可能包含希望将其值添加到发出的发件箱消息的字段。例如,考虑一个值为的发件箱集合定购单aggregatetype一个领域和另一个领域,eventType,其可能值为顺序而且order-shipped.可以使用该语法添加其他字段领域:位置:别名

的允许值放置是:--信封-分区

发射eventType字段的值,配置SMT如下所示:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = eventType:标题:类型

结果将是Kafka消息中的一个报头类型作为它的键,而值eventType字段作为它的值。

发射eventType字段值在发件箱消息信封,配置SMT像这样:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = eventType:信封:类型

要控制在哪个分区上产生发件箱消息,可以这样配置SMT:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = partitionField:分区

注意,对于分区放置时,添加别名将没有任何效果。

将转义的JSON字符串扩展为JSON

默认情况下,有效载荷Debeziu开云体育官方注册网址m发件箱消息的内容表示为字符串。当字符串的原始来源是JSON格式时,产生的Kafka消息使用转义序列来表示字符串,如下例所示:

# Kafka消息标题:"id=596e275826f08b2730779e1f" # Kafka消息时间戳:1556890294484 {"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98},开云体育官方注册网址 {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\":\"2019-01-31T12:13:01\", \"customerId\": 123}"}

您可以配置发件箱事件路由器以扩展消息内容,将转义的JSON转换回原始的未转义的JSON格式。在转换后的字符串中,从原始JSON文档推导出相应的模式。下面的例子显示了结果Kafka消息中的扩展JSON:

# Kafka主题:outbox.event.order # Kafka消息键:"1" # Kafka消息头:"id=596e275826f08b2730779e1f" # Kafka消息时间戳:1556890294484 {"id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "q开云体育官方注册网址uantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123}

若要在转换中启用字符串转换,请设置的值collection.expand.json.payload真正的并使用StringConverter示例如下:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter transforms.outbox.collection.expand.json。有效载荷= true value.converter = org.apache.kafka.connect.storage.StringConverter

配置选项

下表描述了可以为发件箱事件路由器SMT指定的选项。在表中,集团列表示Kafka的配置选项分类。

表2。发件箱事件路由器SMT配置选项说明
选项 默认的 集团 描述

警告

集合

确定在发件箱集合上有更新操作时SMT的行为。可能的设置有:

  • 警告—SMT记录警告并继续到下一个发件箱收集文档。

  • 错误—SMT记录错误,并继续到下一个发件箱收集文档。

  • 致命的—SMT日志错误,连接器停止处理。

发件箱集合中的所有更改都应该是插入或删除操作。也就是说,发件箱集合作为队列;不允许更新发件箱集合中的文档。SMT自动过滤掉发件箱集合上的删除操作(用于删除正在进行的发件箱事件)。

_id

集合

指定包含唯一事件ID的发件箱集合字段。属性下的事件头部中存储此IDid关键。

aggregateid

集合

指定包含事件键的发件箱集合字段。当此字段包含值时,SMT将使用该值作为发出的发件箱消息中的键。这对于维护Kafka分区的正确顺序很重要。

集合

默认情况下,发出的发件箱消息中的时间戳是Debezium事件时间戳。开云体育官方注册网址若要在发件箱邮件中使用不同的时间戳,请将此选项设置为一个发件箱集合字段,其中包含您希望在发出的发件箱邮件中使用的时间戳。

有效载荷

集合

指定包含事件有效负载的发件箱收集字段。

集合

指定是否应该对String有效负载进行JSON扩展。如果没有发现内容或在解析错误的情况下,内容将保持“原样”。

有关详情,请参阅展开转义json部分。

收集、信封

指定要添加到发件箱邮件头或信封中的一个或多个发件箱集合字段。指定以逗号分隔的对列表。在每一对中,指定字段的名称,以及希望该值在头中还是在信封中。用冒号分隔对中的值,例如:

id:标题,我的领域:信封

要为字段指定别名,请指定一个以别名为第三个值的三元组,例如:

id:标题,我的领域:信封:my-alias

第二个值是位置,而且必须始终如此信封

收集、模式

方法中所描述的模式版本使用此值Kafka连接模式Javadoc。

aggregatetype

路由器

指定发件箱集合中字段的名称。默认情况下,此字段中指定的值成为连接器向其发出发件箱消息的主题名称的一部分。有关示例,请参见预期的发件箱集合的描述

(? < routedByValue >。*)

路由器

指定发件箱SMT在RegexRouter中应用于发件箱集合文档的正则表达式。属性的设置的一部分route.topic.replacementSMT的选择。

+默认行为是SMT替换默认$ {routedByValue}变量的设置中route.topic.replacement属性设置的SMT选项route.by.field发件箱SMT选项。

outbox.event。$ {routedByValue}

路由器

指定连接器向其发出发件箱消息的主题的名称。默认的主题名称为outbox.event。其次是aggregatetype发件箱集合文档中的字段值。例如,如果aggregatetype值是客户,主题名称为outbox.event.customers

+要更改主题名称,您可以:

路由器

指示是空的还是有效负载使连接器发出一个墓碑事件。

tracingspancontext

跟踪

包含跟踪跨度上下文的字段的名称。

开云体育官方注册网址debezium-read

跟踪

表示Debezium处理范围的操作名称。开云体育官方注册网址

跟踪

真正的只有具有序列化上下文字段的事件才应该被跟踪。

分布式跟踪

发件箱事件路由SMT支持分布式跟踪。看到跟踪文档欲知详情。