MongoDB发件箱活动路由器

这SMT仅供使用Debezium MongoDB连接器。开云体育官方注册网址信息使用关系数据库的发件箱事件路由器SMT,明白了开云体育电动老虎机发件箱活动路由器

发件箱模式是一种安全、可靠的多个(微)服务之间交换数据。发件箱模式实现服务的内部状态之间避免不一致(通常保存在其数据库)和国家活动被服务需要相同的数据。开云体育电动老虎机

实现发件箱模式在Debezium应用程序中,配置一个Debezium连接器开云体育官方注册网址:

  • 发件箱收集捕捉变化

  • 应用Debezium开云体育官方注册网址 MongoDB发件箱活动路由器单消息转换(SMT)

De开云体育官方注册网址bezium连接器配置为使用MongoDB发件箱SMT应该捕获变化只发生在一个发件箱集合。有关更多信息,请参见选项有选择地应用转换

连接器可以捕获变化不止一个发件箱收集只有每个发件箱组都有相同的结构。

使用这个SMT,实际业务的操作集合(s)和插入件箱收集必须作为一个多文档事务的一部分,一直以来支持MongoDB 4.0,以防止潜在的数据业务集合之间的矛盾集合(s)和发件箱。为未来的更新,使更新现有数据和插入件箱事件在没有多文档的ACID事务交易,我们计划以支持额外的配置将发件箱事件存储在现有的子文档集合的一种形式,而不是一个独立的发件箱。

发件箱模式的更多信息,请参阅可靠的Microservices数据交换与发件箱模式

发件箱消息示例

了解如何配置路由器SMT Debezium MongoDB发件箱事件,开云体育官方注册网址考虑以下的例子Debezium发件箱信息:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "b2730779e1f596e275826f08" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }

De开云体育官方注册网址bezium连接器配置为使用MongoDB发件箱事件路由器SMT生成之前的消息通过将原始Debezium改变事件消息如以下示例:

#卡夫卡消息键:{" id ": "{\“美元oid \”: \“596 e275826f08b2730779e1f \“}”} #卡夫卡消息标头:”、“#卡夫卡消息的时间戳:1556890294484{“补丁”:空,“后”:“{\“_id \”:{\“美元oid \”: \}“596 e275826f08b2730779e1f \”, \“aggregateid \”:{\“美元oid \”: \}“b2730779e1f596e275826f08 \”, \“aggregatetype \”, \“\”, \“\”, \“OrderCreated \”, \“载荷\”:{\“_id \”:{\“美元oid \”: \}“da8d6de63b7745ff8f4457db \”, \“lineitem \”: [{\ \“id”: 1, \“项\”:\“Debezium行动\”,\“\”,\“进入\”,\“\”数量:2,\“totalPrice \”: 39.98}, {\ 开云体育官方注册网址\“id”: 2 \ \“项”:\“Debezium假人\”,\“\”,\“进入\”,\“数量\”:1,\“totalPrice \”: 29.99}], \“orderDate \”, \“2019 - 01 - 31 - t12:13:01 \”, \“customerId \”: 123}}”,“源”:{“版本”:“tripwire。最后”、“连接器”:“mongodb”、“名称”:“满足”、“ts_ms”: 1558965508000,“快照”:假的,“分贝”:“库存”、“rs”:“rs0”、“收藏”:“客户”、“奥德”:31日“h”: 1546547425148721999},“人事处”:“c”、“ts_ms”: 1556890294484}

这个例子的一个消息是基于Debezi开云体育官方注册网址um发件箱默认发件箱事件路由器配置假设一个发件箱收集结构和基于聚合的事件路由。定制行为,事件路由器SMT提供了大量的发件箱配置选项

收集基本的发件箱

应用默认MongoDB配置路由器SMT发件箱事件,你的发件箱收集被认为有以下字段:

{" _id ": objectId”、“aggregatetype”:“弦”、“aggregateid”:“objectId”、“类型”:“弦”、“有效载荷”:“对象”}
表1。预计发件箱收集字段的描述
效果

id

包含事件的惟一ID。在发件箱信息,这个值是一个头。例如,您可以使用这个ID删除重复的消息。

获取事件的唯一ID从不同的发件箱收集字段,设置collection.field.event.idSMT在连接器配置选项。

aggregatetype

包含一个值的SMT附加主题的名称,连接器会发出消息发件箱。默认行为是该值替换默认的$ {routedByValue}变量route.topic.replacementSMT的选择。

例如,在一个默认的配置,route.by.fieldSMT选项设置为aggregatetyperoute.topic.replacementSMT选项设置为outbox.event。$ {routedByValue}。假设您的应用程序增加了发件箱的两个文档集合。第一个文档中的值aggregatetype字段是客户。在第二个文档中的值aggregatetype字段是订单。连接器发出的第一个文档outbox.event.customers的话题。连接器发出第二个文档outbox.event.orders的话题。

获取这个值从一个不同的发件箱收集字段,设置route.by.fieldSMT在连接器配置选项。

aggregateid

包含事件的关键,为负载提供了一个ID。SMT使用该值作为键在发件箱发出消息。这是重要的维持正确的顺序卡夫卡分区。

从不同的发件箱收集获取事件的关键字段,设置collection.field.event.keySMT在连接器配置选项。

有效载荷

发件箱更改事件的表示。默认的结构是JSON。默认情况下,消息值仅由卡夫卡有效载荷价值。然而,如果发件箱事件配置包括附加字段,卡夫卡的消息值包含一个信封封装有效载荷和额外的字段,每个字段分别表示。有关更多信息,请参见排放与其他字段信息

获取事件有效负载从一个不同的发件箱收集字段,设置collection.field.event.payloadSMT在连接器配置选项。

额外的定制字段

发件箱的任何额外的字段集合添加发件箱事件在有效载荷部分或消息头。

一个例子可能是一个字段eventType这传达了一个用户定义的值,有助于分类或组织活动。

基本配置

配置一个Debezium Mo开云体育官方注册网址ngoDB连接器支持待发箱模式,配置outbox.MongoEventRouterSMT。获得SMT的默认行为,将其添加到连接器配置不指定任何选项,如以下示例:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
自定义配置

连接器可能发出许多类型的事件消息(例如,心跳消息,墓碑消息,或元数据信息事务)。转换只应用于事件起源于发件箱集合,定义SMT谓词选择性地应用转换的语句这些事件。

选项有选择地应用转换

除了改变事件消息,Debezium连接器排放数据库变化发生时,连接器还散发出其他类型的消息,包括开云体育官方注册网址心跳消息,和元数据信息模式变化和事务。开云体育电动老虎机因为这些消息的结构不同于SMT的改变事件消息的结构设计过程,最好配置连接器有选择地应用SMT,这样流程只有预期的数据变化信息。您可以使用下列方法之一来配置连接器应用SMT选择性:

使用Avro作为负载格式

MongoDB发件箱事件路由器SMT支持任意负载格式。的有效载荷字段值的发件箱收集是通过透明的。另一个使用JSON是使用Avro。这可以有利于消息格式治理和确保发件箱事件模式向后兼容的方式进化。

源应用程序产生Avro格式化的内容如何发件箱消息负载本文档的范围之内。一种可能性是杠杆KafkaAvroSerializer类序列化GenericRecord实例。确保卡夫卡消息值是确切的Avro二进制数据,应用以下配置连接器:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter = io.debezium.converters.ByteArrayConverter

默认情况下,有效载荷字段值(Avro数据)是唯一的消息值。的配置ByteArrayConverter值转换器传播有效载荷字段值初始到卡夫卡消息值。

请注意,这不同于BinaryDataConverter建议其他smt。这是由于不同的方法需要MongoDB在内部存储字节数组。

Debe开云体育官方注册网址zium连接器可以配置为发出的心跳,事务的元数据,或模式更改事件(支持不同的连接器)。这些事件不能被序列化的ByteArrayConverter所以必须提供额外的配置转换器知道如何序列化这些事件。作为一个例子,下面的配置说明了使用Apache卡夫卡JsonConverter没有模式:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter value.converter = io.debezium.converters.ByteArrayConverter value.converter.delegate.converter.type = org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable = false

委托转换器实现指定的delegate.converter.type选择。如果需要任何额外的配置选项的转换器,也可以指定,如上面所示的损坏模式使用schemas.enable = false

排放与其他字段信息

你的发件箱集合可能包含字段的值你想添加发件箱发出的消息。例如,考虑一个发件箱的值的集合定购单aggregatetype字段和另一个字段,eventType的可能值顺序order-shipped。可以添加其他字段的语法领域:位置:别名

允许的值放置是:-- - - - - -信封- - - - - -分区

发出的eventType在发件箱消息头字段值,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = eventType:标题:类型

结果将是卡夫卡的头信息类型作为其关键和的值eventType字段的值。

发出的eventType字段值在发件箱消息信封,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = eventType:信封:类型

控制哪个分区发件箱消息产生,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.collection.fields.additional.placement = partitionField:分区

请注意,对分区位置,添加别名将没有影响。

扩大了JSON字符串作为JSON

默认情况下,有效载荷Debeziu开云体育官方注册网址m的发件箱消息表示为一个字符串。当字符串的原始JSON格式的,由此产生的卡夫卡消息使用转义序列来表示字符串,如以下示例所示:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }

您可以配置路由器发件箱事件扩大消息内容,将逃脱JSON回到原来,保有的JSON格式。转换后的字符串,同伴从原始JSON文档模式推导出。下面的例子显示了JSON扩大在生成的卡夫卡的信息:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "1" # Kafka Message Headers: "id=596e275826f08b2730779e1f" # Kafka Message Timestamp: 1556890294484 { "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }

使字符串转换在转换中,设置的值collection.expand.json.payload真正的并使用StringConverter如以下示例所示:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.connector.mongodb.transforms.outbox.MongoEventRouter transforms.outbox.collection.expand.json。有效载荷= true value.converter = org.apache.kafka.connect.storage.StringConverter

配置选项

下表描述了发件箱事件的选项,您可以指定路由器SMT。在表中,集团列表明卡夫卡的配置选项分类。

表2。发件箱事件的描述路由器SMT配置选项
选项 默认的 集团 描述

警告

集合

决定了SMT的行为有一个更新操作在发件箱系列。可能的设置:

  • 警告——SMT日志警告并继续下一个发件箱收集文档。

  • 错误——SMT日志错误并继续接下来的发件箱收集文档。

  • 致命的——SMT日志错误和连接器停止处理。

发件箱中的所有变化预计将是一个集合插入或删除操作。也就是说,一个发件箱集合函数作为一个队列;更新文件在发件箱不允许集合。SMT自动过滤掉删除操作(删除发件箱进行事件)发件箱收集。

_id

集合

指定了发件箱收集字段包含独特的标识符,这个ID将存储在事件发出的头下id关键。

aggregateid

集合

指定了发件箱收集字段包含事件的关键。该字段包含一个值时,SMT使用该值作为关键在发件箱发出消息。这是重要的维持正确的顺序卡夫卡分区。

集合

默认情况下,在发件箱发出消息的时间戳Debezium事件的时间戳。开云体育官方注册网址发件箱中使用不同的时间戳信息,设置这个选项将发件箱的收集字段包含时间戳,你要在发件箱发出的消息。

有效载荷

集合

指定了发件箱收集字段包含事件有效负载。

集合

指定字符串的JSON扩张有效载荷是否应该做的。如果没有内容发现或解析错误,内容是“是”。

更多细节,请参阅扩大了json部分。

收集、信封

指定一个或多个发件箱收集字段,您想要添加发件箱消息标头或信封。指定一个以逗号分隔的对。在每一对指定字段的名称,以及你想要的值是否在标题或信封。单独的搭配冒号中的值,例如:

id:标题,我的领域:信封

为字段中,指定别名指定三别名作为第三个值,例如:

id:标题,我的领域:信封:my-alias

第二个值是放置,它必须始终信封

收集、模式

当设置,这个值被用作版本中描述的模式卡夫卡连接模式Javadoc。

aggregatetype

路由器

发件箱中的指定的名称字段。默认情况下,指定的值在这个领域成为一个部分的名称主题连接器发出的发件箱的消息。例如,看到描述预期的发件箱收集

(? < routedByValue >。*)

路由器

指定一个正则表达式的发件箱SMT应用RegexRouter发件箱收集文件。这个正则表达式的设置的一部分route.topic.replacementSMT的选择。

+默认行为是SMT替换默认的$ {routedByValue}变量的设置route.topic.replacementSMT选项的设置route.by.field发件箱SMT选项。

outbox.event。$ {routedByValue}

路由器

指定主题的连接器的名称发出消息发件箱。默认的主题名称outbox.event。紧随其后的是aggregatetype发件箱收集文档中的字段值。例如,如果aggregatetype值是客户主题名称outbox.event.customers

+改变主题名称,您可以:

路由器

指示是否一个空有效载荷导致连接器发出一个墓碑上的事件。

tracingspancontext

跟踪

字段的名称包含跟踪跨度上下文。

开云体育官方注册网址debezium-read

跟踪

操作名称代表Debezium处理。开云体育官方注册网址

跟踪

真正的只有事件序列化上下文字段应该跟踪。

分布式跟踪

发件箱事件路由SMT支持分布式跟踪。看到跟踪文档为更多的细节。