集成测试与Testcontainers

这个功能目前正在酝酿状态,即精确的语义,配置选项,api等可能会改变在未来修订,根据我们得到的反馈。请让我们知道如果你遇到任何问题将使用这个扩展。

概述

当设置改变数据捕获和Debezium管道,这是一个好主意也有一些自动化测试,以确保开云体育官方注册网址

  • 源数据库设置所以变化可开云体育电动老虎机以流

  • 你的连接器是正确配置

Debe开云体育官方注册网址zium扩展为Testcontainers旨在简单这样的测试,通过运行所需的所有基础设施(Apache卡夫卡,卡夫卡连接等)通过Linux容器和让它容易被java测试。

尽量适用合理的默认值(例如数据库连接器的凭证可以获得从数据库配置的容器),允许您关注您的测试的基本逻辑。开云体育电动老虎机

开始

为了使用Debezium Tes开云体育官方注册网址tcontainers集成,添加以下依赖您的项目:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-testing-testcontainers < / artifactId > <版本> tripwire。最后< /版本> <范围>测试< /范围> < /依赖> <依赖> < groupId > org。卡夫卡testcontainers < / groupId > < artifactId > < / artifactId > <范围>测试< /范围> < /依赖> < !——添加TC依赖匹配数据库- - > <依赖> < groupId > org开云体育电动老虎机。testcontainers < / groupId > < artifactId > postgresql < / artifactId > <范围> < /范围> < /依赖>测试

根据您的测试策略,您可能还需要数据库的JDBC驱动程序和客户端为Apache卡夫卡,所以你可以插入一些测试数据并维护相应的改变发生在卡夫卡。开云体育电动老虎机

测试设置

当编写集成测试Debezium连接器配置,您还需要设置Apache卡夫卡和数据开云体育官方注册网址库应该改变事件的来源。开云体育电动老虎机现有的Testcontainers支持Apache卡夫卡开云体育电动老虎机数据库可以使用。

Debezium一起的开云体育官方注册网址开云体育官方注册网址DebeziumContainer类,一个典型的设置是这样的:

网络公开课Debezium开云体育官方注册网址ContainerTest{私有静态网络= Network.newNetwork ();(1)私有静态KafkaContainer KafkaContainer = new KafkaContainer () .withNetwork(网络);(2)公共静态PostgreSQLContainer < ?> postgresContainer = new PostgreSQLContainer < > (DockerImageName.parse (quay.i开云体育官方注册网址o / debezium postgres: 15) .asCompatibleSubstituteFor (“postgres”)) .withNetwork(网络).withNetworkAliases (“postgres”);(3)公共静态DebeziumCo开云体育官方注册网址ntainer DebeziumContainer = new DebeziumContainer (quay.io / debezium /连接:2.3.0.Final) .withNetwork(网络).withKafka (kafkaContainer) .dependsOn (kafkaContainer);(4)@BeforeClass公共静态孔隙startContainers () {(5)Startables.deepStart (Stream.of (kafkaContainer postgresContainer d开云体育官方注册网址ebeziumContainer)) . join ();}}
1 定义一个码头工人网络使用的所有服务
2 建立一个Apache卡夫卡的容器
3 建立一个容器Postgres 15(使用Debezium Postgres的集装箱图开云体育官方注册网址片)
4 建立一个容器卡夫卡与Debezium 2.3.0.Final开云体育官方注册网址
5 开始这三个容器

测试实现

宣布所有所需的容器,你现在可以注册Debezium Postgres连接器的一个实例,将一些测试数据插入Postgres并使用Apache卡夫卡客户端阅读预期变化事件记录开云体育官方注册网址从相应的卡夫卡主题:

@Test公共空canRegisterPostgreSqlConnector()抛出异常{尝试(连接连接= getConnection (postgresContainer);声明语句= connection.createStatement ();KafkaConsumer <字符串,字符串>消费者= getConsumer (kafkaContainer)){语句。执行(“创建模式待办事项”);(1)声明。执行(“创建待办事项表。待办事项(id int8不是零,“+”标题varchar(255),主键(id)) ");声明。执行(“alter table todo。Todo复制品身份已满”);声明。执行(“插入todo。Todo值(“+”“学习中心”)”);声明。execute("insert into todo.Todo values (2, " + "'Learn Debezium')"); ConnectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with("topic.prefix", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector);(2)“dbserver1.todo.todo”consumer.subscribe (arrays . aslist ());列表< ConsumerRecord <字符串,字符串> > changeEvents =排水(消费者,2);(3)为了(JsonPath。<整数>阅读(changeEvents.get (0)。key (),“.id美元”)).isEqualTo (1);为了(JsonPath。<字符串>阅读(changeEvents.get (0) value (),“.op美元”)).isEqualTo (r);为了(JsonPath。<字符串>阅读(changeEvents.get (0) value (),“.after.title美元”))。isEqualTo(“学习中心”);为了(JsonPath。<整数>阅读(changeEvents.get (1)。key (),“.id美元”)).isEqualTo (2);为了(JsonPath。<字符串>阅读(changeEvents.get (1) value (),“.op美元”)).isEqualTo (r); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Debezium"); consumer.unsubscribe(); } } // Helper methods below private Connection getConnection( PostgreSQLContainer postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword()); } private KafkaConsumer getConsumer( KafkaContainer kafkaContainer) { return new KafkaConsumer<>( Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer()); } private List> drain( KafkaConsumer consumer, int expectedRecordCount) { List> allRecords = new ArrayList<>(); Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { consumer.poll(Duration.ofMillis(50)) .iterator() .forEachRemaining(allRecords::add); return allRecords.size() == expectedRecordCount; }); return allRecords; }
1 Postgres数据库中创建表并插入两个记录开云体育电动老虎机
2 注册的一个实例Debezium Postgres连接器;开云体育官方注册网址连接器类型以及属性数据库主机、数据库名称、用户等来自数据库容器开云体育电动老虎机
3 读卡夫卡的两个记录更改事件的话题和维护自己的属性