Let’s talk about TOAST. Toast? No, TOAST!

So what’s that? TOAST (The Oversized-Attribute Storage Technique) is a mechanism in Postgres which stores large column values in multiple physical rows, circumventing the page size limit of 8 KB.
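
If you’re curious which columns of a table are candidates for TOAST storage, the Postgres catalog can tell you. Here’s a minimal sketch of such a query, assuming the inventory.customers table defined further below in this post:

SELECT attname, atttypid::regtype AS data_type, attstorage
FROM pg_attribute
WHERE attrelid = 'inventory.customers'::regclass
  AND attnum > 0
  AND NOT attisdropped;

-- attstorage 'x' ("extended") means the value may be compressed and, if still too large,
-- moved out of line into the table's TOAST table; 'p' ("plain") values always stay in the
-- main heap page.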

TOAST!

Typically, TOAST storage is transparent to the user, so you don’t really have to care about it. There’s an exception, though: if a table row changes, any unchanged values that were stored using the TOAST mechanism are not included in the message that Debezium receives from the database, unless they are part of the table’s replica identity. Consequently, such unchanged TOAST column values will not be contained in the Debezium data change events sent to Apache Kafka. In this post we’re going to discuss different strategies for dealing with this situation.

When encountering an unchanged TOAST column value in the logical replication message received from the database, the Debezium Postgres connector will represent that value with a configurable placeholder. By default, that’s the literal __debezium_unavailable_value, but that value can be overridden using the toasted.value.placeholder connector property.
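
For illustration, here’s a minimal sketch of how that property could be set when registering the connector with Kafka Connect; the connection details are hypothetical placeholders and not part of this post’s example:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "sourcedb",
    "database.server.name": "dbserver1",
    "toasted.value.placeholder": "__my_unavailable_marker__"
  }
}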

Let’s consider the following Postgres table definition as an example:

CREATE TABLE customers (
  id SERIAL NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE,
  biography TEXT
);

Here, the biography TEXT column is a TOAST-able column, as its value may exceed the page size limit. So when issuing an update such as update inventory.customers set first_name = 'Dana' where id = 1004;, you might receive a data change event in Apache Kafka which looks like this (assuming the table has the default replica identity):

{"before":null,"after": {"id":1004,"first_name":"Dana","last_name":"Kretchmar","email":"annek@noanswer.org","biography":"__debezium_unavailable_value"},"source": {"version":"0.10.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1570448151151,"snapshot":"false","db":"sourcedb","schema":"inventory","table":"customers","txId":627,"lsn":34650016,"xmin":null},"op":"u","ts_ms":1570448151611}

Note how the biography field (whose value hasn’t changed with the UPDATE) has the special __debezium_unavailable_value marker value. Now, if change event consumers receive that placeholder value, the question arises how they should react to it.

One way, and certainly the easiest from a consumer’s perspective, is to avoid the situation in the first place. This can be achieved by using a replica identity of FULL for the Postgres table in question. Alternatively, the replica identity can be based on an index which comprises the TOAST-able column.
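
Applied to the example table, enabling the FULL replica identity is a single DDL statement. The index-based variant is only sketched below, since it requires a suitable unique index over NOT NULL columns; the index name is hypothetical:

ALTER TABLE inventory.customers REPLICA IDENTITY FULL;

-- Alternative sketch: base the replica identity on a unique index that includes the
-- TOAST-able column. The index must be unique, non-partial, non-deferrable and may
-- only contain columns marked NOT NULL.
-- ALTER TABLE inventory.customers REPLICA IDENTITY USING INDEX customers_biography_idx;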

Excluding Unchanged Values

If changing the source table’s replica identity is not an option, one approach for consumers that update a sink datastore (e.g. a database, cache or search index) is to ignore any field of a change event which has the placeholder value.

This means that any column with the placeholder value must be omitted from the update statement executed on the sink datastore. E.g. in terms of a SQL database, a specific UPDATE statement must be built and executed which doesn’t contain the column(s) with the placeholder value. Users of Hibernate ORM may feel reminded of its "dynamic updates" feature, which works similarly. Some datastores and connectors might only support full updates, though, in which case this strategy isn’t viable.
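
As a sketch, for the example event shown earlier a consumer doing such dynamic updates would execute the second of the following statements rather than the first, leaving out the biography column whose value is merely the placeholder:

-- Naive full-row update: would overwrite the biography with the placeholder literal
UPDATE customers
SET first_name = 'Dana',
    last_name  = 'Kretchmar',
    email      = 'annek@noanswer.org',
    biography  = '__debezium_unavailable_value'
WHERE id = 1004;

-- "Dynamic" update: only the columns carrying actual values are written
UPDATE customers
SET first_name = 'Dana',
    last_name  = 'Kretchmar',
    email      = 'annek@noanswer.org'
WHERE id = 1004;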

Triggers

One interesting variation of the "ignore" approach is the usage of triggers in the sink database: registered for the column that may receive the marker value, they can "veto" such a change and simply keep the previously stored value instead. The following shows an example of such a trigger in Postgres:

CREATE OR REPLACE FUNCTION ignore_unchanged_biography()
RETURNS TRIGGER AS $BODY$
BEGIN
  IF NEW."biography" = '__debezium_unavailable_value' THEN
    NEW."biography" = OLD."biography";
  END IF;
  RETURN NEW;
END;
$BODY$ LANGUAGE PLPGSQL;

CREATE TRIGGER customer_biography_trigger
  BEFORE UPDATE OF "biography" ON customers
  FOR EACH ROW
  EXECUTE PROCEDURE ignore_unchanged_biography();

This will keep the old value for the biography column if it were to be set to the __debezium_unavailable_value marker value.
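
You can verify the behavior by emulating what a sink connector doing full-row updates would execute; although the statement contains the placeholder literal, the previously stored biography survives:

-- The trigger fires because "biography" is part of the SET list...
UPDATE customers
SET first_name = 'Dana',
    biography  = '__debezium_unavailable_value'
WHERE id = 1004;

-- ...and the old value is kept:
SELECT biography FROM customers WHERE id = 1004;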

Stateful Stream Processing

An alternative approach to dealing with unchanged TOAST column values is a stateful stream processing application.

Such an application could keep the latest value of the TOAST column (as obtained from a snapshot, an insert event or an update including the TOAST-able column) in a state store and patch that value back into change events carrying the marker value.

Debezium makes sure that all change events for one particular record always go into the same partition, so they will be processed in the exact same order as they were created. This ensures that the latest value is available in the state store when receiving a change event with the marker value.

Kafka Streams with its state store API comes in very handy for building such a service. Based on Quarkus and its extension for building Kafka Streams applications running either on the JVM or natively via GraalVM, a solution could look like this:

@ApplicationScoped
public class TopologyProducer {

    private static final Logger LOG = LoggerFactory.getLogger(TopologyProducer.class);

    static final String BIOGRAPHY_STORE = "biography-store";

    @ConfigProperty(name = "pgtoast.customers.topic")
    String customersTopic;

    @ConfigProperty(name = "pgtoast.customers.enriched.topic")
    String customersEnrichedTopic;

    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        StoreBuilder<KeyValueStore<JsonObject, String>> biographyStore =          // (1)
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(BIOGRAPHY_STORE),
                        new JsonObjectSerde(),
                        new Serdes.StringSerde()
                );

        builder.addStateStore(biographyStore);

        builder.<JsonObject, JsonObject>stream(customersTopic)                     // (2)
                .transformValues(ToastColumnValueProvider::new, BIOGRAPHY_STORE)
                .to(customersEnrichedTopic);

        return builder.build();
    }

    class ToastColumnValueProvider implements ValueTransformerWithKey<JsonObject, JsonObject, JsonObject> {

        private KeyValueStore<JsonObject, String> biographyStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            biographyStore = (KeyValueStore<JsonObject, String>) context.getStateStore(
                    TopologyProducer.BIOGRAPHY_STORE);
        }

        @Override
        public JsonObject transform(JsonObject key, JsonObject value) {
            JsonObject payload = value.getJsonObject("payload");
            JsonObject newRowState = payload.getJsonObject("after");
            String biography = newRowState.getString("biography");

            if (isUnavailableValueMarker(biography)) {                             // (3)
                String currentValue = biographyStore.get(key);                     // (4)

                if (currentValue == null) {
                    LOG.warn("No biography value found for key '{}'", key);
                }
                else {
                    value = Json.createObjectBuilder(value)                        // (5)
                            .add("payload", Json.createObjectBuilder(payload)
                                    .add("after", Json.createObjectBuilder(newRowState)
                                            .add("biography", currentValue)
                                    )
                            )
                            .build();
                }
            }
            else {                                                                 // (6)
                biographyStore.put(key, biography);
            }

            return value;
        }

        private boolean isUnavailableValueMarker(String value) {
            return "__debezium_unavailable_value".contentEquals(value);
        }

        @Override
        public void close() {
        }
    }
}
1 Set up a state store for storing the latest biography value per customer id
2 The actual streaming pipeline: for each message on the customers topic, apply the logic for replacing the TOAST column marker value and write the transformed message to an output topic
3 Check whether the biography value from the incoming message is the marker
4 If so, get the current biography value for the customer from the state store
5 Replace the marker value with the actual value obtained from the state store
6 If the incoming message has an actual biography value, put it into the state store

Now, if a consumer subscribes to the "enriched" topic, it will see any customer change events with the actual value of any unchanged TOAST columns, as materialized from the state store. The fact that the Debezium connector originally emitted the special marker value is fully transparent at that point.

Primary Key Changes

When a record’s primary key gets updated, Debezium will create two change events: one "delete" event using the old key and one "insert" event with the new key. When processing the second event, the stream processing application will not be able to look up the biography value stored earlier on, as it was stored under the old key.

One way to address this would be to expose the original key value, e.g. as a message header of the insert event. This requirement is tracked as DBZ-1531; let us know if you’d like to contribute and implement this feature.

When to Use What?

We’ve discussed different options for dealing with unchanged TOAST column values in Debezium’s data change events. Which one should be used in which case then?

Changing the replica identity to FULL is the easiest approach by far: a single configuration change to the source table avoids the problem to begin with. It’s not the most efficient solution, though, and some DBAs might be reluctant to apply this setting.

When using the change events to update some kind of sink data store, it may sound attractive at first to simply omit any field with the special marker value when issuing an update. But this technique has some downsides: not all data stores and the corresponding connectors may support partial updates; instead, there might only be the option to do full updates of a record in the sink data store based on the incoming data. Even where partial updates are supported, they might be sub-optimal. E.g. for a SQL database, a statement with just the available values may be executed. This is at odds with efficient usage of prepared statements and batching, though: as the "shape" of the data may change between two updates to the same table, the same prepared statement cannot be re-used and performance may suffer.

The trigger-based approach isn’t prone to these problems: any update to a table will have the same number of columns, so the consumer (e.g. a sink connector) may re-use the same prepared statement and batch multiple records into a single execution. One thing to be aware of is the organizational cost associated with this approach: triggers must be installed for each affected column and be kept in sync when table structures change. This must be done individually in each sink datastore, and not all stores may have support for triggers to begin with. But where possible, triggers can be a great solution.

Finally, stream processing makes the usage of TOAST-able columns and the absence of their values in update events fully transparent to consumers. The enrichment logic is implemented in a single place, from which all consumers of the change event stream benefit, without the need for individual solutions in each one of them. Also, it’s the only viable solution if consumers themselves are stateless and don’t have any way to materialize the last value of such a column, e.g. when streaming change events to a browser via web sockets or GraphQL subscriptions. The price to pay is the overhead of maintaining and operating a separate service.

On a side note, such a stream processing application might also be provided as a configurable, ready-to-use component coming as part of the Debezium platform. This might be useful not only for Postgres, but also when thinking about other Debezium connectors. For instance, in the case of Cassandra, change events will only ever contain the updated fields; a similar mode could be envisioned for MySQL by supporting its "non-full" binlog mode. In both cases, a stateful stream processing service could be used to hydrate full data change events based on earlier row state retrieved from a local state store and an incoming "patch"-style change event. If you think that’d be a useful addition to Debezium, please let us know.

As always, there are no silver bullets: you should choose a solution based on your specific situation and requirements. As a starting point you can find a basic implementation of the trigger and Kafka Streams approaches in the Debezium examples repository.

Which approach would you prefer? Or perhaps you have even further alternatives in mind? Tell us about it in the comments below.

Many thanks to Dave Cramer and Jiri Pechanec for their feedback while working on this post and the accompanying example code!

Gunnar Morling

Gunnar is a software engineer at Decodable and an open-source enthusiast by heart. He has been the project lead of Debezium over many years. Gunnar has created open-source projects like kcctl, JfrUnit, and MapStruct, and is the spec lead for Bean Validation 2.0 (JSR 380). He’s based in Hamburg, Germany.


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.
