使用Debezium设置变更数据捕获(CDC)管道通常是一个配置问题,不涉及任何编程。开云体育官方注册网址对CDC设置进行自动化测试仍然是一个非常好的主意,以确保正确配置了所有内容,并且按照预期设置了Debezium连接器。开云体育官方注册网址
这里涉及到两个主要组件,需要考虑它们的配置:
源数据库:开云体育电动老虎机必须设置它以便Debezium可以连接到它并检索更改开云体育官方注册网址事件;具体取决于具体的数据库,例如,MySQL的binlog必须在“开云体育电动老虎机行”模式,Postgres,必须安装一个支持的逻辑解码插件,等等。
Debe开云体育官方注册网址zium连接器:它必须使用正确的数据库主机和凭证进行配置,可能使用SSL,应用表和列过滤器,开云体育电动老虎机可能使用一个或多个单个消息转换(smt)等。
这就是新添加的Debezium开云体育官方注册网址支持集成测试与Testcontainers出现的原因。它允许使用Linux容器映像设置所有必需的组件(Apache Kafka, Kafka Connect等),配置和部署Debezium连接器,并针对产生的变更数据事件运行断言。开云体育官方注册网址
让我们来看看它是如何做到的。
项目设置
假设您正在使用Apache Maven进行依赖项管理,请将以下依赖项添加到您的pom.xml,引入了Debezium Test开云体育官方注册网址containers集成和Testcontainers模块的Apache Kafka:
< >的依赖< groupId >io.开云体育官方注册网址debezium< / groupId >< artifactId >开云体育官方注册网址debezium-testing-testcontainers< / artifactId ><版本>1.1.0.CR1> < /版本< >范围测试< / >范围< / >的依赖< >的依赖< groupId >org.testcontainers< / groupId >< artifactId >卡夫卡< / artifactId >< >范围测试< / >范围< / >的依赖
同时为你的数据库添加Testcontainers依赖项,例如Postgres:开云体育电动老虎机
< >的依赖< groupId >org.testcontainers< / groupId >< artifactId >postgresql< / artifactId >< >范围测试< / >范围< / >的依赖
中可以找到具有完整配置的示例项目开云体育官方注册网址debezium-examples在GitHub上回购。
初始化Testcontainers
声明完所有必需的依赖项之后,就可以编写CDC集成测试了。使用Testcontainers,集成测试使用Linux容器和Docker实现。它为启动和管理测试所需的资源提供了一个Java API。我们可以用它来启动Apache Kafka, Kafka Connect和Postgres数据库:开云体育电动老虎机
公共类CdcTest{私人静态网络网络= Network. newnetwork ();(1)私人静态KafkaContainer =新KafkaContainer () .withNetwork(网络);(2)公共静态PostgreSQLContainer < ?> postgresContainer =新PostgreSQLContainer < > (”开云体育官方注册网址debezium / postgres: 11”) .withNetwork(network) .withNetworkAliases(”postgres”);(3)公共静态开云体育官方注册网址DebeziumContainer =新开云体育官方注册网址DebeziumContainer (”1.1.0.CR1”) .withNetwork(network) .withKafka(kafkaContainer) .dependsOn(kafkaContainer);(4)@BeforeClass公共静态无效startContainers () {(5)Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, d开云体育官方注册网址ebeziumContainer)) .join();}}
1 | 定义一个Docker网络供所有服务使用 |
2 | 为Apache Kafka设置一个容器 |
3. | 为Postgres 11设置一个容器(使用Debezium的Postgres容器映像开云体育官方注册网址) |
4 | 用Debezium为Kafka Connect设置一个容器开云体育官方注册网址 |
5 | 启动a中的所有三个容器@BeforeClass 方法 |
请注意,为了使用Testcontainers,您需要安装Docker。
测试实施
有了所需的基础设施,我们就可以为CDC设置编写测试了。测试的总体流程如下:
为Postgres数据库开云体育官方注册网址配置一个Debezium连接器开云体育电动老虎机
执行一些SQL语句来更改一些数据
使用Kafka消费者从相应的Kafka主题检索结果变更数据事件
针对这些事件运行一些断言
下面是测试的shell:
@Test公共无效canObtainChangeEventsFromPostgres ()抛出异常{试一试(连接connection = getConnection(postgresContainer);声明statement = connection.createStatement();KafkaConsumer <字符串,字符串> consumer = getConsumer(kafkaContainer)) {// todo…}}
数据库连接的凭据可以从Testcontainer开云体育电动老虎机s启动的Postgres容器中获得,很好地避免了任何冗余:
私人连接getConnection (PostgreSQLContainer < ?> postgresContainer)抛出SQLException异常{返回DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword());}
卡夫卡的消费者也一样:
私人KafkaConsumer <字符串,字符串> getConsumer(KafkaContainer) {返回新KafkaConsumer < > (ImmutableMap。(ConsumerConfig。BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig。GROUP_ID_CONFIG,”tc -”+UUIDConsumerConfig .randomUUID()。AUTO_OFFSET_RESET_CONFIG,”最早的”),新StringDeserializer (),新StringDeserializer ());}
现在让我们实现实际的测试逻辑:
statement.execute (”创建模式todo”);(1)statement.execute (”创建表todo。待办事项(”+”Id int8不为空,”+”标题varchar (255),”+”主键(id)”);statement.execute (”修改表todo。Todo副本标识已满”);statement.execute (”插入到todo中。Todo值(1,“学习CDC”)”);statement.execute (”插入到todo中。Todo价值观(2,“学习Debezium”)开云体育官方注册网址”);connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with(”开云体育电动老虎机database.server.name”,”dbserver1”);开云体育官方注册网址debeziumContainer.registerConnector (”连接器”、连接器);(2)consumer.subscribe (数组.asList (”dbserver1.todo.todo”));列表< ConsumerRecord <字符串,字符串>> changeEvents = drain(consumer,2);(3)ConsumerRecord <字符串,字符串> changeEvent = changeEvents.get(0);为了(JsonPath。<整数>阅读(changeEvent.key (),”.id美元”) .isEqualTo (1);为了(JsonPath。<字符串>阅读(changeEvent.value (),”.op美元”) .isEqualTo (”r”);为了(JsonPath。<字符串>阅读(changeEvent.value (),”.after.title美元”) .isEqualTo (”学习中心”);changeEvent = changeEvents.get(1);为了(JsonPath。<整数>阅读(changeEvent.key (),”.id美元”) .isEqualTo (2);为了(JsonPath。<字符串>阅读(changeEvent.value (),”.op美元”) .isEqualTo (”r”);为了(JsonPath。<字符串>阅读(changeEvent.value (),”.after.title美元”) .isEqualTo (”学习Debe开云体育官方注册网址zium”);consumer.unsubscribe ();
1 | 在Postgres数据库中创建一个表并插入两条记录开云体育电动老虎机 |
2 | 注册Debezium Postgres连接器的实例开云体育官方注册网址 |
3. | 从Kafka中的更改事件主题中读取两条记录,并断言它们的属性 |
请注意Debezi开云体育官方注册网址um的Testcontainers支持如何允许从数据库容器中播种连接器配置,从而避免需要显式地给出数据库连接属性。开云体育电动老虎机只有独一无二的开云体育电动老虎机database.server.name
必须给出,当然您可以应用其他配置选项,如表或列过滤器,smt等。
的源代码排水管()
为了简洁起见,省略了从卡夫卡主题中读取给定数量记录的方法。你可以找到它在GitHub上的完整示例中。
JsonPath-based断言对于断言预期的数据更改事件的属性非常方便,当然您也可以使用任何其他JSON API来完成这项工作。当使用Apache Avro而不是JSON作为序列化格式时,您必须使用Avro api。
总结
测试容器和Debezium对它的支持使开云体育官方注册网址得为CDC设置编写自动化集成测试变得相当容易。
本文中讨论的测试方法可以以多种方式进行扩展。例如,可能需要将连接器配置置于修订控制之下(这样您就可以管理和跟踪任何配置更改),并使用这些配置文件驱动测试。您还可以进一步测试整个数据流管道。为此,您不仅需要部署Debezium连接器,还需要部署接收器连接器,例如用于数据仓库或搜开云体育官方注册网址索服务器。然后可以针对这些接收器系统中的数据运行断言,确保端到端数据管道的正确性。
你对疾控中心设置和管道的测试有什么看法?请在下面的评论中告诉我们!
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源下Apache许可证,版本2.0.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium,在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题.