Integration Testing with Testcontainers

This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems will using this extension.

Overview

When setting up change data capture pipelines with Debezium, it’s a good idea to also have some automated testing in place, in order to make sure that

  • the source database is set up so changes can be streamed off of it

  • your connectors are configured correctly

The Debezium extension forTestcontainersaims at simplying such tests, by running all the required infrastructure (Apache Kafka, Kafka Connect etc.) via Linux containers and making it easily accessible to Java-based tests.

It applies sensible defaults as far as possible (e.g. database credentials for connectors can be obtained from the configured database container), allowing you to focus on the essential logic of your tests.

Getting Started

In order to use Debezium’s Testcontainers integration, add the following dependency to your project:

 io.debezium debezium-testing-testcontainers 2.1.2.Final test   org.testcontainers kafka test    org.testcontainers postgresql test 

Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka.

Test Set-Up

When writing an integration test for a Debezium connector configuration, you’ll also need to set up Apache Kafka and a database which should be the source of change events. The existing Testcontainers support forApache Kafkaanddatabasescan be used for that.

Together with Debezium’sDebeziumContainerclass, a typical set-up will look like this:

public class DebeziumContainerTest { private static Network network = Network.newNetwork();(1)private static KafkaContainer kafkaContainer = new KafkaContainer() .withNetwork(network);(2)public static PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>("debezium/postgres:11") .withNetwork(network) .withNetworkAliases("postgres");(3)public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:2.1.2.Final") .withNetwork(network) .withKafka(kafkaContainer) .dependsOn(kafkaContainer);(4)@BeforeClass public static void startContainers() {(5)Startables.deepStart(Stream.of( kafkaContainer, postgresContainer, debeziumContainer)) .join(); } }
1 Define a Docker network to be used by all the services
2 Set up a container for Apache Kafka
3 Set up a container for Postgres 11 (using Debezium’s Postgres container image)
4 Set up a container for Kafka Connect with Debezium 2.1.2.Final
5 Start all three containers

Test Implementation

Having declared all the required containers, you can now register an instance of the Debezium Postgres connector, insert some test data into Postgres and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic:

@Test public void canRegisterPostgreSqlConnector() throws Exception { try (Connection connection = getConnection(postgresContainer); Statement statement = connection.createStatement(); KafkaConsumer consumer = getConsumer( kafkaContainer)) { statement.execute("create schema todo");(1)statement.execute("create table todo.Todo (id int8 not null, " + "title varchar(255), primary key (id))"); statement.execute("alter table todo.Todo replica identity full"); statement.execute("insert into todo.Todo values (1, " + "'Learn CDC')"); statement.execute("insert into todo.Todo values (2, " + "'Learn Debezium')"); ConnectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with("topic.prefix", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector);(2)consumer.subscribe(Arrays.asList("dbserver1.todo.todo")); List> changeEvents = drain(consumer, 2);(3)为了(JsonPath。<整数> (changeEvents.ge阅读t(0).key(), "$.id")).isEqualTo(1); assertThat(JsonPath. read(changeEvents.get(0).value(), "$.op")).isEqualTo("r"); assertThat(JsonPath. read(changeEvents.get(0).value(), "$.after.title")).isEqualTo("Learn CDC"); assertThat(JsonPath. read(changeEvents.get(1).key(), "$.id")).isEqualTo(2); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.op")).isEqualTo("r"); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Debezium"); consumer.unsubscribe(); } } // Helper methods below private Connection getConnection( PostgreSQLContainer postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword()); } private KafkaConsumer getConsumer( KafkaContainer kafkaContainer) { return new KafkaConsumer<>( Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer()); } private List> drain( KafkaConsumer consumer, int expectedRecordCount) { List> allRecords = new ArrayList<>(); Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { consumer.poll(Duration.ofMillis(50)) .iterator() .forEachRemaining(allRecords::add); return allRecords.size() == expectedRecordCount; }); return allRecords; }
1 Create a table in the Postgres database and insert two records
2 注册的一个实例Debezium Postgres开云体育官方注册网址ector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container
3 Read two records from the change event topic in Kafka and assert their attributes