作为业务逻辑的一部分,微服务通常不仅需要更新自己的本地数据存储,还需要将发生的数据更改通知其他服务。发件箱模式描述了一种让服务以安全和一致的方式执行这两个任务的方法;它为源服务提供即时的“读自己写的”语义,同时跨服务边界提供可靠的、最终一致的数据交换。

更新(2019年9月13日):为了简化发件箱模式的使用,Debezium现在提供了一个现成的开云体育官方注册网址用于路由发件箱事件的SMT.本文中讨论的自定义SMT不再需要了。

如果您已经构建了几个微服务,您可能会同意最难的部分是数据:微服务不是孤立存在的,它们经常需要在彼此之间传播数据和数据更改。

例如,考虑一个管理购买订单的微服务:当下了一个新订单时,关于该订单的信息可能必须传递给发货服务(这样它就可以组装一个或多个订单的发货)和客户服务(这样它就可以根据新订单更新诸如客户总信用余额之类的内容)。

有不同的方法可以让订单服务了解关于新购买订单的另外两种方法;例如,它可以调用一些休息grpc或由这些服务提供的其他(同步)API。不过,这可能会产生一些不必要的耦合:发送服务必须知道要调用哪些其他服务以及在哪里找到它们。它还必须为这些服务暂时不可用做好准备。服务网格如Istio可以在这里提供有用的功能,如请求路由,重试,断路器和更多。

任何同步方法的一般问题是,一个服务在没有它所调用的其他服务的情况下无法真正发挥作用。而缓冲和重试在其他服务只需要的情况下可能会有所帮助通知对于某些事件,如果服务确实需要,则不是这样查询其他信息服务。例如,在下达购买订单时,订单服务可能需要从库存服务获取所购买商品的库存次数。

这种同步方法的另一个缺点是缺乏可重玩性,即在事件发送后,新消费者到达的可能性仍然能够从头消费整个事件流。

这两个问题都可以通过使用异步数据交换方法来解决:即让订单、库存和其他服务通过持久消息日志传播事件,例如Apache卡夫卡.通过订阅这些事件流,每个服务都将收到其他服务数据更改的通知。它可以对这些事件做出反应,如果需要,还可以在自己的数据存储中创建该数据的本地表示,使用适合自己需求的表示。例如,可以对这种视图进行反规范化,以有效地支持特定的访问模式,或者它可能只包含与消费服务相关的原始数据的子集。

持久日志还支持可重玩性,即可以根据需要添加新的消费者,启用您最初可能没有想到的用例,并且不涉及源服务。例如,考虑一个数据仓库,它应该保存所有曾经下过的订单的信息,或者基于购买订单的一些全文搜索功能Elasticsearch.一旦购买订单事件在Kafka主题中(Kafka的主题保留策略设置可用于确保事件在给定的用例和业务需求所需的时间内保持在主题中),新的消费者可以订阅,从一开始就处理主题,并实现微服务数据库、搜索索引、数据仓库等中所有数据的视图。开云体育电动老虎机

如何处理主题增长

根据数据量(记录的数量和大小,更改的频率),在主题中长时间保存事件可能可行,也可能不可行,甚至可能不确定。通常,从业务角度来看,在给定时间点之后,与给定数据项相关的一些甚至所有事件(例如,特定的采购订单)都可以删除。请参见下面的“从Kafka主题中删除事件”框,了解更多关于从Kafka主题中删除事件以保持其大小在限制范围内的想法。

双写入的问题

为了提供它们的功能,微服务通常会有自己的本地数据存储。例如,订单服务可以使用关系数据库持久化关于购买订单的信息。开云体育电动老虎机当下一个新订单时,这可能会导致一个插入表中的操作PurchaseOrder在服务的数据库中。开云体育电动老虎机同时,该服务可能希望向Apache Kafka发送关于新订单的事件,以便将该信息传播到其他感兴趣的服务。

但是,仅仅发出这两个请求可能会导致潜在的不一致。原因是我们不能拥有一个跨服务数据库和Apache Kafka的共享事务,因为后者不支持在分布式(XA)事务中登记。开云体育电动老虎机所以在不幸的情况下,可能会发生这样的情况:我们最终将新的采购订单保存在本地数据库中,但没有将相应的消息发送到Kafka(例如由于一些网络问题)。开云体育电动老虎机或者,反过来,我们可能已经将消息发送给Kafka,但未能在本地数据库中持久化购买订单。开云体育电动老虎机这两种情况都不可取;这可能导致看似成功下单的订单无法创建发货。或者创建了货物,但是在订单服务本身中没有相应的购买订单的痕迹。

那么如何避免这种情况呢?答案是只修改一个这两个资源(数据库开云体育电动老虎机Apache Kafka),并以最终一致的方式驱动第二个版本的更新。让我们首先考虑只向Apache Kafka写入的情况。

当收到新的购买订单时,订单服务将不会执行插入同步进入其数据库;开云体育电动老虎机相反,它只会向Kafka主题发送一个描述新秩序的事件。因此,一次只能修改一个资源,如果该资源出现问题,我们将立即发现并向订单服务的调用者报告请求失败。

同时,服务本身也会订阅这个Kafka主题。这样,当新消息到达主题时,它将得到通知,并且可以在其数据库中持久化新的购买订单。开云体育电动老虎机不过,这里有一个微妙的挑战,那就是缺乏“读自己写”的语义。例如,让我们假设订单服务也有一个API,用于搜索给定客户的所有购买订单。当在下了新订单后立即调用该API时,由于Kafka主题处理消息的异步性质,可能会发生购买订单还没有保存在服务的数据库中,因此查询不会返回。开云体育电动老虎机这可能会导致非常混乱的用户体验,例如,用户可能会在他们的购物历史中错过新下的订单。有很多方法可以处理这种情况,例如,该服务可以将新下的购买订单保存在内存中,并基于此回答后续的查询。但是,当实现更复杂的查询或考虑到订单服务可能还包含集群设置中的多个节点时,这很快就变得不简单了,这将需要在集群中传播该数据。

现在,如果只同步写入数据库,并基于此驱动消息导出到Apache Kafka,情况会如何呢?开云体育电动老虎机这就是发件箱模式的用武之地。

发件箱模式

这种方法的思想是在服务的数据库中有一个“发件箱”表。开云体育电动老虎机当收到下采购订单的请求时,不仅仅是一个插入PurchaseOrder表完成,但是,作为同一事务的一部分,还将表示要发送的事件的记录插入到发件箱表中。

记录描述了服务中发生的事件,例如,它可以是一个JSON结构,表示已经下了一个新的购买订单,包括订单本身的数据、订单行以及上下文信息(如用例标识符)。通过发件箱表中的记录显式地发出事件,可以确保事件的结构适合外部使用者。这也有助于确保事件使用者不会在例如更改内部域模型或PurchaseOrder表格

异步进程监视该表的新条目。如果有,它会将事件作为消息传播到Apache Kafka。这为我们提供了一个非常好的特性平衡:通过同步写入PurchaseOrder表中,源服务受益于“读自己写”语义。一旦提交了第一个事务,后续的采购订单查询将返回新持久化的订单。同时,我们通过Apache Kafka获得可靠的、异步的、最终一致的数据传播到其他服务。

现在,发件箱模式实际上并不是一个新想法。它已经使用了相当长的一段时间。事实上,即使在使用jms风格的消息代理(实际上可以参与分布式事务)时,避免任何耦合和远程资源(如消息代理)停机的潜在影响也是一个更好的选择。你也可以在克里斯·理查德森的优秀作品中找到对图案的描述microservices.io网站。

然而,该模式得到的关注远远少于它应得的关注,它在微服务上下文中特别有用。正如我们将看到的,可以使用变更数据捕获和Debezium以一种非常优雅和高效的方式实现发件箱模式。开云体育官方注册网址下面,让我们来探讨一下如何做到这一点。

基于变更数据捕获的实现

基于日志的变更数据捕获(CDC)非常适合在发件箱表中捕获新条目,并将它们传输到Apache Kafka。与任何基于轮询的方法相反,事件捕获在接近实时的情况下以非常低的开销发生。开云体育官方注册网址Debezium附带美国疾病控制与预防中心的连接器用于多个数据库,如MyS开云体育电动老虎机QL, Postgres和SQL Server。下面的示例将使用开云体育官方注册网址Debezium连接器Postgres

你可以找到完整的示例的源代码在GitHub上。请参阅README.md有关构建和运行示例代码的详细信息。这个例子围绕两个微服务展开,order服务而且shipment-service.两者都是用Java实现的,使用CDI作为组件模型和JPA/Hibernate来访问各自的数据库。开云体育电动老虎机订单服务继续运行WildFly并公开了一个简单的REST API,用于下订单和取消特定的订单行。它使用Postgres数据库作为本地数开云体育电动老虎机据存储。装运服务是基于Thorntail;通过Apache Kafka,它接收订单服务导出的事件,并在自己的MySQL数据库中创建相应的发货条目。开云体育电动老虎机开云体育官方注册网址Debezium跟踪订单服务Postgres数据库的事务日志(“预写日志”,WAL),以便捕获发件箱表中的任何新事件,并将它们传播到Apache Kafka。开云体育电动老虎机

解决方案的整体架构如下图所示:

发件箱模式概述">
          </div>
          <div class=

请注意,模式与这些特定的实现选择没有任何关系。它同样可以使用Spring Boot等替代技术来实现(例如利用Spring Data的支持域事件)、普通JDBC或其他编程语言,而不是Java。

现在,让我们仔细研究解决方案的一些相关组件。

发件箱表

发件箱表存在于订单服务的数据库中,其结构如下:开云体育电动老虎机

列| Type | Modifiers --------------+------------------------+----------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null Type | character varying(255) | not null payload | jsonb | not null

它的列如下:

  • id:每条消息的唯一id;消费者可以使用它来检测任何重复的事件,例如在失败后重新启动以读取消息时。在创建新事件时生成。

  • aggregatetype:节点的类型聚合根某一特定事件与之相关;其思想是,依赖于领域驱动设计的相同概念,导出的事件应该引用一个聚合(“可以作为单个单元处理的领域对象集群”),其中聚合根为访问聚合中的任何实体提供了唯一的入口点。例如,这可以是“购买订单”或“客户”。

    这个值将用于将事件路由到Kafka中相应的主题,因此所有与购买订单相关的事件都有一个主题,所有与客户相关的事件都有一个主题,等等。请注意,属于此类聚合中包含的子实体的事件也应该使用相同的类型。例如,一个事件表示取消一个单独的订单行(它是购买订单聚合的一部分),也应该使用它的聚合根类型“order”,确保这个事件也将进入“order”Kafka主题。

  • aggregateid:受给定事件影响的聚合根的id;例如,这可以是购买订单的id或客户id;与聚合类型类似,属于聚合中包含的子实体的事件应该使用包含聚合根的id,例如,订单行取消事件的购买订单id。这个id稍后将被用作Kafka消息的密钥。这样,所有与一个聚合根或它所包含的任何子实体相关的事件都将进入该Kafka主题的同一个分区,这确保了该主题的消费者将按照与它们产生的确切顺序消费与同一个聚合相关的所有事件。

  • 类型:事件类型,例如:“订单已创建”或“订单取消”。允许使用者触发合适的事件处理程序。

  • 有效载荷:包含实际事件内容的JSON结构,例如包含购买订单、购买者信息、包含的订单行、订单价格等。

向发件箱发送事件

为了将事件“发送”到发件箱,订单服务中的代码通常只需执行一个插入进入发件箱表。但是,使用稍微抽象一点的API是个好主意,如果需要,可以稍后更容易地调整发件箱的实现细节。CDI事件在这方面非常方便。它们可以在应用程序代码中引发,并将被处理同步由发件箱事件发送者发送,这将执行所需的操作插入进入发件箱表。

所有发件箱事件类型都应该实现以下契约,类似于前面所示的发件箱表结构:

公共接口ExportedEvent字符串getAggregateId ();字符串getAggregateType ();JsonNode getPayload ();字符串方法();}

为了产生这样的事件,应用程序代码使用注入的事件实例,例如这里的OrderService类:

@ApplicationScoped公共OrderService@PersistenceContext私人EntityManager EntityManager;@ inject私人事件< ExportedEvent >事件;@ transactional公共PurchaseOrder addOrder(PurchaseOrder订单){订单= entityManager.merge(订单);event.fire (OrderCreatedEvent.of(顺序));event.fire (InvoiceCreatedEvent.of(顺序));返回秩序;}@ transactional公共PurchaseOrder updateOrderLine (orderId、orderLineId, OrderLineStatus newStatus) {/ /……}}

addOrder ()方法时,JPA实体管理器用于在数据库和注入的数据库中持久化传入订单开云体育电动老虎机事件是用来防火的吗OrderCreatedEvent和一个InvoiceCreatedEvent.同样,请记住,尽管存在“事件”的概念,但这两件事发生在同一个事务中。也就是说,在这个事务中,将有三条记录插入到数据库中:一条在采购订单表中,两条在发件箱表中。开云体育电动老虎机

实际的事件实现非常简单;举个例子,这是OrderCreatedEvent类:

公共OrderCreatedEvent实现了ExportedEvent {私人静态ObjectMapper mapper =objectmap ();私人最后id;私人最后JsonNode秩序;私人OrderCreatedEvent (id, JsonNode order) {.id = id;.order =订单;}公共静态OrderCreatedEvent of(PurchaseOrder) {ObjectNode asJson = mapper.createObjectNode() .put(id, order.getId()) .put(customerId, order.getCustomerId()) .put(向数据库, .toString order.getOrderDate () ());ArrayNode items = asJson.putArray(lineitem);(OrderLine OrderLine: order.getLineItems()){项目。add(mapper.createObjectNode() .put(id, orderLine.getId()) .put(, orderLine.getItem()) .put(数量, orderLine.getQuantity()) .put(totalPrice, orderLine.getTotalPrice()) .put(状态, orderLine.getStatus().name()));}返回OrderCreatedEvent (order.getId (), asJson);}@Override公共字符串getAggregateId () {返回字符串.valueOf (id);}@Override公共字符串getAggregateType () {返回订单;}@Override公共字符串方法(){返回OrderCreated;}@Override公共JsonNode getPayload() {返回秩序;}}

请注意杰克逊的objectmap用于创建事件有效负载的JSON表示。

现在让我们看一下消耗任何触发的代码ExportedEvent并相应写入发件箱表:

@ApplicationScoped公共EventSender@PersistenceContext私人EntityManager EntityManager;公共无效onExportedEvent (@Observesextedevent事件){OutboxEvent OutboxEvent =OutboxEvent(event.getAggregateType(), event.getAggregateId(), event.getType(), event.getPayload());entityManager.persist (outboxEvent);entityManager.remove (outboxEvent);}}

这相当简单:对于每个事件,CDI运行时将调用onExportedEvent ()方法。的实例OutboxEvent实体持久化在数据库中-并立即删除!开云体育电动老虎机

一开始这可能会令人惊讶。但是记住基于日志的CDC是如何工作的是有意义的:它不检查数据库中表的实际内容,而是对仅追加的事务日志进行跟踪。开云体育电动老虎机调用persist ()而且remove ()将创建一个插入和一个删除事务提交后,记录在日志中的条目。之后,Debezium将开云体育官方注册网址处理这些事件:对于任何事件插入,带有事件有效负载的消息将被发送到Apache Kafka。删除另一方面,可以忽略事件,因为从发件箱表中删除事件仅仅是一种技术细节,不需要向消息代理进行任何传播。因此,我们能够通过CDC捕获添加到发件箱表中的事件,但是在查看表本身的内容时,它总是空的。这意味着表不需要额外的磁盘空间(除了在某个时刻会自动丢弃的日志文件元素),也不需要单独的维护过程来阻止它无限增长。

注册Debezium连接器开云体育官方注册网址

发件箱实现到位后,是时候注册Debezium Postgres连接器了,这样它就可以捕获发件箱表中的任何新事件,并将它们转发给Apac开云体育官方注册网址he Kafka。这可以通过post以下JSON请求到Kafka Connect的REST API来实现:

的名字outbox-connector配置:{connector.classio.开云体育官方注册网址debezium.connector.postgresql.PostgresConnectortasks.max1开云体育电动老虎机database.hostnameorder-db开云体育电动老虎机database.port5432开云体育电动老虎机database.userpostgresuser开云体育电动老虎机database.passwordpostgrespw开云体育电动老虎机database.dbnameorderdb开云体育电动老虎机database.server.namedbserver1schema.whitelist库存table.whitelistinventory.outboxeventtombstones.on.delete转换路由器transforms.router.typeio.开云体育官方注册网址debezium.examples.outbox.routingsmt.EventRouter}}

的实例io.开云体育官方注册网址debezium.connector.postgresql.PostgresConnector,从指定的Postgres实例中捕获更改。注意,通过表白名单,仅从outboxevent表被捕获。它还应用名为EventRouter

从Kafka主题删除事件

通过设置tombstones.on.delete,当事件记录从发件箱表中删除时,连接器将不会发出删除标记(“墓碑”)。这是有道理的,因为从发件箱表中删除不应该影响事件在相应的Kafka主题中的保留。相反,可以在Kafka中配置事件主题的特定保留时间,例如保留所有购买订单事件30天。

或者,也可以用压实的话题.这将需要对发件箱表中的事件设计进行一些更改:

  • 它们必须描述整个集合;例如,表示取消单个订单行的事件应该描述包含购买订单的完整当前状态;这样,在运行日志压缩后,消费者只看到与给定订单相关的最后一个事件时,也可以获得购买订单的整个状态。

  • 他们必须再来一个布尔属性,该属性指示特定事件是否表示删除事件的聚合根。这样的事件(如类型的OrderDeleted)可以被下一节中描述的事件路由SMT用于为该聚合根生成删除标记。然后,日志压缩将删除与给定采购订单相关的所有事件OrderDeleted事件已写入主题。

当然,当删除事件时,事件流将不能从一开始就重新播放。根据特定的业务需求,只保留给定的采购订单、客户等的最终状态可能就足够了。这可以通过使用紧凑的主题和主题的值来实现delete.retention.ms设置。另一种选择是将历史事件移动到某种冷存储中(例如Amazon S3桶),在需要时可以从那里检索它们,然后从Kafka主题中读取最新的事件。采用哪种方法取决于开发和操作解决方案的团队的具体需求、预期的数据量和专业知识。

主题的路由

默认情况下,Debezium连接开云体育官方注册网址器会将所有起源于一个给定表的更改事件发送到同一个主题,也就是说,我们最终会得到一个名为dbserver1.inventory.outboxevent它将包含所有事件,无论是订单事件还是客户事件等等。

为了简化只对特定事件类型感兴趣的消费者的实现,有多个主题更有意义,例如:OrderEventsCustomerEvents等等。例如,运输服务可能对任何客户事件都不感兴趣。只订阅OrderEvents主题时,它将确保永远不会接收任何客户事件。

为了将从发件箱表捕获的更改事件路由到不同的主题,该自定义SMTEventRouter使用。这里是它的代码应用()方法,Kafka Connect将为Debezium连接器发出的每条记录调用该方法:开云体育官方注册网址

@Override公共R应用(R记录){//忽略墓碑以防万一如果(record.value () = =) {返回记录;}结构体Struct = (结构体) record.value ();字符串op = struct.getString(人事处);//忽略发件箱表中的删除如果(op.equals (d)) {返回;}其他的如果(op.equals (c)) {timestamp = struct.getInt64(ts_ms);结构体after = struct.getStruct();字符串key = after.getString(aggregateid);字符串topic = after.getString(aggregatetype) +事件字符串eventId = after.getString(id);字符串eventType = after.getString(类型);字符串有效载荷= after.getString(有效载荷);模式valueSchema = SchemaBuilder.struct() .field(eventType, .field after.schema () (类型) . schema ()) .field (ts_ms, .field struct.schema () (ts_ms) . schema ()) .field (有效载荷, .field after.schema () (有效载荷) . schema ()) .build ();结构体值=结构体(valueSchema) .put (eventType, eventType) .put(ts_ms,时间戳).put(有效载荷、有效载荷);Headers Headers = record.headers();headers.addString (eventId, eventId);返回record.newRecord(主题,模式.STRING_SCHEMA, key, valueSchema, value, record.timestamp(), headers);}//不期望更新事件,因为发件箱表是“仅追加”,//例如,事件记录将永远不会更新其他的IllegalArgumentException意外操作类型记录:+记录);}}

当接收到删除事件时(人事处d),它将丢弃该事件,因为从发件箱表中删除事件记录与下游消费者无关。事情变得更有趣,当接收一个创建事件(人事处c).这样的记录将被传播到Apache Kafka。

开云体育官方注册网址Debezium的更改事件有一个复杂的结构,其中包含旧的(之前)和新()表示行的状态。要传播的事件结构从状态。的aggregatetype值用于构建要将事件发送到的主题的名称。例如,事件aggregatetype设置为订单会被送到哪里OrderEvents的话题。aggregateid用作消息键,确保该聚合的所有消息将进入该主题的相同分区。消息值是由原始事件有效负载(编码为JSON)、指示事件产生时间的时间戳和事件类型组成的结构。最后,事件UUID作为Kafka报头字段传播。这允许使用者有效地检测重复,而不必检查实际的消息内容。

Apache Kafka中的事件

现在让我们来看看OrderEvents而且CustomerEvents的话题。

如果你已经签出了示例源代码,并通过Docker Compose启动了所有组件(请参阅README.md你可以通过订单服务的REST API下订单,就像这样:

猫/数据/ create-order-request资源。json|http POST http://localhost:8080/order-service/rest/orders

类似地,特定的订单行可以取消:

猫/数据/ cancel-order-line-request资源。json|http PUT http://localhost:8080/order-service/rest/orders/1/lines/2

等工具使用时非常实用kafkacat实用程序,您现在应该在OrderEvents主题:

kafkcat -b kafka:9092 -C -o beginning -f 'Headers: %h\nKey: %k\nValue: %s\n' -q -t OrderEvents
标题:eventId = d03dfb18 - 8 af8 - 464 d - 890 - b - 09 - eb8b2dbbdd关键:“4”价值:{“eventType”:“OrderCreated”、“ts_ms”:1550307598558,“有效载荷”:“{\ \“id”:4,\“lineitem \”:[{\ \ \“id”:7日“项\”:\“Debez开云体育官方注册网址ium行动\”,\“\”,\“进入\”,\“数量\”:2,\“totalPrice \”:39.98},{\ \ \“id”:8日“项\”:\“Debezium假人\”,\“\”,\“进入\”,\“数量\”:1,\“totalPrice \”:29.99}],\“orderDate \”:\“2019 - 01 - 31 - t12:13:01 \”,\“customerId \”:123}”}头:eventId=49f89ea0-b344-421f-b66f-c635d212f72c键:" 4"值:{"eventType":"OrderLineUpdated","ts_ms":1550308226963,"payload":"{\"orderId\": 4, "newStatus\": "CANCELLED\", "oldStatus\": "ENTERED\", "orderLineId\": 7}"}

有效载荷字段和消息值是原始事件的字符串化JSON表示。Debe开云体育官方注册网址zium Postgres连接器发出JSONB列作为字符串(使用io.开云体育官方注册网址debezium.data.Json逻辑类型名),这就是引号被转义的原因。的金桥效用,更具体地说,它fromjson操作符,可以方便地以更易读的方式显示事件有效负载:

kafkacat -b kafka:9092 -C -o beginning -t Order | jq '。负载| fromjson'
id4lineitem: [{id7开云体育官方注册网址Debezium在行动状态进入数量2totalPrice39.98}, {id8开云体育官方注册网址Debezium for Dummies状态进入数量1totalPrice29.99}),向数据库2019 - 01 - 31 - t12:13:01customerId123} {orderId4newStatus取消了oldStatus进入orderLineId7

你也可以看看CustomerEvents主题来检查在添加采购订单时表示发票创建的事件。

消费服务中的重复检测

至此,我们对发件箱模式的实现功能齐全;当订单服务接收到下订单(或取消订单行)的请求时,它将在purchaseorder而且orderline其数据库的表。开云体育电动老虎机同时,在同一事务中,相应的事件条目将被添加到同一数据库中的发件箱表中。开云体育电动老虎机Debe开云体育官方注册网址zium Postgres连接器捕获对该表的任何插入,并将事件路由到与给定事件表示的聚合类型对应的Kafka主题中。

最后,让我们来研究另一个微服务(如装运服务)如何使用这些消息。进入该服务的入口点是一个普通的Kafka消费者实现,它并不太令人兴奋,因此为了简洁起见,这里省略了它。你可以找到它源代码在示例存储库中。上的每个传入消息订单主题时,使用者调用OrderEventHandler

@ApplicationScoped公共OrderEventHandler私人静态最后日志记录器LOGGER = LoggerFactory.getLogger(ordereventhhandler .class);@ inject私人MessageLog日志;@ inject私人ShipmentService ShipmentService;@ transactional公共无效onOrderEvent (UUIDeventId,字符串键,字符串事件){如果(log.alreadyProcessed(eventId)) {LOGGER.info(UUID{}事件已被检索,忽略它, eventId);返回;} JsonObject json = json . createreader (StringReader(事件)).readObject ();JsonObject有效载荷= json.containsKey(模式) ?json.getJsonObject (有效载荷): json;字符串eventType = payload.getString(eventType);ts = payload.getJsonNumber(ts_ms) .longValue ();字符串eventPayload = payload.getString(有效载荷);JsonReader payloadReader = json . createrreader (StringReader(eventPayload));JsonObject payloadObject = payloadReader.readObject();如果(eventType.equals (OrderCreated)) {shipmentService.orderCreated(payloadObject);}其他的如果(eventType.equals (OrderLineUpdated)) {shipmentService.orderLineUpdated(payloadObject);}其他的{LOGGER.warn (未知事件类型);} log.processed (eventId);}}

做的第一件事onOrderEvent ()是检查具有给定UUID的事件以前是否被处理过。如果是,对该事件的任何进一步调用都将被忽略。这是为了防止由该数据管道的“至少一次”语义引起的事件的任何重复处理。例如,在分别通过源数据库或消息传递代理确认特定事件的检索之前,可能会发生De开云体育官方注册网址bezium连接器或消费服务失败的情况。开云体育电动老虎机在这种情况下,在重新启动Debezium或消费服务之后,可能会第二开云体育官方注册网址次处理一些事件。将事件UUID传播为Kafka消息头,可以有效地检测和排除消费者中的重复信息。

类的业务方法将解析消息值ShippingService方法与事件有效负载一起调用与特定事件类型对应的方法。最后,消息日志将消息标记为已处理。

MessageLog简单地在服务的本地数据库的一个表中跟踪所有消耗的事件:开云体育电动老虎机

@ApplicationScoped公共MessageLog@PersistenceContext私人EntityManager EntityManager;@ transactional(值= TxType.MANDATORY)公共无效处理(UUIDeventId) {entityManager.persist(ConsumedMessage (eventId Instant.now ()));}@ transactional(值= TxType.MANDATORY)公共布尔alreadyProcessed (UUIDeventId) {返回entityManager.find(ConsumedMessage.class, eventId) !=;}}

这样,如果由于某种原因事务被回滚,原始消息也不会被标记为已处理,并且异常会出现在Kafka事件消费者循环中。这允许稍后重新尝试处理消息。

注意,在将任何不可处理的消息重新路由到死信队列或类似队列之前,更完整的实现应该只对给定消息进行一定次数的重新尝试。此外,在消息日志表上应该有一些内部管理;定期地,所有比消费者提交给代理的当前偏移量更老的事件都可能被删除,因为这样可以确保此类消息不会在下次传播给消费者。

总结

发件箱模式是在不同微服务之间传播数据的好方法。

通过只修改单个资源(源服务自己的数据库),它避免了同时修改不共享一个公共事务上下文(数据库和Apache Kafka)的多个资开云体育电动老虎机源的任何潜在不一致。通过先写入数据库,源服务具有即时的“开云体育电动老虎机读取自己的写入”语义,这对于一致的用户体验非常重要,允许在写入之后调用的查询方法立即反映任何数据更改。

同时,该模式支持将异步事件传播到其他微服务。Apache Kafka充当了服务间消息传递的高度可伸缩和可靠的骨干。如果有正确的主题保留设置,新的消费者可能会在最初生成事件很久之后出现,并根据事件历史建立自己的本地状态。

将Apache Kafka置于整个体系结构的中心也确保了相关服务的解耦。例如,如果解决方案的单个组件失败或在一段时间内不可用,例如在更新期间,事件将简单地在稍后处理:重新启动后,Debezium连接器将继续从它之前停止的位置跟踪发件箱表。开云体育官方注册网址类似地,任何使用者都将继续处理来自其先前偏移量的主题。通过跟踪已经成功处理的消息,可以检测到重复的消息并将其排除在重复处理之外。

当然,不同服务之间的事件管道最终是一致的,例如,运送服务等消费者可能会落后于订单服务等生产者。不过,通常这是可以的,并且可以根据应用程序的业务逻辑进行处理。例如,通常不需要在下订单的同一秒钟内创建发货。此外,整个解决方案的端到端延迟通常很低(秒级甚至次秒级范围),这要归功于基于日志的更改数据捕获,它允许以近乎实时的方式发射事件。

最后要记住的一点是,通过发件箱公开的事件的结构应该被视为发出服务的API的一部分。也就是说,在需要时,它们的结构应该仔细调整,并考虑到兼容性。这是为了确保在升级生产服务时不会意外地破坏任何消费者。与此同时,消费者在处理消息时应该宽容,例如,在遇到接收到的事件中的未知属性时不要失败。

非常感谢Hans-Peter Grahsl, Jiri Pechanec, Justin Holmes和René Kerner在写这篇文章时的反馈!

贡纳Morling

Gunnar是Decodable的软件工程师,也是一名不折不扣的开源爱好者。多年来,他一直是Debezium的项目负责人。开云体育官方注册网址Gunnar创建了kcctl、JfrUnit和MapStruct等开源项目,并且是Bean验证2.0 (JSR 380)的规范负责人。他在德国汉堡工作。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题