第二级缓存Hibernate ORM / JPA是一种有效的提高应用程序性能的方法:缓存只读或很少修改的实体避免了到数据库的往返,从而提高了应用程序的响应时间。开云体育电动老虎机

与第一级缓存不同,第二级缓存与会话工厂(或者JPA术语中的实体管理器工厂)相关联,因此其内容在事务和并发会话之间共享。当然,如果缓存的实体被修改,相应的缓存条目也必须更新(或从缓存中清除)。只要数据更改是通过Hibernate ORM完成的,就没有什么可担心的:ORM将自动更新缓存。

但是,当绕过应用程序时,比如直接修改数据库中的记录时,事情就变得棘手了。开云体育电动老虎机Hibernate ORM无法知道缓存的数据已经过时,有必要显式地使受影响的项失效。一种常见的方法是使用一些管理功能来清除应用程序的缓存。要使其工作,重要的是不要忘记调用失效功能,否则应用程序将继续使用过时的缓存数据。

在接下来的文章中,我们将探索缓存失效的另一种方法,它以一种可靠的、完全自动化的方式工作:通过使用Debezium及其开云体育官方注册网址变更数据捕获(CDC)功能,您可以跟踪数据库本身的数据更改,并对任何应用的更改做出反应。开云体育电动老虎机这允许以近乎实时的方式使受影响的缓存项失效,而不会有由于遗漏更改而导致数据陈旧的风险。如果一个条目已经从缓存中移除,Hibernate ORM将在下一次请求时从数据库加载该实体的最新版本。开云体育电动老虎机

应用实例

作为一个例子,考虑这个简单的两个实体模型,PurchaseOrder而且

域模型示例"></div>
          <div class=

采购订单表示一个商品的订单,其总价是订购数量乘以商品的底价。

源代码

源代码GitHub上提供了这个例子。如果您想按照下面的步骤进行操作并尝试下面描述的所有步骤,请克隆该repo并按照中的说明操作README.md用于建设项目。

将订单和项目建模为JPA实体非常简单:

@ entity公共PurchaseOrder@ id@GeneratedValue(发电机=序列generator(name =序列, sequenceName =seq_po, initialValue =1001, allocationSize =50私人id;私人字符串客户;@ManyToOne私人项项目;私人int量;私人BigDecimaltotalPrice;/ /……

由于很少对项目进行更改,因此实体应该被缓存。这可以通过简单地指定JPA来完成@Cacheable注释:

@ entity@Cacheable公共@ id私人id;私人字符串描述;私人BigDecimal价格;/ /……

中启用二级缓存meta - inf / persistence . xml文件。房地产hibernate.cache.use_second_level_cache激活缓存本身,然后ENABLE_SELECTIVE缓存模式导致只将那些带有注释的实体放入缓存中@Cacheable.启用SQL查询日志记录和缓存访问统计信息也是一个好主意。这样你就可以通过检查应用程序日志来验证事情是否如预期的那样工作:

<?XML版本="1.0"编码="utf-8"?><持久性xmlnshttp://xmlns.jcp.org/xml/ns/persistencexmlns: xsihttp://www.w3.org/2001/XMLSchema-instancexsi: schemaLocation...版本2.2><持久化单元的名字orders-PU-JTA交易类型JTA>< jta-data-source >java: jboss /数据源/ OrderDS< / jta-data-source >< shared-cache-mode >ENABLE_SELECTIVE< / shared-cache-mode >< >属性<属性的名字hibernate.cache.use_second_level_cache价值真正的/><属性的名字hibernate.show_sql价值真正的/><属性的名字hibernate.format_sql价值真正的/><属性的名字hibernate.generate_statistics价值真正的/><!——方言等. ...-->< / >属性< /持久化单元>< /持久性>

当运行在Java EE应用服务器(或雅加达EE在将堆栈捐赠给Eclipse Foundation后如何调用它),这就是启用二级缓存所需要的全部内容。在这种情况下WildFly(这是示例项目中使用的),则Infinispan默认使用键/值存储作为缓存提供程序。

现在,通过在数据库中运行一些SQL(绕过应用程序层)来修改商品价格,看看会发生什么情况。开云体育电动老虎机如果您已经检查了示例源代码,请注释掉开云体育电动老虎机DatabaseChangeEventListener类中的描述启动应用程序README.md.然后你可以像这样使用curl下购买订单(在应用程序启动时已经持久化了几个示例项目):

> curl -H "Content-Type: application/json" \ -X POST \——data '{"customer": "Billy-Bob", "itemId": 10003, "quantity": 2}' \ http://localhost:8080/cache-invalidation/rest/orders
{"id": 1002, "customer": "Billy-Bob", "item": {"id":10003, "description": "North By Northwest", "price": 14.99}, "quantity": 2, "totalPrice": 29.98}

反应是意料之中的,因为物品价格是14.99。现在直接在数据库中更新商品的价格。开云体育电动老虎机该示例使用Postgres,因此您可以使用psqlCLI实用程序这样做:

$POSTGRES_USER $POSTGRES_DB -c "UPDATE item SET price = 20.99 where id = 10003"

使用curl下另一个购买同一商品的订单,您将看到计算的总价没有反映更新。不是很好!但考虑到价格更新完全绕过了应用层和Hibernate ORM,这并不太令人惊讶。

变更事件处理程序

现在,让我们探索如何使用Debezium和CDC来响应开云体育官方注册网址表并使相应的缓存项无效。

而Debez开云体育官方注册网址ium大多数时候被部署到卡夫卡连接(因此将更改事件流到Apache Kafka主题中),它有另一种操作模式,对于手头的用例非常方便。使用嵌入式引擎,您可以直接在应用程序中作为库运行D开云体育官方注册网址ebezium连接器。对于从数据库接收到的每个更改事件,将调用已配置的回调方法,在当前情况下,该方法将开云体育电动老虎机从二级缓存中删除受影响的项。

该方法的设计如下图所示:

体系结构概述"></div>
          <div class=

虽然这没有Apache Kafka提供的可伸缩性和容错性,但它很好地满足了给定的需求。由于二级缓存与应用生命周期绑定,因此不需要Kafka Connect框架提供的偏移量管理和重新启动功能。对于给定的用例,在应用程序运行时接收数据更改事件就足够了,而使用嵌入式引擎恰好可以实现这一点。

集群的应用程序

请注意,在运行每个节点都有本地缓存的集群应用程序时,使用Apache Kafka和将Debezium定期部署到Kafka Connect中仍然是有意义的。开云体育官方注册网址而不是在每个节点上注册一个连接器,Kafka和Connect将允许您部署一个连接器实例,并让应用程序节点侦听带有更改事件的主题。这将减少数据库中的资源利用率。开云体育电动老虎机

添加了Debezium嵌入式引擎的依赖项(开云体育官方注册网址io.开云体育官方注册网址debezium: debezium-embedded: 0.9.0.Beta1)和Debezium开云体育官方注册网址 Postgres连接器(io.开云体育官方注册网址debezium: debezium-connector-postgres: 0.9.0.Beta1)到你的项目,一个类开云体育电动老虎机DatabaseChangeEventListener监听数据库中的任何更改可以这样实现:开云体育电动老虎机

@ApplicationScoped公共开云体育电动老虎机DatabaseChangeEventListener@私人ManagedExecutorService executorService;@PersistenceUnit私人被电动势;@PersistenceContext私人EntityManager em;私人EmbeddedEngine引擎;公共无效startEmbeddedEngine (@Observes@Initialized(ApplicationScoped.class)对象init) {配置配置=配置.empty() . withsystemproperties (Function.identity()).edit() .with(EmbeddedEngine.;)CONNECTOR_CLASS, PostgresConnector.class) .with(EmbeddedEngine.ENGINE_NAME,cache-invalidation-engine) (EmbeddedEngine。OFFSET_STORAGE, MemoryOffsetBackingStore.class .with(的名字cache-invalidation-connector) (开云体育电动老虎机database.hostnamepostgres) (开云体育电动老虎机database.port5432) (开云体育电动老虎机database.userpostgresuser) (开云体育电动老虎机database.passwordpostgrespw) (开云体育电动老虎机database.server.namedbserver1) (开云体育电动老虎机database.dbname库存) (开云体育电动老虎机database.whitelist公共) (snapshot.mode从来没有) .build ();.engine = EmbeddedEngine.create() .using(config) .notifying(:: handleDbChangeEvent) .build ();executorService.execute(引擎);}@PreDestroy公共无效shutdownEngine() {engine.stop();}私人无效handleDbChangeEvent(源记录){如果(record.topic () .equals (dbserver1.public.item)) {itemId = (结构体) record.key ()) .getInt64 (id);结构体有效载荷= (结构体) record.value ();操作op =操作.forCode (payload.getString (人事处));如果(op = =操作.UPDATE || op ==操作.DELETE) {emf.getCache().evict(Item.class, itemId);} } } }

的实例进行配置开云体育官方注册网址Debezium Postgres连接器并设置用于运行连接器的嵌入式引擎。的连接器选项(主机名,凭据等)与将连接器部署到Kafka Connect时基本相同。不需要对现有数据进行初始快照,因此快照模式设置为“never”。

偏移量存储选项用于控制如何持久化连接器偏移量。由于在连接器未运行时不需要处理发生的任何更改事件(相反,您只需在重新启动后开始从当前位置读取日志),因此使用Kafka Connect提供的内存中实现。

配置完成后,嵌入式引擎必须通过遗嘱执行人实例。当示例在WildFly中运行时,托管执行程序可以简单地通过@为此目的注入(参见JSR 236).

嵌入式引擎被配置为调用handleDbChangeEvent ()方法用于每个接收到的数据更改事件。在此方法中,首先检查传入事件是否起源于表格如果是这种情况,并且更改事件表示更新删除声明,受影响的人实例从二级缓存中被清除。JPA 2.0提供了简单的API为此目的,可通过

开云体育电动老虎机DatabaseChangeEventListener类的位置,缓存条目现在将在通过进行另一个项更新时自动清除psql.当您在更新后为该商品下第一个购买订单时,您将在应用程序日志中看到Hibernate ORM如何执行查询选择……从项目…以便加载订单引用的项目。此外,缓存统计数据将报告一个“L2C miss”。在同一项的后续订单中,它将再次从缓存中获得。

最终一致性

虽然事件处理几乎是实时发生的,但重要的是指出它仍然应用最终一致性语义。这意味着在事务提交的时间点和更改事件从日志流到事件处理程序以及缓存条目失效的时间点之间有一个非常短的时间窗口。

避免在应用程序触发的数据更改后缓存失效

上面显示的更改事件侦听器满足了在外部数据更改后使缓存项失效的需求。但是在其当前的形式中,它清除缓存项有点过于激进:缓存项也将在更新实例通过应用程序本身。这不仅不需要(因为缓存的项已经是当前版本),而且甚至会产生相反的效果:多余的缓存清除将导致额外的数据库往返,从而导致更长的响应时间。开云体育电动老虎机

因此,有必要区分应用程序本身执行的数据更改和外部数据更改。只有在后一种情况下,受影响的项才应该从缓存中删除。为了做到这一点,可以利用每个Debezium数据更改事件都包含原始事务的id这一事实。开云体育官方注册网址跟踪应用程序本身运行的所有事务允许仅对那些被外部事务更改的项触发缓存清除。

考虑到这个变化,整体架构看起来是这样的:

事务注册中心的体系结构概述"></div>
          <div class=

首先要实现的是事务注册表,即一个用于事务簿记的类:

@ApplicationScoped公共KnownTransactions私人最后DefaultCacheManager缓存管理器;私人最后缓存<布尔> applicationTransactions;公共KnownTransactions() {cacheManager =DefaultCacheManager ();cacheManager.defineConfiguration (tx-id-cacheConfigurationBuilder() .expiration() .lifespan(60TimeUnit.SECONDS) .build());applicationTransactions = cacheManager.getCache(tx-id-cache);}@PreDestroy公共无效stopCacheManager() {cacheManager.stop();}公共无效注册({applicationTransactions.put(txId,真正的);}公共布尔称为(txId) {返回布尔.TRUE.equals (applicationTransactions.get (txId));}}

这就使用了InfinispanDefaultCacheManager用于创建和维护应用程序遇到的事务id的内存缓存。由于数据更改事件几乎是实时到达的,因此缓存条目的TTL可能相当短(事实上,示例中显示的一分钟的值是非常保守的选择,通常事件应该在几秒钟内收到)。

下一步是在应用程序处理请求时检索当前事务id,并在其中注册它KnownTransactions.这应该在每个事务中发生一次。实现这个逻辑有多种方法;在下面的Hibernate ORM中FlushEventListener用于此目的:

TransactionRegistrationListener实现了FlushEventListener {私人挥发性KnownTransactions KnownTransactions;公共TransactionRegistrationListener() {}@Override公共无效onFlush (FlushEvent事件)抛出HibernateException {event.getSession(). getactionqueue()。registerProcess(session -> {数量txId = (数量) event.getSession () .createNativeQuery (选择txid_current ()) .setFlushMode(FlushMode.MANUAL) .getSingleResult();getKnownTransactions () .register (txId.longValue ());});}私人KnownTransactions getKnownTransactions() {KnownTransactions值= KnownTransactions;如果(值= =) {knownTransactions = value = CDI.current().select(knownTransactions .class).get();}返回价值;}}

由于没有获取事务id的可移植方法,因此使用本地SQL查询来完成。在Postgres的案例中txid_current ()函数可以被调用。Hibernate ORM事件监听器不受CDI依赖注入的影响。因此是静态的当前的()方法用于获取应用程序的CDI容器的句柄,并获取对KnownTransactionsbean。

每当Hibernate ORM将其持久性上下文与数据库同步时(“刷新”),这个侦听器都会被调用,这通常在事务提交时只发生一次。开云体育电动老虎机

手动冲水

会话/实体管理器也可以手动刷新,在这种情况下txid_current ()函数将被多次调用。为了简单起见,这里忽略了它。示例repo中的实际代码包含这个类的稍微扩展版本,它确保只获得一次事务id。

要向Hibernate ORM注册刷新侦听器,可以使用积分器方法中创建和声明的meta - inf /服务/ org.hibernate.integrator.spi.Integrator文件:

公共TransactionRegistrationIntegrator实现了积分器{@Override公共无效集成(元数据元数据,SessionFactoryImplementor sessionFactory, SessionFactoryServiceRegistry serviceRegistry) {serviceRegistry. getservice (EventListenerRegistry.class) .appendListeners(EventType.FLUSH,TransactionRegistrationListener ());}@Override公共无效disintegrate(SessionFactoryImplementor sessionFactory, SessionFactoryServiceRegistry serviceRegistry) {}}
io.开云体育官方注册网址debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator

在引导过程中,Hibernate ORM将检测积分器类(通过Java服务加载器),调用集成()方法的侦听器类注册冲洗事件。

最后一步是在数据库更改事件处理程序中排除来自应用程序自身运行的事务的任何事件:开云体育电动老虎机

@ApplicationScoped公共开云体育电动老虎机DatabaseChangeEventListener/ /……@ inject私人KnownTransactions KnownTransactions;私人无效handleDbChangeEvent(源记录){如果(record.topic () .equals (dbserver1.public.item)) {itemId = (结构体) record.key ()) .getInt64 (id);结构体有效载荷= (结构体) record.value ();操作op =操作.forCode (payload.getString (人事处));txId = (结构体) payload.get () .getInt64 (txId);如果(!knownTransactions.isKnown(txId) && (op ==操作.UPDATE || op ==操作.DELETE)) {emf.getCache().evict(Item.class, itemId);} } } }

这样,你就得到了所有的片段:缓存S只会在外部数据更改后被清除,而不会在应用程序本身进行更改后被清除。要确认,可以调用示例中的项目使用curl的资源:

> curl -H "Content-Type: application/json" \ -X PUT \——data '{"description": "西北偏北","price": 20.99}' \ http://localhost:8080/cache-invalidation/rest/items/10003

在此更新之后对该项目的下一个订单时,您应该看到实体从缓存中获得,也就是说,更改事件不会导致该项的缓存条目被清除。相反,如果你更新项目的价格通过psql另一次,应该从缓存中删除条目,订单请求将产生缓存遗漏,然后是选择表。开云体育电动老虎机

总结

在这篇博客文章中,我们探索了如何使用Debezium和变更数据捕获在外部数开云体育官方注册网址据更改后使应用程序级缓存失效。与手动缓存失效相比,这种方法工作起来非常可靠(通过直接从数据库日志中捕获更改,不会遗漏任何事件)和快速(在数据更改后几乎实时地进行缓存取出)。开云体育电动老虎机

如您所见,为了实现这一点,并不需要太多的粘合代码。虽然所示的实现在某种程度上特定于示例的实体,但应该可以以更通用的方式实现更改事件处理程序,以便它可以处理一组配置的实体类型(实际上,数据库更改侦听器必须以通用的方式将更改事件的主键字段转换为对应实体的主键类型)。开云体育电动老虎机此外,这种通用实现还必须提供获取最常用数据库的当前事务id的逻辑。开云体育电动老虎机

请让我们知道你是否认为这将是一个有趣的扩展Debezium和Hibernate ORM。开云体育官方注册网址例如,这可能是Debezium伞下的一个新模块,它也可能是一个非常棒的项目,如果你有兴趣为Debez开云体育官方注册网址ium做贡献的话。如果你对这个想法有任何想法,请在下方发表评论或来我们的网站邮件列表

非常感谢Guillaume Smet, Hans-Peter Grahsl和Jiri Pechanec在写这篇文章时的反馈!

贡纳Morling

Gunnar是Decodable的软件工程师,也是一名不折不扣的开源爱好者。多年来,他一直是Debezium的项目负责人。开云体育官方注册网址Gunnar创建了kcctl、JfrUnit和MapStruct等开源项目,并且是Bean验证2.0 (JSR 380)的规范负责人。他在德国汉堡工作。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题