您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

使用Testcontainers进行集成测试

此功能目前处于孵化状态,即准确的语义,配置选项,api等可能会根据我们收到的反馈在未来的版本中更改。请让我们知道,如果你遇到任何问题将使用这个扩展。

概述

当使用Debezium设置变更数据捕获管道时,为了确保这一点,最好也有一些自动测试开云体育官方注册网址

  • 源数据库已经设置好,因开云体育电动老虎机此更改可以从其中流出

  • 连接器配置正确

Debe开云体育官方注册网址zium扩展Testcontainers旨在简化这些测试,通过Linux容器运行所有必需的基础设施(Apache Kafka, Kafka Connect等),并使其易于访问基于java的测试。

它尽可能应用合理的默认值(例如,连接器的数据库凭据可以从配置的数据库容器中获得),允许您专注于测试的基本逻辑开云体育电动老虎机。

开始

为了使用Debezium的Tes开云体育官方注册网址tcontainers集成,在项目中添加以下依赖项:

 io.开云体育官方注册网址debezium  debey -testing-testcontainers 1.4.2. .Final test   org。testcontainers kafka test    org。testcontainers postgresql test  . testcontainers postgresql

根据您的测试策略,您可能还需要数据库的JDBC驱动程序和Apache Kafka的客户端,这样您就可以在Kafka中插入一些测试数据并断言相应的更改事件。开云体育电动老虎机

测试设置

在为Debezium连接器配置编写集成测试时,您还需要设置Apache Kaf开云体育官方注册网址ka和一个数据库,该数据库应该是更改事件的源。开云体育电动老虎机现有的测试容器支持Apache卡夫卡开云体育电动老虎机数据库可以用来做这个。

还有Debezium的开云体育官方注册网址开云体育官方注册网址DebeziumContainer类,典型的设置是这样的:

公共类DebeziumCo开云体育官方注册网址ntainerTest{私有静态网络网络= Network. newnetwork ();(1)KafkaContainer = new KafkaContainer() .withNetwork(network);(2)PostgreSQLContainer PostgreSQLContainer = new PostgreSQLContainer<>开云体育官方注册网址("debezium/postgres:11") .withNetwork(network) .withNetworkAliases("postgres");(3)public static 开云体育官方注册网址DebeziumContainer DebeziumContainer = new DebeziumContainer("debezium/connect:1.4.2.Final") .withNetwork(network) .withKafka(kafkaContainer) .dependsOn(kafkaContainer);(4)@BeforeClass公共静态void startContainers() {(5)Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, d开云体育官方注册网址ebeziumContainer)) .join();}}
1 定义一个Docker网络供所有服务使用
2 为Apache Kafka设置一个容器
3. 为Postgres 11设置一个容器(使用Debezium的Postgres容器映像开云体育官方注册网址)
4 使用Debezium 1.4.2.Final为Kafka Connect设置一个容器开云体育官方注册网址
5 启动所有三个容器

测试实现

声明了所有需要的容器之后,你现在可以注册Debezium Postgres连接器的实例,将一些测试数据插入到Postgres中,并使用Apache Kafka客户端从相应开云体育官方注册网址的Kafka主题中读取预期的更改事件记录:

@测试公共无效canRegisterPostgreSqlConnector()抛出异常{try(连接连接= getConnection(postgresContainer);语句Statement = connection.createStatement();KafkaConsumer consumer = getConsumer(kafkaContainer)){语句。Execute ("create schema todo");(1)声明。执行("create table todo. "Todo (id int8 not null, " + "title varchar(255),主键(id))");声明。执行("alter table todo. "Todo副本身份全”);声明。执行("insert into todo. "Todo值(1," + "'学习CDC')");声明。execute("insert into todo.Todo values (2, " + "'Learn Debezium')"); ConnectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with("database.server.name", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector);(2)“dbserver1.todo.todo”consumer.subscribe (arrays . aslist ());List> changeEvents = drain(consumer, 2);(3)为了(JsonPath。<整数>阅读(changeEvents.get(0)。key(),“.id美元”)).isEqualTo (1);为了(JsonPath。<字符串>阅读(changeEvents.get (0) value(),“.op美元”)).isEqualTo (r);为了(JsonPath。 read(changeEvents.get(0).value(), "$.after.title")))。isEqualTo(“学习中心”);为了(JsonPath。<整数>阅读(changeEvents.get(1)。key(),“.id美元”)).isEqualTo (2);为了(JsonPath。<字符串>阅读(changeEvents.get (1) value(),“.op美元”)).isEqualTo (r); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Debezium"); consumer.unsubscribe(); } } // Helper methods below private Connection getConnection( PostgreSQLContainer postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword()); } private KafkaConsumer getConsumer( KafkaContainer kafkaContainer) { return new KafkaConsumer<>( ImmutableMap.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer()); } private List> drain( KafkaConsumer consumer, int expectedRecordCount) { List> allRecords = new ArrayList<>(); Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { consumer.poll(Duration.ofMillis(50)) .iterator() .forEachRemaining(allRecords::add); return allRecords.size() == expectedRecordCount; }); return allRecords; }
1 在Postgres数据库中创建一个表并插入两条记录开云体育电动老虎机
2 注册一个Debezium Postgres连接器实例;开云体育官方注册网址连接器类型以及数据库主机、数据库名称、用户等属性都派生自数据库容器开云体育电动老虎机
3. 从Kafka中的更改事件主题中读取两条记录,并断言它们的属性