发件箱活动路由器

发件箱模式是一种安全、可靠的多个(微)服务之间交换数据。发件箱模式实现服务的内部状态之间避免不一致(通常保存在其数据库)和国家活动被服务需要相同的数据。开云体育电动老虎机

实现发件箱模式在Debezium应用程序中,配置一个Debezium连接器开云体育官方注册网址:

  • 捕捉一个发件箱表的变化

  • 应用Debezium开云体育官方注册网址发件箱活动路由器单消息转换(SMT)

De开云体育官方注册网址bezium连接器配置应用的发件箱SMT应该捕获变化只发生在一个发件箱表。有关更多信息,请参见选项有选择地应用转换

连接器可以捕获变化不止一个发件箱表只有在每一件箱表具有相同的结构。

看到可靠的Microservices数据交换与发件箱模式了解为什么发件箱模式是有用的,它是如何工作的。

对于您可以运行一个例子,看到发件箱模式演示,这是在Debezium示例库中。开云体育官方注册网址它包括如何配置的一个示例Debezium连接器运行事件路由器SMT发件箱。开云体育官方注册网址

发件箱事件路由器SMT与MongoDB连接器不兼容。

用户可以运行MongoDBMongoDB发件箱事件路由器SMT

发件箱消息示例

了解如何Debezium发件箱事件配置路由器开云体育官方注册网址SMT,审查以下的例子Debezium发件箱信息:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }

De开云体育官方注册网址bezium连接器配置为应用发件箱事件路由器SMT生成上述消息通过将Debezium原始消息是这样的:

#卡夫卡消息键:“406 c07f3 - 26 - f0 - 4 - eea - a50c - 109940064 - b8f”#卡夫卡消息标头:”、“#卡夫卡消息的时间戳:1556890294484{“前”:空,“后”:{" id ": " 406 c07f3 - 26 - f0 - 4 - eea - a50c - 109940064 - b8f”、“aggregateid”:“1”,“aggregatetype”:“秩序”,“有效载荷”:“{\ \“id”: 1, \“lineitem \”: [{\ \“id”: 1, \“项\”:\“Debezium行动\”,\“\”,\“进入\”,\“\”数量:2,\“tot开云体育官方注册网址alPrice \”: 39.98}, {\ \“id”: 2 \ \“项”:\“Debezium假人\”,\“\”,\“进入\”,\“数量\”:1,\“totalPrice \”: 29.99}], \“orderDate \”: \“2019 - 01 - 31 - t12:13:01 \”, \“customerId \”: 123}”,“时间戳”:1556890294344,“类型”:“OrderCreated”},“源”:{“版本”:“tripwire。最后”、“连接器”:“postgresql”、“名称”:“dbserver1-bare”、“分贝”:“orderdb”、“ts_usec”: 1556890294448870,“txId”: 584年,“lsn”: 24064704,“模式”:“库存”、“表”:“outboxevent”、“快照”:假的,”last_snapshot_record”:空,“xmin”:零},“op”:“c”、“ts_ms”: 1556890294484}

这个例子的一个消息是基于Debezi开云体育官方注册网址um发件箱默认发件箱事件路由器配置假设一个发件箱表结构和事件路由基于聚合。定制行为,事件路由器SMT提供了大量的发件箱配置选项

基本表发件箱

应用默认发件箱活动路由器SMT配置、发件箱表列被认为有以下:

专栏| |类型修饰符- - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - id | uuid | not null aggregatetype |字符不同(255)| not null aggregateid |字符不同(255)|非空类型不同(255)| |字符不是零载荷| jsonb |
表1。预计发件箱表列的描述
效果

id

包含事件的惟一ID。在发件箱信息,这个值是一个头。例如,您可以使用这个ID删除重复的消息。

获取事件的唯一ID从不同的发件箱表列,设置table.field.event.idSMT在连接器配置选项。

aggregatetype

包含一个值的SMT附加主题的名称,连接器会发出消息发件箱。默认行为是该值替换默认的$ {routedByValue}变量route.topic.replacementSMT的选择。

例如,在一个默认的配置,route.by.fieldSMT选项设置为aggregatetyperoute.topic.replacementSMT选项设置为outbox.event。$ {routedByValue}。假设您的应用程序添加了两个发件箱表记录。第一条记录中的值aggregatetype列是客户。在第二个记录中的值aggregatetype列是订单。连接器发出的第一个记录outbox.event.customers的话题。连接器发出第二个记录outbox.event.orders的话题。

获取这个值从一个不同的发件箱表列,设置route.by.fieldSMT在连接器配置选项。

aggregateid

包含事件的关键,为负载提供了一个ID。SMT使用该值作为键在发件箱发出消息。这是重要的维持正确的顺序卡夫卡分区。

获取事件从不同的发件箱关键表列,设置table.field.event.keySMT选项在连接器配置。

有效载荷

发件箱更改事件的表示。默认的结构是JSON。默认情况下,消息值仅由卡夫卡有效载荷价值。然而,如果发件箱事件配置包括附加字段,卡夫卡的消息值包含一个信封封装有效载荷和额外的字段,每个字段分别表示。有关更多信息,请参见排放与其他字段信息

获取事件有效负载从一个不同的发件箱表列,设置table.field.event.payloadSMT在连接器配置选项。

额外的自定义列

任何额外的从发件箱表列添加发件箱事件在有效载荷部分或消息头。

一个例子可能是一个列eventType这传达了一个用户定义的值,有助于分类或组织活动。

基本配置

配置一个Debezium连接器开云体育官方注册网址支持待发箱模式,配置outbox.EventRouterSMT。获得SMT的默认行为,将其添加到连接器配置不指定任何选项,如以下示例:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter
自定义配置

连接器可能发出许多类型的事件消息(例如,心跳消息,墓碑消息,或元数据信息交易或模式变化)。转换只应用于事件起源于发件箱表,定义SMT谓词选择性地应用转换的语句这些事件。

选项有选择地应用转换

除了改变事件消息,Debezium连接器排放数据库变化发生时,连接器还散发出其他类型的消息,包括开云体育官方注册网址心跳消息,和元数据信息模式变化和事务。开云体育电动老虎机因为这些消息的结构不同于SMT的改变事件消息的结构设计过程,最好配置连接器有选择地应用SMT,这样流程只有预期的数据变化信息。您可以使用下列方法之一来配置连接器应用SMT选择性:

使用Avro作为负载格式

发件箱事件路由器SMT支持任意负载格式。的有效载荷发件箱表中的列值是通过透明的。另一个使用JSON是使用Avro。这可以有利于消息格式治理和确保发件箱事件模式向后兼容的方式进化。

源应用程序产生Avro格式化的内容如何发件箱消息负载本文档的范围之内。一种可能性是杠杆KafkaAvroSerializer类序列化GenericRecord实例。确保卡夫卡消息值是确切的Avro二进制数据,应用以下配置连接器:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter value.converter = io.debezium.converters.BinaryDataConverter

默认情况下,有效载荷列值(Avro数据)是唯一的消息值。的配置BinaryDataConverter值转换器传播有效载荷列值初始到卡夫卡消息值。

Debe开云体育官方注册网址zium连接器可以配置为发出的心跳,事务的元数据,或模式更改事件(支持不同的连接器)。这些事件不能被序列化的BinaryDataConverter所以必须提供额外的配置转换器知道如何序列化这些事件。作为一个例子,下面的配置说明了使用Apache卡夫卡JsonConverter没有模式:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter value.converter = io.debezium.converters.BinaryDataConverter value.converter.delegate.converter.type = org.apache.kafka.connect.json.JsonConverter value.converter.delegate.converter.type.schemas.enable = false

委托转换器实现指定的delegate.converter.type选择。如果需要任何额外的配置选项的转换器,也可以指定,如上面所示的损坏模式使用schemas.enable = false

转换器io.开云体育官方注册网址debezium.converters.ByteBufferConverterDebezium 1.9版本以来已被弃用,在2.0被开云体育官方注册网址移除。此外,当使用卡夫卡连接连接器的配置必须升级到Debezium之前更新2. x开云体育官方注册网址

排放与其他字段信息

你的发件箱表可能包含列的值你想添加发件箱发出的消息。例如,考虑一个发件箱表的值定购单aggregatetype列,另一个列,eventType的可能值顺序order-shipped。可以添加其他字段的语法列:位置:别名

允许的值放置是:-- - - - - -信封- - - - - -分区

发出的eventType列值在发件箱消息头,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:标题:类型

结果将是卡夫卡的头信息类型作为其关键和的值eventType列的值。

发出的eventType列值在发件箱消息信封,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = eventType:信封:类型

控制哪个分区发件箱消息产生,配置SMT是这样的:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.fields.additional.placement = partitionColumn:分区

请注意,对分区位置,添加别名将没有影响。

扩大了JSON字符串作为JSON

你可能已经注意到,Debezium消息包含了发件箱开云体育官方注册网址有效载荷表示为一个字符串。所以当这个字符串,实际上是JSON,似乎逃出来的结果卡夫卡消息像如下所示:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}" }

发件箱事件路由器允许您扩展消息内容与同伴“真实”JSON模式演绎着从JSON文档本身。这样的结果看起来像卡夫卡信息:

#卡夫卡主题:outbox.event。或der # Kafka Message key: "1" # Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc" # Kafka Message Timestamp: 1556890294484 { "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123 }

要启用这个变换,你必须设置table.expand.json.payload真正的和使用JsonConverter像下图:

变换=发件箱,…transforms.outbox.type = i开云体育官方注册网址o.debezium.transforms.outbox.EventRouter transforms.outbox.table.expand.json。有效载荷= true value.converter = org.apache.kafka.connect.json.JsonConverter

配置选项

下表描述了发件箱事件的选项,您可以指定路由器SMT。在表中,集团列表明卡夫卡的配置选项分类。

表2。发件箱事件的描述路由器SMT配置选项
选项 默认的 集团 描述

警告

决定了SMT的行为有一个更新操作表发件箱。可能的设置:

  • 警告——SMT日志警告并继续下一个表记录发件箱。

  • 错误——SMT日志错误并继续下一件箱表记录。

  • 致命的——SMT日志错误和连接器停止处理。

所有更改的表将发件箱插入操作。也就是说,发件箱表函数作为一个队列;更新记录在一个表不允许发件箱。SMT自动过滤掉删除发件箱表的操作。

id

指定了发件箱表列包含独特的标识符,这个ID将存储在事件发出的头下id关键。

aggregateid

指定了发件箱表列包含事件的关键。这一列包含一个值时,SMT使用该值作为关键在发件箱发出消息。这是重要的维持正确的顺序卡夫卡分区。

默认情况下,在发件箱发出消息的时间戳Debezium事件的时间戳。开云体育官方注册网址发件箱中使用不同的时间戳信息,发件箱设置这个选项表列包含时间戳,你要在发件箱发出的消息。

有效载荷

指定了发件箱表列包含事件有效负载。

指定字符串的JSON扩张有效载荷是否应该做的。如果没有内容发现或解析错误,内容是“是”。

更多细节,请参阅扩大了json部分。

忽略

当启用JSON扩张性质table.expand.json.payload,确定包括一个json的行为的有效负载发件箱表上的价值。可能的设置:

  • 忽略——忽略空值。

  • optional_bytes——保持空值,并将null作为可选的字节的连接。

表,信封

指定一个或多个表的列发件箱,你想添加发件箱消息标头或信封。指定一个以逗号分隔的对。在每一对指定列的名称,以及你想要的值是否在标题或信封。单独的搭配冒号中的值,例如:

id:标题,我的领域:信封

为列指定一个别名,指定三个别名作为第三个值,例如:

id:标题,我的领域:信封:my-alias

第二个值是放置,它必须始终信封

表、模式

当设置,这个值被用作版本中描述的模式卡夫卡连接模式Javadoc。

aggregatetype

路由器

发件箱表中指定列的名称。默认行为是值在本专栏中成为一个部分的名称主题的连接器发出发件箱的消息。的一个例子预计发件箱表的描述

(? < routedByValue >。*)

路由器

指定一个正则表达式的发件箱SMT应用RegexRouter发件箱表记录。这个正则表达式的设置的一部分route.topic.replacementSMT的选择。

默认行为是SMT替换默认的$ {routedByValue}变量的设置route.topic.replacementSMT选项的设置route.by.field发件箱SMT选项。

outbox.event。$ {routedByValue}

路由器

指定主题的连接器的名称发出消息发件箱。默认的主题名称outbox.event。紧随其后的是aggregatetype发件箱表中的列值记录。例如,如果aggregatetype值是客户主题名称outbox.event.customers

改变主题名称,您可以:

路由器

指示是否一个空有效载荷导致连接器发出一个墓碑上的事件。

tracingspancontext

跟踪

字段的名称包含跟踪跨度上下文。

开云体育官方注册网址debezium-read

跟踪

操作名称代表Debezium处理。开云体育官方注册网址

跟踪

真正的只有事件序列化上下文字段应该跟踪。

分布式跟踪

发件箱事件路由SMT支持分布式跟踪。看到跟踪文档为更多的细节。