维护某种形式的审计日志是业务应用程序的常见需求,即对应用程序数据的所有更改进行持久跟踪。如果你仔细观察,带有Debezium数据更改事件的Kafka主题与此非常相似:它开云体育官方注册网址来自数据库事务日志,描述了应用程序记录的所有更改。开云体育电动老虎机但是缺少的是一些元数据:为什么、什么时候以及由谁更改了数据?在这篇文章中,我们将探索如何通过变更数据捕获(CDC)提供和公开元数据,以及如何使用流处理来丰富实际的数据变更事件。

维护数据审计跟踪的原因有很多:例如,监管要求可能要求企业保留其客户、采购订单、发票或其他数据的完整历史信息。此外,对于企业自身的目的来说,洞察某些数据为什么以及如何发生变化是非常有用的,例如,允许改进业务流程或分析错误。

创建审计跟踪的一种常用方法是应用程序端库。连接到选定的持久化库中,它们将维护数据表中的特定列(“createdBy”,“lastUpdated”等),和/或将早期记录版本复制到某种形式的历史表中。

但这也有一些缺点:

  • 作为OLTP事务的一部分在历史表中写入记录会增加事务中执行的语句的数量(对于每一次更新或删除,也必须将插入写入相应的历史表中),因此可能会导致应用程序的响应时间更长

  • 在大量更新和删除的情况下,通常不能提供审计事件。删除状态为“已发货”的订单),因为用于将库挂钩到持久性框架的侦听器并不知道所有受影响的记录

  • 无法跟踪直接在数据库中执行的更改,例如,在运行数据加载、在开云体育电动老虎机存储过程中进行批处理或在紧急数据补丁期间绕过应用程序时

另一种技术是数据库触发器。开云体育电动老虎机它们不会错过任何操作,无论是来自应用程序还是数据库本身。开云体育电动老虎机它们还能够处理受大容量语句影响的每条记录。缺点是,当作为OLTP事务的一部分执行触发器时,仍然存在延迟增加的问题。此外,必须有一个用于安装和更新每个表的触发器的过程。

基于变更数据捕获的审计日志

当利用事务日志作为审计跟踪的源,使用变更数据捕获检索变更信息并将其发送到消息代理或日志(如Apache Kafka)时,就不存在上述问题。

CDC流程以异步方式运行,可以在不影响OLTP事务的情况下提取更改数据。每当有数据更改时,无论是从应用程序发出还是直接在数据库中执行,事务日志都包含一个条目。开云体育电动老虎机对于在批量操作中更新或删除的每个记录都有一个日志条目,因此可以为每个记录生成一个更改事件。此外,这对数据模型也没有影响,即不需要创建特殊的列或历史表。

但疾控中心如何访问我们最初讨论过的元数据呢?例如,这可以是执行数据更改的应用程序用户、他们的IP地址和设备配置、跟踪范围id或应用程序用例的标识符等数据。

由于元数据通常不会(也不应该)存储在应用程序的实际业务表中,因此必须单独提供。一种方法是使用单独的表存储元数据。对于每个执行的事务,业务应用程序在该表中生成一条记录,其中包含所有必需的元数据,并使用事务id作为主键。当运行手动数据更改时,很容易为元数据记录提供一个额外的插入。由于D开云体育官方注册网址ebezium的数据更改事件包含导致特定更改的事务的id,因此数据更改事件和元数据记录可以相互关联。

在这篇文章的剩余部分,我们将进一步研究业务应用程序如何提供事务范围的元数据,以及如何使用Kafka Streams API使用相应的元数据来丰富数据更改事件。

解决方案概述

下图是基于管理蔬菜数据的微服务实例的整体解决方案设计:

使用变更数据捕获和流处理进行审计">
          </div>
          <div class=

涉及两项服务:

  • vegetables-service:一个简单的REST服务,用于插入和更新蔬菜数据到Postgres数据库中;开云体育电动老虎机作为其处理的一部分,它不仅将更新其实际的“业务表”蔬菜,但也将一些审计元数据插入到专用的元数据表中transaction_context_data;开云体育官方注册网址Debezium用于将两个表中的更改事件流到Apache Kafka中相应的主题中

  • log-enricher:一个使用Kafka Streams和Quarkus构建的流处理应用程序,它丰富了CDC主题包含蔬菜变化事件的消息(dbserver1.inventory.vegetable)内的元数据dbserver1.inventory.transaction_context_data主题,并将丰富的蔬菜变化事件写回卡夫卡dbserver1.inventory.vegetable.enriched的话题。

你可以找到完整的示例以及在GitHub上运行它们的所有组件和指令。

提供审计元数据

让我们首先讨论蔬菜服务等应用程序如何提供所需的审计元数据。例如,以下元数据应该用于审计目的:

  • 进行数据更改的应用程序用户,由JWT令牌的声明(JSON Web令牌

  • 请求时间戳,由日期HTTP报头

  • 用例标识符,通过调用的REST资源方法上的自定义Java注释提供

方法持久化一个新蔬菜的REST资源的基本实现jax - rs API:

@ path/蔬菜@RequestScoped与@(MediaType.APPLICATION_JSON)@Consumes(MediaType.APPLICATION_JSON)公共VegetableResource@ injectVegetableService VegetableService;@POST@RolesAllowed({农民})@ transactional@Audited(useCase =创建蔬菜公共响应creatvegegetable(蔬菜蔬菜){如果(vegetable.getId () ! =) {返回Response.status (Status.BAD_REQUEST.getStatusCode ()) .build ();} vegetable = vegetablesservice . creatvegetable(蔬菜);返回Response.ok(蔬菜).status (Status.CREATED) .build ();}//更新,删除…

如果您以前曾经用JAX-RS构建过REST服务,那么这个实现对您来说应该很熟悉:注释的资源方法@POST接收传入的请求负载并将其传递给通过CDI注入的服务bean。的@Audited不过,注释是特殊的。它是一个自定义注释类型,有两个目的:

  • 指定审计日志中应该引用的用例(“CREATE VEGETABLE”)

  • 绑定一个拦截器对于注释的方法的每次调用将触发哪个@Audited

该拦截器在方法注释时起作用@Audited调用并实现用于写入事务范围的审计元数据的逻辑。它是这样的:

@Interceptor(1)@Audited(useCase =@Priority(value = Interceptor.Priority.APPLICATION +One hundred.(2)公共TransactionInterceptor@ injectJsonWebToken jwt;(3)@ injectEntityManager EntityManager;@ injectHttpServletRequest请求;@AroundInvoke公共对象manageTransaction (InvocationContext ctx)抛出异常BigIntegertxtId = (BigInteger) entityManager(4).createNativeQuery (选择txid_current ()) .getSingleResult ();字符串useCase = ctx.getMethod(). gettannotation (Audited.class).useCase();TransactionContextData context =TransactionContextData ();(5)上下文。transactionId = txtId.longValueExact();上下文。userName = jwt.<字符串>要求() .orElse (匿名);上下文。clientDate = getRequestDate();上下文。useCase = useCase; entityManager.persist(context);返回ctx.proceed ();(6)私人zone datetime getRequestDate() {字符串requestDate = request.getHeader(HttpHeaders.DATE);返回requestDate ! =?ZonedDateTime。解析(requestDate, datetimeformater . rfc_1123_date_time):;}}
1 @Interceptor而且@Audited将此标记为绑定到自定义的拦截器@Auditedannotion。
2 @Priority注释控制应该在拦截器堆栈中的哪个点调用审计拦截器。任何应用程序提供的拦截器的优先级都应该大于优先级。一个PPLICATION(2000);方法可以确保事务在之前已经启动@ transactional注释及其附带的拦截器优先级。PLATFORM_BEFORE范围(< 1000)。
3. 调用者的JWT令牌通过微档案JWT RBACAPI

对于每个被审计的方法,拦截器都会触发并执行

  • 获取当前事务id(执行此操作的确切方法是特定于数据库的,在示例中的开云体育电动老虎机txid_current ()调用Postgres函数)

  • 坚持一个TransactionContextData实体通过JPA;它的主键值是之前选择的事务id,它具有用户名(从JWT令牌获得)、请求日期(从JWT令牌获得)的属性日期HTTP请求头)和用例标识符(从@Audited被调用方法的注释)

  • 继续被调用方法的调用流

当调用REST服务来创建和更新一些蔬菜时,应该在数据库中创建以下记录(请参阅所提供示例中的README,以了解如何构建示例代码和开云体育电动老虎机调用蔬菜服务使用合适的JWT令牌):

vegetablesdb >选择inventory.vegetable;+------+---------------+---------+| id |描述|名称| | .使用实例------+---------------+---------||1|辣!土豆| |11|美味!|南瓜| |10|好吃!番茄| +------+---------------+---------+
vegetablesdb >选择inventory.transaction_context_data;+------------------+---------------------+------------------+----------------+| transaction_id | client_date | usecase | user_name | |------------------+---------------------+------------------+----------------||608|2019-08年-22年08:12:31|创建蔬菜|农民鲍勃| |609|2019-08年-22年08:12:31|创建蔬菜|农民鲍勃| |610|2019-08年-22年08:12:31|更新蔬菜|农民玛格丽特| +------------------+---------------------+------------------+----------------+

用审计元数据丰富变更事件

在数据库中存储了业务数据(蔬菜)和事务范围的元数据之后,是时候设置开云体育电动老虎机开云体育官方注册网址Debezium Postgres连接器和流数据变化从蔬菜而且transaction_context_data表到相应的Kafka主题。的详细信息,请再次参考示例README文件部署连接器

dbserver1.inventory.vegetable主题应包含创建、更新和删除蔬菜记录的更改事件,而dbserver1.inventory.transaction_context_data主题应仅为每个插入的元数据记录包含创建消息。

主题保留

为了管理所涉及主题的增长,每个主题的保留策略应该定义良好。例如,对于具有丰富更改事件的实际审计日志主题,基于时间的保留策略可能是合适的,根据您的需求保留每个日志事件。另一方面,事务元数据主题的寿命可能相当短,因为一旦处理了所有相应的数据更改事件,就不再需要它的条目了。对端到端延迟进行一些监视可能是一个好主意,以确保日志丰富流应用程序跟上传入消息,不会落后太多,从而在处理相应的更改事件之前有事务消息被丢弃的风险。

现在,如果我们查看来自两个主题的消息,我们可以看到它们可以基于事务id进行关联。它是蔬菜变化事件的结构,是事务元数据事件的消息键:

蔬菜和事务元数据消息">
          </div>
          <div class=

一旦我们为给定的蔬菜变化事件找到了相应的事务事件,那么client_dateusecase而且user_name前者的属性可以添加到后者:

丰富蔬菜信息">
          </div>
          <div class=

这种消息转换是一个完美的用例卡夫卡流,一个Java API,用于在Kafka主题上实现流处理应用程序,提供操作符,让您可以过滤、转换、聚合和连接Kafka消息。

作为我们将要使用的流处理应用程序的运行时环境Quarkus,这是“为GraalVM和OpenJDK HotSpot量身定制的Kubernetes本机Java堆栈,由最好的Java库和标准精心制作”。

用Quarkus构建Kafka流应用程序

在许多其他的,夸克带有一个Kafka流扩展,它允许构建运行在JVM上的流处理应用程序,并将其作为预先编译的本机代码。它负责流拓扑的生命周期,因此您不必处理诸如注册JVM关闭钩子、等待所有输入主题的创建等细节。

该扩展还提供了“实时开发”支持,它会自动重新加载流处理应用程序,而您正在处理它,允许在开发期间非常快速的周转周期。

连接逻辑

在考虑充实逻辑的实际实现时,a停工检修Join可能是一个合适的解决方案。通过创建KStream对于这两个主题,我们可以尝试实现连接功能。一个挑战是如何定义一个合适的加入窗口,因为两个主题的消息之间没有时间保证,我们不能错过任何事件。

另一个问题出现在变更事件的排序保证方面。默认情况下,Debezi开云体育官方注册网址um将使用表的主键作为对应Kafka消息的消息键。这意味着相同蔬菜记录的所有消息将具有相同的键,因此将进入蔬菜Kafka主题的相同分区。这反过来又保证了这些事件的消费者可以看到与同一蔬菜记录相关的所有消息,其顺序与创建它们的顺序完全相同。

现在,为了连接两个流,两边的消息键必须相同。这意味着必须通过事务id重新键入蔬菜主题(我们不能重新键入事务元数据主题,因为元数据事件中没有包含有关蔬菜的信息;即使是这样,一笔交易可能会影响多个蔬菜记录)。这样做,我们就会失去原有的订购保证。一个蔬菜记录可能在随后的两个事务中被修改,其更改事件可能在重新输入键的主题的不同分区中结束,这可能导致消费者在第一个更改事件之前接收第二个更改事件。

如果一个KStream-KStream加入不可行,还能做什么?一个连接之间的KStream而且GlobalKTable看起来也很有希望。它没有co-partitioning需求的所有分区GlobalKTable存在于分布式Kafka Streams应用程序的所有节点上。这似乎是一种可以接受的折衷,因为来自事务元数据主题的消息可以相当快地丢弃,相应表的大小应该在合理的范围内。所以我们可以得到aKStream来源蔬菜话题和一个GlobalKTable基于事务元数据主题。

但不幸的是,存在一个时间问题:由于消息来自多个主题,在处理来自蔬菜流的元素时,可能发生相应的事务元数据消息还不可用的情况。因此,根据我们使用的是内部连接还是左连接,在这种情况下,我们要么跳过更改事件,要么在没有使用事务元数据丰富它们的情况下传播它们。这两种结果都是不可取的。

自定义连接缓冲

结合KStream而且GlobalKTable仍然暗示着正确的方向。只是,我们必须实现一个自定义的连接逻辑,而不是依赖于内置的连接操作符。其基本思想是缓冲到达蔬菜的信息KStream的对应事务元数据消息可用之前GlobalKTableS状态存储。这可以通过创建自定义来实现变压器实现所需的缓冲逻辑并应用于蔬菜KStream

让我们从流拓扑本身开始。多亏了Quarkus Kafka Streams扩展,一个CDI生成器方法返回拓扑结构对象所需要的全部:

@ApplicationScoped公共TopologyProducer静态最后字符串STREAM_BUFFER_NAME =stream-buffer-state-store静态最后字符串STORE_NAME =transaction-meta-data@ConfigProperty(name =audit.context.data.topic字符串txContextDataTopic;@ConfigProperty(name =audit.vegetables.topic字符串vegetablesTopic;@ConfigProperty(name =audit.vegetables.enriched.topic字符串vegetablesEnrichedTopic;与@公共拓扑buildTopology() {StreamsBuilder构建器=StreamsBuilder ();StoreBuilder < KeyValueStore <, JsonObject>> streamBufferStateStore = Stores .keyValueStoreBuilder(Stores. persistentkeyvaluestore (STREAM_BUFFER_NAME),Serdes.LongSerde (),JsonObjectSerde()) .withCachingDisabled();builder.addStateStore (streamBufferStateStore);(1)构建器。globalTable (txContextDataTopic Materialized.as (STORE_NAME));(2)构建器。< JsonObject, JsonObject >流(vegetablesTopic)(3).filter((id, changeEvent) -> changeEvent !=) .filter((id, changeEvent) -> !人事处) .equals (r)) .transform(() ->ChangeEventEnricher(), STREAM_BUFFER_NAME) .to(vegetablesEnrichedTopic);返回builder.build ();}}
1 状态存储,它将作为尚不能处理的更改事件的缓冲区
2 GlobalKTable基于事务元数据主题
3. KStream基于蔬菜话题;在这个流中,任何传入的墓碑标记都会被过滤,原因是审计跟踪主题的保留策略通常应该基于时间,而不是基于日志压缩;

类似地,快照事件也会被过滤,假设它们与审计跟踪无关,并且应用程序不会为Debezium连接器发起的快照事务提供任何相应的元数据开云体育官方注册网址

任何其他消息都通过自定义丰富了相应的事务元数据变压器(见下文),最后写入输出主题

类注入主题名称微配置接口,这些值是在Quarkus中提供的application.properties配置文件。除了主题名,这个文件还有Kafka引导服务器的信息,默认serdes:

audit.context.data.topic = dbserver1.inventory。transaction_context_dataaudit.vegetables.topic=dbserver1.inventory.vegetable audit.vegetables.enriched.topic=dbserver1.inventory.vegetable.enriched # may be overridden with env vars quarkus.kafka-streams.bootstrap-servers=localhost:9092 quarkus.kafka-streams.application-id=auditlog-enricher quarkus.kafka-streams.topics=${audit.context.data.topic},${audit.vegetables.topic} # pass-through kafka-streams.cache.max.bytes.buffering=10240 kafka-streams.commit.interval.ms=1000 kafka-streams.metadata.max.age.ms=500 kafka-streams.auto.offset.reset=earliest kafka-streams.metrics.recording.level=DEBUG kafka-streams.default.key.serde=io.debezium.demos.auditing.enricher.JsonObjectSerde kafka-streams.default.value.serde=io.debezium.demos.auditing.enricher.JsonObjectSerde kafka-streams.processing.guarantee=exactly_once

在下一步中,让我们来看看ChangeEventEnricher类,我们的自定义转换器。该实现基于更改事件序列化为JSON的假设,但当然,使用其他格式(如Avro或Protocol buffer)也可以做到同样好。

这是一段代码,但希望它分解成多个更小的方法,使其易于理解:

ChangeEventEnricher实现了变压器> {私人静态最后Buffer_offsets_key = -1 l私人静态最后日志记录器LOG = LoggerFactory.getLogger(changeeventrichher .class);私人ProcessorContext上下文;私人KeyValueStore txMetaDataStore;私人KeyValueStore <, JsonObject> streamBuffer;(5)@Override@SuppressWarnings无节制的公共无效init(ProcessorContext context) {.context =上下文;streamBuffer = (KeyValueStore<, JsonObject>) context。getStateStore (TopologyProducer。STREAM_BUFFER_NAME);txMetaDataStore = (KeyValueStore)getStateStore (TopologyProducer。STORE_NAME);context.schedule (持续时间.ofSeconds (1), PunctuationType。WALL_CLOCK_TIME, ts -> enrichAndEmitBufferedEvents());(4)@Override公共KeyValue transform(JsonObject key, JsonObject value) {布尔enrichedAllBufferedEvents = enrichAndEmitBufferedEvents();(3)如果(!enrichedAllBufferedEvents) {bufferChangeEvent(键,值);返回;} KeyValue enrichment =浓缩铀txmetadata(键,值);(1)如果(丰富= =) {(2)bufferChangeEvent(关键字,值);}返回丰富;}/** *使用来自相关*事务的元数据丰富缓冲的更改事件并转发它们。* * @return {@code true},如果所有缓冲事件都被充实和转发,* {@code false}否则。* /私人布尔enrichAndEmitBufferedEvents () {(3)可选< bufferoffset > seq = bufferoffset ();如果(! seq.isPresent ()) {返回真正的;} bufferoffset序列= seq.get();布尔enrichedAllBuffered =真正的i = sequence.getFirstValue();i < sequence.getNextValue();i++) {JsonObject buffered = streamBuffer.get(i);LOG.info (处理键{}的缓冲更改事件buffered.getJsonObject (关键));KeyValue enrichment =浓缩铀。getjsonobject (关键), buffered.getJsonObject (changeEvent));如果(丰富= =) {enrichedAllBuffered =打破;} context.forward(丰富。键,enriched.value);streamBuffer.delete(我);sequence.incrementFirstValue ();}如果(sequence.isModified()) {streamBuffer。把(BUFFER_OFFSETS_KEY sequence.toJson ());}返回enrichedAllBuffered;}/** *将给定的更改事件添加到流侧缓冲区。* /私人无效bufferChangeEvent(JsonObject key, JsonObject changeEvent) {(2)LOG.info (缓冲键{}的更改事件、关键);bufferoffset sequence = bufferoffset ().orElseGet(bufferoffset::initial);JsonObject包装器= Json.createObjectBuilder() .add(关键, key) .add(changeEvent, changeEvent) .build();streamBuffer.putAll (数组. aslist (KeyValue.pair(sequence.getNextValueAndIncrement(), wrapper), KeyValue.pair。(BUFFER_OFFSETS_KEY, sequence.toJson())));}/** *使用相关*事务的元数据丰富给定的更改事件。* * @return丰富的更改事件或{@code null},如果没有找到*关联事务的元数据。* /私人KeyValue enrichment withtxmetadata (JsonObject key, JsonObject changeEvent) {(1)JsonObject txId = Json.createObjectBuilder() .add(transaction_idchangeEvent.get () .asJsonObject () .getJsonNumber (txId) .longValue ()) .build ();JsonObject metaData = txmetadatstore .get(txId);如果(元数据! =) {LOG.info(键{}的丰富更改事件、关键);元数据= Json.createObjectBuilder() .asJsonObject ()) .remove (transaction_id) .build ();返回KeyValue。(key, Json.createObjectBuilder(changeEvent) .add(审计, metaData) .build());} LOG.warn (没有为事务{}找到元数据, txId);返回;}私人可选< bufferoffset > bufferoffset () {JsonObject bufferoffset = streamBuffer.get(BUFFER_OFFSETS_KEY);如果(bufferOffsets = =) {返回Optional.empty ();}其他的返回Optional.of (BufferOffsets.fromJson (bufferOffsets));}}@Override公共无效Close () {}}
1 当蔬菜变化事件到来时,在事务主题的状态存储中查找相应的元数据GlobalKTable的事务id将更改事件的块作为键;如果可以找到元数据,则将元数据添加到更改事件(在审计字段)并返回该充实的事件
2 如果无法找到元数据,则将传入事件添加到变更事件缓冲区中并返回
3. 在真正到达传入事件之前,处理所有缓冲的事件;这是必需的,以确保原始的变更事件被保留;只有当所有内容都被充实时,传入的事件才会被处理
4 为了在没有新的更改事件时也发出缓冲事件,将安排一个标点符号,定期处理缓冲区
5 对应元数据尚未到达的蔬菜事件缓冲区

关键部分是不可处理的更改事件的缓冲区。为了维护事件的顺序,必须按照插入的顺序处理缓冲区,首先从插入的事件开始(想象一个FIFO队列)。当从a中获取所有条目时,没有保证的遍历顺序KeyValueStore,这是通过使用严格递增序列的值作为键来实现的。一个特殊的条目在键值存储中用于存储关于缓冲区中当前“最古老”索引和下一个序列值的信息。

人们也可以考虑这种缓冲区的替代实现,例如基于Kafka主题或自定义KeyValueStore实现,确保迭代顺序从最老的条目到最新的条目。最终,如果Kafka流带有内置的重试无法连接的流元素的方法,它也可能是有用的;这将避免任何自定义缓冲实现。

如果出了问题

对于一个可靠和一致的处理逻辑,考虑失败情况下的行为是至关重要的,例如,如果流应用程序在向缓冲区添加元素后崩溃,但在更新序列值之前。

关键是exactly_once的值processing.guarantee给出的属性application.properties.这确保了事务上一致的处理;例如,在前面提到的场景中,在重新启动后,原始的更改事件将再次被处理,并且缓冲区状态将与事件第一次被处理之前完全相同。

丰富蔬菜事件的消费者应应用隔离级别read_committed;否则,它们可能会看到未提交的消息,从而出现重复的消息,以防在转发缓冲事件后但从缓冲区中删除该事件之前发生应用程序崩溃。

有了自定义转换器逻辑,我们就可以构建Quarkus项目并运行流处理应用程序。中应该会看到这样的消息dbserver1.inventory.vegetable.enriched主题:

id:10} {之前: {id:10描述:好吃!的名字:番茄},: {id:10描述:好吃!的名字:番茄},: {版本:0.10.0-SNAPSHOT连接器:postgresql的名字:dbserver1ts_ms:1569700445392快照:db:vegetablesdb模式:库存表格:蔬菜txId:610lsn:34204240xmin:},人事处:uts_ms:1569700445537审计: {client_date:1566461551000000usecase:更新蔬菜user_name:farmermargaret}}

当然,缓冲区处理逻辑可以根据您的具体要求进行调整;例如,与其无限期地等待相应的事务元数据,我们还可以决定在等待一段时间后传播未充实的更改事件,或者引发一个异常,指示缺少元数据,这样更有意义。

为了查看缓冲是否如预期的那样工作,可以做一个小实验:直接在数据库中使用SQL修改蔬菜记录。开云体育电动老虎机开云体育官方注册网址Debezium将捕获该事件,但由于没有提供相应的事务元数据,因此该事件不会转发到丰富的蔬菜主题。如果您使用REST API添加另一个蔬菜,这个蔬菜也不会被传播:尽管它有元数据记录,但它被另一个更改事件阻塞了。将第一个更改事务的元数据记录插入到transaction_context_data表中,两个更改事件都将被处理并发送到输出主题。

总结

在这篇博客文章中,我们讨论了如何将变更数据捕获与流处理结合起来,以一种高效、低开销的方式构建审计日志。与基于库和触发器的方法相比,形成审计跟踪的事件是通过CDC从数据库的事务日志中检索的,除了每个事务插入一个元数据记录(任何类型的审计日志都需要类似形式的元数据记录)之外,不会引起OLTP事务的开销。开云体育电动老虎机当数据记录被批量更新或删除时,还可以获得审计日志条目,这在基于库的审计解决方案中通常是不可能的。

通常应该作为审计日志一部分的其他元数据可以由应用程序通过一个单独的表提供,这个表也是通过Debezium捕获的。开云体育官方注册网址在Kafka Streams的帮助下,实际的数据更改事件可以用元数据表中的数据来丰富。

我们还没有讨论的一个方面是查询审计跟踪条目,例如检查特定的早期版本的数据。为此,丰富的变更数据事件通常存储在可查询的数据库中。开云体育电动老虎机与基本数据复制管道不同的是,在这种情况下,不仅每条记录的最新版本将存储在数据库中,而且所有版本,即主键通常将使用每个更改的事务id进行修改。开云体育电动老虎机这将允许选择单个数据记录,甚至多个表的连接,以根据给定的事务id获得有效的数据。具体如何实现,我们将在以后的文章中讨论。

非常欢迎您对这种构建审计日志的方法的反馈,请在下面发表评论。要开始使用自己的实现,可以查看的代码在GitHub开云体育官方注册网址上的Debezium示例库中。

非常感谢克里斯克兰福德汉斯GrahslAshhar哈桑安娜·麦克唐纳以及Jiri Pechanec,感谢他们在写这篇文章和附带的示例代码时的反馈!

贡纳Morling

Gunnar是Decodable的软件工程师,也是一名不折不扣的开源爱好者。多年来,他一直是Debezium的项目负责人。开云体育官方注册网址Gunnar创建了kcctl、JfrUnit和MapStruct等开源项目,并且是Bean验证2.0 (JSR 380)的规范负责人。他在德国汉堡工作。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题