Today, I am pleased to announce the second alpha release in the 2.2 release stream, Debezium 2.2.0.Alpha2. This release includes a plethora of bug fixes, improvements, breaking changes, and a number of new features including, but not limited to, a new ExtractChangedRecordState single message transformation, a Reactive-based implementation of the Debezium Outbox extension for Quarkus, a Debezium Storage module for Apache RocketMQ, and much more. Let’s take a moment and dive into these new features, improvements, and breaking changes.

Breaking Changes

We typically try to avoid any breaking changes, even during minor releases such as this; however, sometimes breaking changes are inevitable given the circumstances. Debezium 2.2.0.Alpha2 includes three breaking changes:

Topic / Schema naming changes

Debezium previously sanitized topic and schema names by using an underscore (_) to replace non-ASCII characters that would lead to unsupported topic or schema names when using schema registries. However, if this non-ASCII character was the only difference between two similar topics or schema names that otherwise only varied by case, this would lead to other problems.

In order to address this in the most compatible way, Debezium now uses a strategy-based approach to map characters uniquely. As a side effect of this change, the sanitize.field.names configuration property has been retired and replaced by this new strategy-based approach.

Each connector supports two configuration properties to control this behavior:

schema.name.adjustment.mode

Specifies how schema names should be adjusted for compatibility with the message converter.

field.name.adjustment.mode

Specifies how field names should be adjusted for compatibility with the message converter.

These two connector configuration properties support three modes:

none

No adjustment is made to the schema or field names; they are passed as-is.

avro

Replaces characters that cannot be used in Avro with an underscore (_).

avro_unicode

Replaces underscores (_) and characters that cannot be used in Avro with unicode-based characters.

This now allows you to pick the most appropriate strategy based on your table or collection naming convention.
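As a rough illustration, a connector whose table or collection names contain characters that Avro does not accept might opt into the unicode-based strategy for both settings. This is only a sketch built from the two properties and the mode values described above; the rest of the connector configuration is omitted.

```properties
# Sketch: apply the unicode-based strategy to both schema and field names.
# The other modes described above are none and avro.
schema.name.adjustment.mode=avro_unicode
field.name.adjustment.mode=avro_unicode
```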

Source info block changes with Oracle connector

All Debezium change events related to inserts, updates, and deletes contain a source info block in the event’s payload. For the Oracle connector, this block contains a special field called ssn that represents the SQL sequence number for this change.

It has been identified that there were corner cases where the value sourced from the database for this field could exceed 2,147,483,647, the maximum value of an INT32 data type. To fix this corner case, we’ve changed the data type from INT32 to INT64, which allows up to a maximum value of 9,223,372,036,854,775,807.

This change should be entirely non-invasive, but we wanted to bring attention to this should you have pipelines that could be storing this value in a sink system or if you are using a schema registry.

Debezium Server moved to new repository

Debezium Server is a standalone Quarkus-based runtime for Debezium source connectors enabling the integration with various platforms like EventHubs, PubSub, Pulsar, Redis, and Kafka, to name a few. With this release, we have moved the code related to Debezium Server to its own GitHub repository.

This change was required in order to support building Debezium Server with connectors that are not part of the main Debezium repository, such as Db2, Google Spanner, Cassandra 4, and Vitess. As a result, starting with this release, Debezium Server ships with all connectors (excluding Cassandra 3) by default.

Cassandra 3 is excluded due to some technical limitations with class loading that create conflicts with Cassandra 4. We are aware of this and plan to deliver a solution to include Cassandra 3 in the future.

New ExtractChangedRecordState SMT

We have heard from the community on several occasions that it would be great to have an out-of-the-box way to determine what values have changed in a Debezium change event. The new single message transform (SMT) ExtractChangedRecordState aims to deliver on this request by adding metadata to the event identifying which fields changed or were unchanged.

In order to get started with this new transformation, configure it as part of your connector configuration:

transforms=changes
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed=ChangedFields
transforms.changes.header.unchanged=UnchangedFields

This transformation can be configured to disclose either what fields changed by setting header.changed, what fields are unchanged by setting header.unchanged, or both by setting both properties as shown above. The transformation adds a new header with the specified name, and its value is a collection of field names based on whether you’ve configured changes, non-changes, or both.

Drop fields using ExtractNewRecordState SMT

The ExtractNewRecordState single message transformation is extremely useful in situations where you need to consume the Debezium change event in a flattened format. This SMT has been changed in this release to add the ability to drop fields from the payload and the message key of the event.

This new feature introduces three new configuration properties for the transformation:

drop.fields.header.name

The Kafka message header name to use for listing field names in the source message that are to be dropped.

drop.fields.from.key

Specifies whether to also remove fields from the key, defaults to false.

drop.fields.keep.schema.compatible

Specifies whether to remove fields that are only optional, defaults to true.

When using Avro, schema compatibility is extremely important. This is why we opted to enforce schema compatibility by default. If a field is configured to be dropped but it is non-optional, the field will not be removed from the key nor the payload unless schema compatibility is disabled.

These new configuration options allow for some exciting ways to manipulate change events. For example, to emit events with only changed fields, pairing the ExtractNewRecordState with the new ExtractChangedRecordState transformation makes this extremely simple and straightforward. An example configuration to only emit changed columns would look like the following:

transforms=changes,extract
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.unchanged=UnchangedFields
transforms.extract.type=io.debezium.transforms.ExtractNewRecordState
transforms.extract.drop.fields.header.name=UnchangedFields

The above configuration omits unchanged fields from the event’s payload value. If a field in the key did not change, it will be unaffected because drop.fields.from.key was left as its default of false. And finally, if a field in the event’s payload is to be dropped because it did not change, but it’s not optional, it will continue to be included in the transformation’s output event to comply with schema compatibility.
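If your pipeline can tolerate dropping unchanged fields from the key as well, or dropping non-optional fields, the same pairing could be adjusted along the following lines. This variant is a sketch built only from the three drop.fields properties described earlier; note that disabling schema compatibility may break consumers that rely on a stable Avro schema.

```properties
transforms=changes,extract
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.unchanged=UnchangedFields
transforms.extract.type=io.debezium.transforms.ExtractNewRecordState
transforms.extract.drop.fields.header.name=UnchangedFields
# Also remove unchanged fields from the message key (default is false).
transforms.extract.drop.fields.from.key=true
# Allow non-optional fields to be removed as well (default is true).
transforms.extract.drop.fields.keep.schema.compatible=false
```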

Reactive Debezium Outbox Quarkus Extension

The outbox pattern is an approach that many microservices leverage to share data across microservice boundaries. We introduced the Debezium Outbox Quarkus Extension in Debezium 1.1 back in early 2020, and it has allowed Quarkus users to leverage the outbox pattern with ease using Debezium.

Thanks to Ingmar Fjolla, Debezium 2.2.0.Alpha2 includes a new reactive-based implementation of the Debezium Outbox Quarkus Extension. This new implementation is based on Vert.x and Hibernate Reactive, providing a fully asynchronous solution to the outbox pattern using Debezium.

This new extension will be included in the Quarkus Platform releases later this quarter or early Q2. However, if you want to get started with it today, you can easily drop it directly into your project’s configuration using the following coordinates:

Maven coordinates
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-quarkus-outbox-reactive</artifactId>
    <version>2.2.0.Alpha2</version>
</dependency>
Gradle coordinates
io.debezium:debezium-quarkus-outbox-reactive:2.2.0.Alpha2

New Rocket MQ Schema History Storage

Debezium’s new storage API has been a huge success over this past year. We initially started with our original file and Kafka based implementations for offset and schema history storage, but that has since grown to support storing schema history on other platforms such as Amazon S3 and Redis.

This release continues to expand on that by adding a new schema history storage implementation for Rocket MQ. In order to get started with storing your schema history into Rocket MQ, the debezium-storage-rocketmq dependency must first be on the classpath and accessible by the connector runtime.

Once the dependency exists, the only remaining step is to configure the schema history in the connector configuration. The following example shows basic usage of the Rocket MQ schema history:

schema.history.internal.rocketmq.topic=schema-history
schema.history.internal.rocketmq.name.srv.addr=172.17.15.2
schema.history.internal.rocketmq.acl.enabled=true
schema.history.internal.rocketmq.access.key=
schema.history.internal.rocketmq.secret.key=
schema.history.internal.rocketmq.recovery.attempts=5
schema.history.internal.rocketmq.recovery.poll.interval.ms=1000
schema.history.internal.rocketmq.store.record.timeout.ms=2000

schema.history.internal.rocketmq.topic

Specifies the topic name where the schema history will be stored.

schema.history.internal.rocketmq.name.srv.addr

Specifies the address of the Rocket MQ name server used for service discovery.

schema.history.internal.rocketmq.acl.enabled

Specifies whether access control lists (ACLs) are enabled, defaults to false.

schema.history.internal.rocketmq.access.key

Specifies the Rocket MQ access key, required only if ACLs are enabled.

schema.history.internal.rocketmq.secret.key

Specifies the Rocket MQ secret key, required only if ACLs are enabled.

schema.history.internal.rocketmq.recovery.attempts

Specifies the number of consecutive attempts that return no data before recovery completes.

schema.history.internal.rocketmq.recovery.poll.interval.ms

Specifies the number of milliseconds for each poll attempt to recover the history.

schema.history.internal.rocketmq.store.record.timeout.ms

Specifies the number of milliseconds for a write to Rocket MQ to complete before timing out.
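Because ACLs default to disabled, a deployment without access control could presumably leave out the ACL-related properties entirely. The following is a minimal sketch under that assumption, reusing the example topic name, address, and timing values from the configuration shown earlier; substitute values that match your own environment.

```properties
# Sketch: ACLs left at their default (disabled), so no access or secret key is set.
schema.history.internal.rocketmq.topic=schema-history
# Example address reused from the configuration above; point this at your own name server.
schema.history.internal.rocketmq.name.srv.addr=172.17.15.2
schema.history.internal.rocketmq.recovery.attempts=5
schema.history.internal.rocketmq.recovery.poll.interval.ms=1000
schema.history.internal.rocketmq.store.record.timeout.ms=2000
```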

Other fixes

There were quite a number of other improvements, bug fixes, and stability changes in this release; some noteworthy ones are:

  • Better control on debezium GTID usage DBZ-2296

  • Data type conversion failed for mysql bigint DBZ-5798

  • ActivateTracingSpan wrong timestamps reported DBZ-5827

  • Unable to specify column or table include list if name contains a backslash \ DBZ-5917

  • debezium-connector-cassandra 2.1.0.Alpha2 plugin can no longer run "out of the box" DBZ-5925

  • MongoDB Incremental Snapshot not Working DBZ-5973

  • Nullable columns marked with "optional: false" in DDL events DBZ-6003

  • Upgrade to Quarkus 2.16.0.Final DBZ-6005

  • Vitess: Handle the shard list difference between current db shards and persisted shards DBZ-6011

  • Offsets are not flushed on connect offsets topic when encountering an error on Postgres connector DBZ-6026

  • Unexpected format for TIME column: 8:00 DBZ-6029

  • Oracle does not support compression/logging clauses after an LOB storage clause DBZ-6031

  • debezium-server Pulsar support non-default tenant and namespace DBZ-6033

  • Debezium is logging the full message along with the error DBZ-6037

  • Improve resilience during internal schema history recovery from Kafka DBZ-6039

  • Vitess: Support Mapping unsigned bigint mysql column type to long DBZ-6043

  • Incremental snapshot sends the events from signalling DB to Kafka DBZ-6051

  • Upgrade Kafka to 3.3.2 DBZ-6054

  • Mask password in log statement DBZ-6064

  • Loading Custom offset storage fails with Class not found error DBZ-6075

  • Increase query.fetch.size default to something sensible above zero DBZ-6079

  • SQL Server tasks fail if the number of databases is smaller than maxTasks DBZ-6084

  • When using LOB support, an UPDATE against multiple rows can lead to inconsistent event data DBZ-6107

  • Expose sequence field in CloudEvents message id DBZ-6089

  • Reduce verbosity of skipped transactions if transaction has no events relevant to captured tables DBZ-6094

  • Upgrade Kafka client to 3.4.0 DBZ-6102

What’s Next?

We’re still very early in the development cycle of Debezium 2.2 and many other features are still in development, including:

  • Configurable signal channels, enabling users to send signals not only from a database table or a Kafka topic, but also from other means such as an HTTP endpoint, the file system, etc.

  • The Debezium JDBC sink connector that supports native Debezium change events out-of-the-box, without requiring the use of the Event Flattening transformation.

  • And a plethora of Debezium UI enhancements

We are about midway through the quarter and Debezium 2.2 will begin to enter beta phase very soon. We would love to hear your feedback or suggestions regarding the roadmap, changes in this release, those that are outstanding, or anything we haven’t mentioned. Be sure to get in touch with us on the mailing list or our chat if there is anything you would like to share.

Also be on the lookout for the first installment of our 2023 Newsletter as well as the upcoming conclusion to the blog series, "Debezium for Oracle", where I cover performance, debugging, and frequently asked questions about the Oracle connector.

Until next time…

Chris Cranford

Chris is a software engineer at Red Hat. He previously was a member of the Hibernate ORM team and now works on Debezium. He lives in North Carolina just a few hours from Red Hat towers.


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.
