使用Testcontainers进行集成测试
概述
当使用Debezium设置变更数据捕获管道时,为了确保这一点,最好也有一些自动测试开云体育官方注册网址
源数据库已经设置好,因开云体育电动老虎机此更改可以从其中流出
连接器配置正确
Debe开云体育官方注册网址zium扩展Testcontainers旨在简化这些测试,通过Linux容器运行所有必需的基础设施(Apache Kafka, Kafka Connect等),并使其易于访问基于java的测试。
它尽可能应用合理的默认值(例如,连接器的数据库凭据可以从配置的数据库容器中获得),允许您专注于测试的基本逻辑开云体育电动老虎机。
开始
为了使用Debezium的Tes开云体育官方注册网址tcontainers集成,在项目中添加以下依赖项:
io.开云体育官方注册网址debezium debey -testing-testcontainers 1.4.2. .Final test org。testcontainers kafka test org。testcontainers postgresql test . testcontainers postgresql
根据您的测试策略,您可能还需要数据库的JDBC驱动程序和Apache Kafka的客户端,这样您就可以在Kafka中插入一些测试数据并断言相应的更改事件。开云体育电动老虎机
测试设置
在为Debezium连接器配置编写集成测试时,您还需要设置Apache Kaf开云体育官方注册网址ka和一个数据库,该数据库应该是更改事件的源。开云体育电动老虎机现有的测试容器支持Apache卡夫卡和开云体育电动老虎机数据库可以用来做这个。
还有Debezium的开云体育官方注册网址开云体育官方注册网址DebeziumContainer
类,典型的设置是这样的:
公共类DebeziumCo开云体育官方注册网址ntainerTest{私有静态网络网络= Network. newnetwork ();(1)KafkaContainer = new KafkaContainer() .withNetwork(network);(2)PostgreSQLContainer> PostgreSQLContainer = new PostgreSQLContainer<>开云体育官方注册网址("debezium/postgres:11") .withNetwork(network) .withNetworkAliases("postgres");(3)public static 开云体育官方注册网址DebeziumContainer DebeziumContainer = new DebeziumContainer("debezium/connect:1.4.2.Final") .withNetwork(network) .withKafka(kafkaContainer) .dependsOn(kafkaContainer);(4)@BeforeClass公共静态void startContainers() {(5)Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, d开云体育官方注册网址ebeziumContainer)) .join();}}
1 | 定义一个Docker网络供所有服务使用 |
2 | 为Apache Kafka设置一个容器 |
3. | 为Postgres 11设置一个容器(使用Debezium的Postgres容器映像开云体育官方注册网址) |
4 | 使用Debezium 1.4.2.Final为Kafka Connect设置一个容器开云体育官方注册网址 |
5 | 启动所有三个容器 |
测试实现
声明了所有需要的容器之后,你现在可以注册Debezium Postgres连接器的实例,将一些测试数据插入到Postgres中,并使用Apache Kafka客户端从相应开云体育官方注册网址的Kafka主题中读取预期的更改事件记录:
@测试公共无效canRegisterPostgreSqlConnector()抛出异常{try(连接连接= getConnection(postgresContainer);语句Statement = connection.createStatement();KafkaConsumer consumer = getConsumer(kafkaContainer)){语句。Execute ("create schema todo");(1)声明。执行("create table todo. "Todo (id int8 not null, " + "title varchar(255),主键(id))");声明。执行("alter table todo. "Todo副本身份全”);声明。执行("insert into todo. "Todo值(1," + "'学习CDC')");声明。execute("insert into todo.Todo values (2, " + "'Learn Debezium')"); ConnectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with("database.server.name", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector);(2)“dbserver1.todo.todo”consumer.subscribe (arrays . aslist ());List> changeEvents = drain(consumer, 2);(3)为了(JsonPath。<整数>阅读(changeEvents.get(0)。key(),“.id美元”)).isEqualTo (1);为了(JsonPath。<字符串>阅读(changeEvents.get (0) value(),“.op美元”)).isEqualTo (r);为了(JsonPath。 read(changeEvents.get(0).value(), "$.after.title")))。isEqualTo(“学习中心”);为了(JsonPath。<整数>阅读(changeEvents.get(1)。key(),“.id美元”)).isEqualTo (2);为了(JsonPath。<字符串>阅读(changeEvents.get (1) value(),“.op美元”)).isEqualTo (r); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Debezium"); consumer.unsubscribe(); } } // Helper methods below private Connection getConnection( PostgreSQLContainer> postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword()); } private KafkaConsumer getConsumer( KafkaContainer kafkaContainer) { return new KafkaConsumer<>( ImmutableMap.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer()); } private List> drain( KafkaConsumer consumer, int expectedRecordCount) { List> allRecords = new ArrayList<>(); Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { consumer.poll(Duration.ofMillis(50)) .iterator() .forEachRemaining(allRecords::add); return allRecords.size() == expectedRecordCount; }); return allRecords; }
1 | 在Postgres数据库中创建一个表并插入两条记录开云体育电动老虎机 |
2 | 注册一个Debezium Postgres连接器实例;开云体育官方注册网址连接器类型以及数据库主机、数据库名称、用户等属性都派生自数据库容器开云体育电动老虎机 |
3. | 从Kafka中的更改事件主题中读取两条记录,并断言它们的属性 |