You are viewing documentation for an outdated version of Debezium.
If you want to view the latest stable version of this page, please gohere

Debezium connector for PostgreSQL

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, 12 and 13 are supported.

The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

Overview

PostgreSQL’slogical decodingfeature was introduced in version 9.4. It is a mechanism that allows the extraction of the changes that were committed to the transaction log and the processing of these changes in a user-friendly manner with the help of anoutput plug-in。The output plug-in enables clients to consume the changes.

The PostgreSQL connector contains two main parts that work together to read and process database changes:

  • A logical decoding output plug-in. You might need to install the output plug-in that you choose to use. You must configure a replication slot that uses your chosen output plug-in before running the PostgreSQL server. The plug-in can be one of the following:

    • decoderbufsis based on Protobuf and maintained by the Debezium community.

    • wal2jsonis based on JSON and maintained by the wal2json community.

    • pgoutputis the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself forlogical replication。This plug-in is always present so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.

  • Java code (the actual Kafka Connect connector) that reads the changes produced by the chosen logical decoding output plug-in. It uses PostgreSQL’sstreaming replication protocol, by means of the PostgreSQLJDBC driver

The connector produces achange eventfor every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.

PostgreSQL normally purges write-ahead log (WAL) segments after some period of time. This means that the connector does not have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing aconsistent snapshotof each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, the connector starts with a consistent view of all of the data, and does not omit any changes that were made while the snapshot was being taken.

The connector is tolerant of failures. As the connector reads changes and produces events, it records the WAL position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues reading the WAL where it last left off. This includes snapshots. If the connector stops during a snapshot, the connector begins a new snapshot when it restarts.

The connector relies on and reflects the PostgreSQL logical decoding feature, which has the following limitations:

  • Logical decoding does not support DDL changes. This means that the connector is unable to report DDL change events back to consumers.

  • Logical decoding replication slots are supported on onlyprimaryservers. When there is a cluster of PostgreSQL servers, the connector can run on only the activeprimaryserver. It cannot run onhotorwarmstandby replicas. If theprimaryserver fails or is demoted, the connector stops. After theprimaryserver has recovered, you can restart the connector. If a different PostgreSQL server has been promoted toprimary, adjust the connector configuration before restarting the connector.

Behavior when things go wrongdescribes what the connector does when there is a problem.

Debezium currently supports databases with UTF-8 character encoding only. With a single byte character encoding, it is not possible to correctly process strings that contain extended ASCII code characters.

How the connector works

To optimally configure and run a Debezium PostgreSQL connector, it is helpful to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata.

Security

To use the Debezium connector to stream changes from a PostgreSQL database, the connector must operate with specific privileges in the database. Although one way to grant the necessary privileges is to provide the user withsuperuserprivileges, doing so potentially exposes your PostgreSQL data to unauthorized access. Rather than granting excessive privileges to the Debezium user, it is best to create a dedicated Debezium replication user to which you grant specific privileges.

For more information about configuring privileges for the Debezium PostgreSQL user, seeSetting up permissions。For more information about PostgreSQL logical replication security, see thePostgreSQL documentation

Snapshots

大多数PostgreSQL服务器配置不retain the complete history of the database in the WAL segments. This means that the PostgreSQL connector would be unable to see the entire history of the database by reading only the WAL. Consequently, the first time that the connector starts, it performs an initialconsistent snapshotof the database. The default behavior for performing a snapshot consists of the following steps. You can change this behavior by setting thesnapshot.modeconnector configuration propertyto a value other thaninitial

  1. Start a transaction with aSERIALIZABLE, READ ONLY, DEFERRABLEisolation level to ensure that subsequent reads in this transaction are against a single consistent version of the data. Any changes to the data due to subsequentINSERT,UPDATE, andDELETEoperations by other clients are not visible to this transaction.

  2. Read the current position in the server’s transaction log.

  3. Scan the database tables and schemas, generate aREADevent for each row and write that event to the appropriate table-specific Kafka topic.

  4. Commit the transaction.

  5. Record the successful completion of the snapshot in the connector offsets.

If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 6 completes, upon restart the connector begins a new snapshot. After the connector completes its initial snapshot, the PostgreSQL connector continues streaming from the position that it read in step 3. This ensures that the connector does not miss any updates. If the connector stops again for any reason, upon restart, the connector continues streaming changes from where it previously left off.

Table 1. Settings forsnapshot.modeconnector configuration property
Setting Description

always

The connector always performs a snapshot when it starts. After the snapshot completes, the connector continues streaming changes from step 3 in the above sequence. This mode is useful in these situations:

  • It is known that some WAL segments have been deleted and are no longer available.

  • After a cluster failure, a new primary has been promoted. Thealwayssnapshot mode ensures that the connector does not miss any changes that were made after the new primary had been promoted but before the connector was restarted on the new primary.

never

The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. Theneversnapshot mode is useful only when you know all data of interest is still reflected in the WAL.

initial_only

The connector performs a database snapshot and stops before streaming any change event records. If the connector had started but did not complete a snapshot before stopping, the connector restarts the snapshot process and stops when the snapshot completes.

exported

Deprecated, all modes are lockless.

custom

Thecustomsnapshot mode lets you inject your own implementation of theio.debezium.connector.postgresql.spi.Snapshotterinterface. Set thesnapshot.custom.classconfiguration property to the class on the classpath of your Kafka Connect cluster or included in the JAR if using theEmbeddedEngine。For more details, seecustom snapshotter SPI

Ad-hoc snapshot

This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this extension.

When the initial snapshot is completed then it is not executed again as further data are acquired via streaming.

Sometimes it would be really useful to re-execute a snapshot, either completely, or for only a subset of tables. Examples for such use cases include:

  • a new table was added into the list of captured tables

  • some of the topics were deleted and their content needs to be rebuilt

  • the data were corrupted due to a configuration error

Debezium can be requested via thesignalling tablemechanism to execute a new snapshot of a defined list of tables. The signal is namedexecute-snapshotand accepts messages in the following format:

Table 2. Example of a signal record
Field Default Value

type

incremental

The type of the snapshot to be executed. Currently onlyincrementalis supported.
See the next section for more details.

data-collections

N/A

An array of qualified names of table to be snapshotted.
The format of the names is the same as forsignal.data.collectionconfiguration option.

An example of the signal to be triggered is this:

INSERT INTO myschema.debezium_signal VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema1.table2"]}')

Incremental snapshot

This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this extension.

Initial snapshots are a great way to obtain a consistent set of all data stored in a database upfront. But there are also some downsides to this approach, though:

  • For larger datasets it can take very long to complete; hours or even days

  • The snapshot must be completed before the start of streaming

  • Snapshots are not resumable If a snapshot hasn’t been completed when the connector gets stopped or restarted, the snapshot must be restarted from scratch

  • New tables that are added among the captured ones later are not snapshotted.

To mitigate these issues, a new mechanism called增量快照has been introduced.

Incremental snapshots can be started current only asan ad-hoc snapshot

Configuration optionsignal.data.collectionmust be set.

Incremental snapshot is based onDDD-3design document. In this case the table is snapshotted in chunks and streaming and snapshot events are blended together. The snaphot events are still denoted using operationrand thesnapshotfield insourceinfo block is set toincremental

Example
{ "before":null, "after": { "pk":"1", "value":"New data" }, "source": { ... "snapshot":"incremental" }, "op":"r", "ts_ms":"1620393591654", "transaction":null }

The incremental snapshot events should be understood in a different way than events from an initial snapshot. Theread事件应该被理解就像一个实体化of data in a certain period of time. This means that event semantics differ between initial and incremental snapshots:

  • updateanddeleteevents can arrive beforereadevents

  • ifdeleteevent arrives first thenreadevent is never delivered

  • ifupdateevent arrives first thenreadevent will either not be delivered at all or it will be delivered with the updated value

Custom snapshotter SPI

For more advanced uses, you can provide an implementation of theio.debezium.connector.postgresql.spi.Snapshotterinterface. This interface allows control of most of the aspects of how the connector performs snapshots. This includes whether or not to take a snapshot, the options for opening the snapshot transaction, and whether to take locks.

Following is the full API for the interface. All built-in snapshot modes implement this interface.

/ * * *这个接口是用于确定细节about the snapshot process: * * Namely: * - Should a snapshot occur at all * - Should streaming occur * - What queries should be used to snapshot * * While many default snapshot modes are provided with Debezium, * a custom implementation of this interface can be provided by the implementor, which * can provide more advanced functionality, such as partial snapshots. * * Implementations must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()} * or true for both. */ @Incubating public interface Snapshotter { void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState); /** * @return true if the snapshotter should take a snapshot */ boolean shouldSnapshot(); /** * @return true if the snapshotter should stream after taking a snapshot */ boolean shouldStream(); /** * * @return true if streaming should resume from the start of the snapshot * transaction, or false for when a connector resumes and takes a snapshot, * streaming should resume from where streaming previously left off. */ default boolean shouldStreamEventsStartingFromSnapshot() { return true; } /** * Generate a valid postgres query string for the specified table, or an empty {@link Optional} * to skip snapshotting this table (but that table will still be streamed from) * * @param tableId the table to generate a query for * @return a valid query string, or none to skip snapshotting this table */ Optional buildSnapshotQuery(TableId tableId); /** * Return a new string that set up the transaction for snapshotting * * @param newSlotInfo if a new slow was created for snapshotting, this contains information from * the `create_replication_slot` command */ default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) { // we're using the same isolation level that pg_backup uses return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; } /** * Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter * implementation. */ default Optional snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot // this does not prevent writes to the table, but prevents changes to the table's schema.... // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted tableIds.forEach(tableId -> statements.append("LOCK TABLE ") .append(tableId.toDoubleQuotedString()) .append(" IN ACCESS SHARE MODE;") .append(lineSeparator)); return Optional.of(statements.toString()); } /** * Lifecycle hook called once the snapshot phase is finished. */ default void snapshotCompleted() { // no operation } }

Streaming changes

The PostgreSQL connector typically spends the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies onPostgreSQL’s replication protocol。This protocol enables clients to receive changes from the server as they are committed in the server’s transaction log at certain positions, which are referred to as Log Sequence Numbers (LSNs).

Whenever the server commits a transaction, a separate server process invokes a callback function from thelogical decoding plug-in。This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of Debezium plug-in) and writes them on an output stream, which can then be consumed by clients.

The Debezium PostgreSQL connector acts as a PostgreSQL client. When the connector receives changes it transforms the events into Debeziumcreate,update, ordeleteevents that include the LSN of the event. The PostgreSQL connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.

Periodically, Kafka Connect records the most recentoffsetin another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the PostgreSQL connector, the LSN recorded in each change event is the offset.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to the PostgreSQL server to send the events starting just after that position.

PostgreSQL连接器检索模式备用ion as part of the events sent by the logical decoding plug-in. However, the connector does not retrieve information about which columns compose the primary key. The connector obtains this information from the JDBC metadata (side channel). If the primary key definition of a table changes (by adding, removing or renaming primary key columns), there is a tiny period of time when the primary key information from JDBC is not synchronized with the change event that the logical decoding plug-in generates. During this tiny period, a message could be created with an inconsistent key structure. To prevent this inconsistency, update primary key structures as follows:

  1. Put the database or an application into a read-only mode.

  2. Let Debezium process all remaining events.

  3. Stop Debezium.

  4. Update the primary key definition in the relevant table.

  5. Put the database or the application into read/write mode.

  6. Restart Debezium.

PostgreSQL 10+ logical decoding support (pgoutput)

PostgreSQL 10 +,there is a logical replication stream mode, calledpgoutputthat is natively supported by PostgreSQL. This means that a Debezium PostgreSQL connector can consume that replication stream without the need for additional plug-ins. This is particularly valuable for environments where installation of plug-ins is not supported or not allowed.

SeeSetting up PostgreSQLfor more details.

Topics names

The PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name isserverNameschemaNametableNamewhere:

  • serverNameis the logical name of the connector as specified with thedatabase.server.nameconnector configuration property.

  • schemaNameis the name of the database schema where the operation occurred.

  • tableNameis the name of the database table in which the operation occurred.

For example, suppose thatfulfillmentis the logical server name in the configuration for a connector that is capturing changes in a PostgreSQL installation that has apostgresdatabase and aninventoryschema that contains four tables:products,products_on_hand,customers, andorders。The connector would stream records to these four Kafka topics:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

Now suppose that the tables are not part of a specific schema but were created in the defaultpublicPostgreSQL schema. The names of the Kafka topics would be:

  • fulfillment.public.products

  • fulfillment.public.products_on_hand

  • fulfillment.public.customers

  • fulfillment.public.orders

Meta information

In addition to adatabase change event, each record produced by a PostgreSQL connector contains some metadata. Metadata includes where the event occurred on the server, the name of the source partition and the name of the Kafka topic and partition where the event should go, for example:

"sourcePartition": { "server": "fulfillment" }, "sourceOffset": { "lsn": "24023128", "txId": "555", "ts_ms": "1482918357011" }, "kafkaPartition": null
  • sourcePartitionalways defaults to the setting of thedatabase.server.nameconnector configuration property.

  • sourceOffsetcontains information about the location of the server where the event occurred:

    • lsnrepresents the PostgreSQLLog Sequence Numberoroffsetin the transaction log.

    • txIdrepresents the identifier of the server transaction that caused the event.

    • ts_msrepresents the server time at which the transaction was committed in the form of the number of milliseconds since the epoch.

  • kafkaPartitionwith a setting ofnullmeans that the connector does not use a specific Kafka partition. The PostgreSQL connector uses only one Kafka Connect partition and it places the generated events into one Kafka partition.

Transaction metadata

Debezium can generate events that represent transaction boundaries and that enrich data change event messages.

Limits on when Debezium receives transaction metadata

Debezium registers and receives metadata only for transactions that occur after you deploy the connector. Metadata for transactions that occur before you deploy the connector is not available.

For every transactionBEGINandEND, Debezium generates an event that contains the following fields:

  • status-BEGINorEND

  • id- string representation of unique transaction identifier

  • event_count(forENDevents) - total number of events emitted by the transaction

  • data_collections(forENDevents) - an array of pairs ofdata_collectionandevent_countthat provides the number of events emitted by changes originating from given data collection

Example
{"该队s": "BEGIN", "id": "571", "event_count": null, "data_collections": null } { "status": "END", "id": "571", "event_count": 2, "data_collections": [ { "data_collection": "s1.a", "event_count": 1 }, { "data_collection": "s2.a", "event_count": 1 } ] }

Transaction events are written to the topic nameddatabase.server.name。transaction

Change data event enrichment

When transaction metadata is enabled the data messageEnvelopeis enriched with a newtransactionfield. This field provides information about every event in the form of a composite of fields:

  • id- string representation of unique transaction identifier

  • total_order- absolute position of the event among all events generated by the transaction

  • data_collection_order- the per-data collection position of the event among all events that were emitted by the transaction

Following is an example of a message:

{ "before": null, "after": { "pk": "2", "aa": "1" }, "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "571", "total_order": "1", "data_collection_order": "1" } }

Data change events

The Debezium PostgreSQL connector generates a data change event for each row-levelINSERT,UPDATE, andDELETEoperation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.

Debezium and Kafka Connect are designed aroundcontinuous streams of event messages。However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.

The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. Aschemafield is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:

{ "schema": {(1)。.. }, "payload": {(2)。.. }, "schema": {(3)。.. }, "payload": {(4)。.. }, }
Table 3. Overview of change event basic content
Item Field name Description

1

schema

The firstschemafield is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’spayloadportion. In other words, the firstschemafield describes the structure of the primary key, or the unique key if the table does not have a primary key, for the table that was changed.

It is possible to override the table’s primary key by setting themessage.key.columnsconnector configuration property。In this case, the first schema field describes the structure of the key identified by that property.

2

payload

The firstpayloadfield is part of the event key. It has the structure described by the previousschemafield and it contains the key for the row that was changed.

3

schema

The secondschemafield is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’spayloadportion. In other words, the secondschemadescribes the structure of the row that was changed. Typically, this schema contains nested schemas.

4

payload

The secondpayloadfield is part of the event value. It has the structure described by the previousschemafield and it contains the actual data for the row that was changed.

By default behavior is that the connector streams change event records to主题名称一样的事件originating table

Starting with Kafka 0.10, Kafka can optionally record the event key and value with thetimestampat which the message was created (recorded by the producer) or written to the log by Kafka.

The PostgreSQL connector ensures that all Kafka Connect schema names adhere to theAvro schema name format。This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the schema and table names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or \_. If there is an invalid character it is replaced with an underscore character.

这可能会导致意想不到的有限公司nflicts if the logical server name, a schema name, or a table name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.

Change event keys

For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created. Alternatively, if the table hasREPLICA IDENTITYset toFULLorUSING INDEXthere is a field for each unique key constraint.

Consider acustomerstable defined in thepublicdatabase schema and the example of a change event key for that table.

Example table
CREATE TABLE customers ( id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id) );
Example change event key

If thedatabase.server.nameconnector configuration property has the valuePostgreSQL_server, every change event for thecustomerstable while it has this definition has the same key structure, which in JSON looks like this:

{ "schema": {(1)"type": "struct", "name": "PostgreSQL_server.public.customers.Key",(2)"optional": false,(3)"fields": [(4){ "name": "id", "index": "0", "schema": { "type": "INT32", "optional": "false" } } ] }, "payload": {(5)"id": "1" }, }
Table 4. Description of change event key
Item Field name Description

1

schema

The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’spayloadportion.

2

PostgreSQL_server.inventory.customers.Key

Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed. Key schema names have the formatconnector-namedatabase-nametable-nameKey。In this example:

  • PostgreSQL_serveris the name of the connector that generated this event.

  • inventoryis the database that contains the table that was changed.

  • customersis the table that was updated.

3

optional

Indicates whether the event key must contain a value in itspayloadfield. In this example, a value in the key’s payload is required. A value in the key’s payload field is optional when a table does not have a primary key.

4

fields

Specifies each field that is expected in thepayload, including each field’s name, index, and schema.

5

payload

Contains the key for the row for which this change event was generated. In this example, the key, contains a singleidfield whose value is1

Although thecolumn.exclude.listandcolumn.include.listconnector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event’s key.

If the table does not have a primary or unique key, then the change event’s key is null. The rows in a table without a primary or unique key constraint cannot be uniquely identified.

Change event values

The value in a change event is a bit more complicated than the key. Like the key, the value has aschemasection and apayloadsection. Theschemasection contains the schema that describes theEnvelopestructure of thepayloadsection, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.

Consider the same sample table that was used to show an example of a change event key:

CREATE TABLE customers ( id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id) );

The value portion of a change event for a change to this table varies according to theREPLICA IDENTITYsetting and the operation that the event is for.

Replica identity

REPLICA IDENTITYis a PostgreSQL-specific table-level setting that determines the amount of information that is available to the logical decoding plug-in forUPDATEandDELETEevents. More specifically, the setting ofREPLICA IDENTITYcontrols what (if any) information is available for the previous values of the table columns involved, whenever anUPDATEorDELETEevent occurs.

There are 4 possible values forREPLICA IDENTITY:

  • DEFAULT- The default behavior is thatUPDATEandDELETEevents contain the previous values for the primary key columns of a table if that table has a primary key. For anUPDATEevent, only the primary key columns with changed values are present.

    If a table does not have a primary key, the connector does not emitUPDATEorDELETEevents for that table. For a table without a primary key, the connector emits onlycreateevents. Typically, a table without a primary key is used for appending messages to the end of the table, which means thatUPDATEandDELETEevents are not useful.

  • NOTHING- Emitted events forUPDATEandDELETEoperations do not contain any information about the previous value of any table column.

  • FULL- Emitted events forUPDATEandDELETEoperations contain the previous values of all columns in the table.

  • INDEXindex-name- Emitted events forUPDATEandDELETEoperations contain the previous values of the columns contained in the specified index.UPDATEevents also contain the indexed columns with the updated values.

createevents

The following example shows the value portion of a change event that the connector generates for an operation that creates data in thecustomerstable:

{ "schema": {(1)"type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value",(2)"field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "int64", "optional": true, "field": "txId" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "int64", "optional": true, "field": "xmin" } ], "optional": false, "name": "io.debezium.connector.postgresql.Source",(3)"field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "PostgreSQL_server.inventory.customers.Envelope"(4)}, "payload": {(5)"before": null,(6)"after": {(7)"id": 1, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": {(8)"version": "1.6.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "sequence": "[\"24023119\",\"24023128\"]" "schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null }, "op": "c",(9)"ts_ms": 1559033904863(10)} }
Table 5. Descriptions ofcreateevent value fields
Item Field name Description

1

schema

The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table.

2

name

In theschemasection, eachnamefield specifies the schema for a field in the value’s payload.

PostgreSQL_server.inventory.customers.Valueis the schema for the payload’sbeforeandafterfields. This schema is specific to thecustomerstable.

Names of schemas forbeforeandafterfields are of the formlogicalNametableName。Value, which ensures that the schema name is unique in the database. This means that when using theAvro converter, the resulting Avro schema for each table in each logical source has its own evolution and history.

3

name

io.debezium.connector.postgresql.Sourceis the schema for the payload’ssourcefield. This schema is specific to the PostgreSQL connector. The connector uses it for all events that it generates.

4

name

PostgreSQL_server.inventory.customers.Envelopeis the schema for the overall structure of the payload, wherePostgreSQL_serveris the connector name,inventoryis the database, andcustomersis the table.

5

payload

The value’s actual data. This is the information that the change event is providing.

It may appear that the JSON representations of the events are much larger than the rows they describe. This is because the JSON representation must include the schema and the payload portions of the message. However, by using theAvro converter, you can significantly decrease the size of the messages that the connector streams to Kafka topics.

6

before

An optional field that specifies the state of the row before the event occurred. When theopfield iscfor create, as it is in this example, thebeforefield isnullsince this change event is for new content.

Whether or not this field is available is dependent on theREPLICA IDENTITYsetting for each table.

7

after

An optional field that specifies the state of the row after the event occurred. In this example, theafterfield contains the values of the new row’sid,first_name,last_name, andemailcolumns.

8

source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database and table that contains the new row

  • Stringified JSON array of additional offset information. The first value is always the last committed LSN, the second value is always the current LSN. Either value may benull

  • Schema name

  • If the event was part of a snapshot

  • ID of the transaction in which the operation was performed

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

9

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example,cindicates that the operation created a row. Valid values are:

  • c= create

  • u= update

  • d= delete

  • r= read (applies to only snapshots)

10

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In thesourceobject,ts_msindicates the time that the change was made in the database. By comparing the value forpayload.source.ts_mswith the value forpayload.ts_ms, you can determine the lag between the source database update and Debezium.

updateevents

The value of a change event for an update in the samplecustomerstable has the same schema as acreateevent for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in anupdateevent. Here is an example of a change event value in an event that the connector generates for an update in thecustomerstable:

{ "schema": { ... }, "payload": { "before": {(1)"id": 1 }, "after": {(2)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": {(3)"version": "1.6.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null }, "op": "u",(4)"ts_ms": 1465584025523(5)} }
Table 6. Descriptions ofupdateevent value fields
Item Field name Description

1

before

An optional field that contains values that were in the row before the database commit. In this example, only the primary key column,id, is present because the table’sREPLICA IDENTITYsetting is, by default,DEFAULT。+ For anupdateevent to contain the previous values of all columns in the row, you would have to change thecustomerstable by runningALTER TABLE customers REPLICA IDENTITY FULL

2

after

An optional field that specifies the state of the row after the event occurred. In this example, thefirst_namevalue is nowAnne Marie

3

source

Mandatory field that describes the source metadata for the event. Thesourcefield structure has the same fields as in acreateevent, but some values are different. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database and table that contains the new row

  • Schema name

  • If the event was part of a snapshot (alwaysfalseforupdateevents)

  • ID of the transaction in which the operation was performed

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. In anupdateevent value, theopfield value isu, signifying that this row changed because of an update.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In thesourceobject,ts_msindicates the time that the change was made in the database. By comparing the value forpayload.source.ts_mswith the value forpayload.ts_ms, you can determine the lag between the source database update and Debezium.

Updating the columns for a row’s primary/unique key changes the value of the row’s key. When a key changes, Debezium outputsthreeevents: aDELETEevent and atombstone eventwith the old key for the row, followed by an event with the new key for the row. Details are in the next section.

Primary key updates

AnUPDATEoperation that changes a row’s primary key field(s) is known as a primary key change. For a primary key change, in place of sending anUPDATEevent record, the connector sends aDELETEevent record for the old key and aCREATEevent record for the new (updated) key. These events have the usual structure and content, and in addition, each one has a message header related to the primary key change:

  • TheDELETEevent record has__debezium.newkeyas a message header. The value of this header is the new primary key for the updated row.

  • TheCREATEevent record has__debezium.oldkeyas a message header. The value of this header is the previous (old) primary key that the updated row had.

deleteevents

The value in adeletechange event has the sameschemaportion ascreateandupdateevents for the same table. Thepayloadportion in adeleteevent for the samplecustomerstable looks like this:

{ "schema": { ... }, "payload": { "before": {(1)"id": 1 }, "after": null,(2)"source": {(3)"version": "1.6.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null }, "op": "d",(4)"ts_ms": 1465581902461(5)} }
Table 7. Descriptions ofdeleteevent value fields
Item Field name Description

1

before

Optional field that specifies the state of the row before the event occurred. In adeleteevent value, thebeforefield contains the values that were in the row before it was deleted with the database commit.

In this example, thebefore因为字段只包含主键列the table’sREPLICA IDENTITYsetting isDEFAULT

2

after

Optional field that specifies the state of the row after the event occurred. In adeleteevent value, theafterfield isnull, signifying that the row no longer exists.

3

source

Mandatory field that describes the source metadata for the event. In adeleteevent value, thesourcefield structure is the same as forcreateandupdateevents for the same table. Manysourcefield values are also the same. In adeleteevent value, thets_msandlsnfield values, as well as other values, might have changed. But thesourcefield in adeleteevent value provides the same metadata:

  • Debezium version

  • Connector type and name

  • Database and table that contained the deleted row

  • Schema name

  • If the event was part of a snapshot (alwaysfalsefordeleteevents)

  • ID of the transaction in which the operation was performed

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. Theopfield value isd, signifying that this row was deleted.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In thesourceobject,ts_msindicates the time that the change was made in the database. By comparing the value forpayload.source.ts_mswith the value forpayload.ts_ms, you can determine the lag between the source database update and Debezium.

Adeletechange event record provides a consumer with the information it needs to process the removal of this row.

For a consumer to be able to process adeleteevent generated for a table that does not have a primary key, set the table’sREPLICA IDENTITYtoFULL。When a table does not have a primary key and the table’sREPLICA IDENTITYis set toDEFAULTorNOTHING, adeleteevent has nobeforefield.

PostgreSQL connector events are designed to work withKafka log compaction。Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.

Tombstone events

When a row is deleted, thedeleteevent value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must benull。To make this possible, the PostgreSQL connector follows adeleteevent with a specialtombstoneevent that has the same key but anullvalue.

truncateevents

Atruncatechange event signals that a table has been truncated. The message key isnullin this case, the message value looks like this:

{ "schema": { ... }, "payload": { "source": {(1)"version": "1.6.4.Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null }, "op": "t",(2)"ts_ms": 1559033904961(3)} }
Table 8. Descriptions oftruncateevent value fields
Item Field name Description

1

source

Mandatory field that describes the source metadata for the event. In atruncateevent value, thesourcefield structure is the same as forcreate,update, anddeleteevents for the same table, provides this metadata:

  • Debezium version

  • Connector type and name

  • Database and table that contains the new row

  • Schema name

  • If the event was part of a snapshot (alwaysfalsefordeleteevents)

  • ID of the transaction in which the operation was performed

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

2

op

Mandatory string that describes the type of operation. Theopfield value ist, signifying that this table was truncated.

3

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In thesourceobject,ts_msindicates the time that the change was made in the database. By comparing the value forpayload.source.ts_mswith the value forpayload.ts_ms, you can determine the lag between the source database update and Debezium.

In case a singleTRUNCATEstatement applies to multiple tables, onetruncatechange event record for each truncated table will be emitted.

Note that sincetruncateevents represent a change made to an entire table and don’t have a message key, unless you’re working with topics with a single partition, there are no ordering guarantees for the change events pertaining to a table (create,update, etc.) andtruncateevents for that table. For instance a consumer may receive anupdateevent only after atruncateevent for that table, when those events are read from different partitions.

Data type mappings

The PostgreSQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the PostgreSQL data type of the column. The following sections describe how the connector maps PostgreSQL data types to aliteral typeand asemantic typein event fields.

  • literal typedescribes how the value is literally represented using Kafka Connect schema types:INT8,INT16,INT32,INT64,FLOAT32,FLOAT64,BOOLEAN,STRING,BYTES,ARRAY,MAP, andSTRUCT

  • semantic typedescribes how the Kafka Connect schema captures themeaningof the field using the name of the Kafka Connect schema for the field.

Basic types

The following table describes how the connector maps basic types.

Table 9. Mappings for PostgreSQL basic data types
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

BOOLEAN

BOOLEAN

n/a

BIT(1)

BOOLEAN

n/a

BIT( > 1)

BYTES

io.debezium.data.Bits

Thelengthschema parameter contains an integer that represents the number of bits. The resultingbyte[]contains the bits in little-endian form and is sized to contain the specified number of bits. For example,numBytes = n/8 + (n % 8 == 0 ? 0 : 1)wherenis the number of bits.

BIT VARYING[(M)]

BYTES

io.debezium.data.Bits

Thelengthschema parameter contains an integer that represents the number of bits (2^31 - 1 in case no length is given for the column). The resultingbyte[]contains the bits in little-endian form and is sized based on the content. The specified size(M)is stored in the length parameter of theio.debezium.data.Bitstype.

SMALLINT,SMALLSERIAL

INT16

n/a

INTEGER,SERIAL

INT32

n/a

BIGINT,BIGSERIAL,OID

INT64

n/a

REAL

FLOAT32

n/a

DOUBLE PRECISION

FLOAT64

n/a

CHAR[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

CHARACTER[(M)]

STRING

n/a

CHARACTER VARYING[(M)]

STRING

n/a

TIMESTAMPTZ,TIMESTAMP WITH TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

A string representation of a timestamp with timezone information, where the timezone is GMT.

TIMETZ,TIME WITH TIME ZONE

STRING

io.debezium.time.ZonedTime

A string representation of a time value with timezone information, where the timezone is GMT.

INTERVAL [P]

INT64

io.debezium.time.MicroDuration
(default)

The approximate number of microseconds for a time interval using the365.25 / 12.0formula for days per month average.

INTERVAL [P]

STRING

io.debezium.time.Interval
(wheninterval.handling.modeis set tostring)

The string representation of the interval value that follows the patternPYMDTHMS, for example,P1Y2M3DT4H5M6.78S

BYTEA

BYTESorSTRING

n/a

Either the raw bytes (the default), a base64-encoded string, or a hex-encoded string, based on the connector’sbinary handling modesetting.

JSON,JSONB

STRING

io.debezium.data.Json

Contains the string representation of a JSON document, array, or scalar.

XML

STRING

io.debezium.data.Xml

Contains the string representation of an XML document.

UUID

STRING

io.debezium.data.Uuid

Contains the string representation of a PostgreSQL UUID value.

POINT

STRUCT

io.debezium.data.geometry.Point

Contains a structure with twoFLOAT64fields,(x,y)。Each field represents the coordinates of a geometric point.

LTREE

STRING

io.debezium.data.Ltree

Contains the string representation of a PostgreSQL LTREE value.

CITEXT

STRING

n/a

INET

STRING

n/a

INT4RANGE

STRING

n/a

Range of integer.

INT8RANGE

STRING

n/a

Range ofbigint

NUMRANGE

STRING

n/a

Range ofnumeric

TSRANGE

STRING

n/a

Contains the string representation of a timestamp range without a time zone.

TSTZRANGE

STRING

n/a

Contains the string representation of a timestamp range with the local system time zone.

DATERANGE

STRING

n/a

Contains the string representation of a date range. It always has an exclusive upper-bound.

ENUM

STRING

io.debezium.data.Enum

Contains the string representation of the PostgreSQLENUMvalue. The set of allowed values is maintained in theallowedschema parameter.

Temporal types

Other than PostgreSQL’sTIMESTAMPTZandTIMETZdata types, which contain time zone information, how temporal types are mapped depends on the value of thetime.precision.modeconnector configuration property. The following sections describe these mappings:

time.precision.mode=adaptive

When thetime.precision.modeproperty is set toadaptive, the default, the connector determines the literal type and semantic type based on the column’s data type definition. This ensures that eventsexactlyrepresent the values in the database.

Table 10. Mappings whentime.precision.modeisadaptive
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

DATE

INT32

io.debezium.time.Date

Represents the number of days since the epoch.

TIME(1),TIME(2),TIME(3)

INT32

io.debezium.time.Time

Represents the number of milliseconds past midnight, and does not include timezone information.

TIME(4),TIME(5),TIME(6)

INT64

io.debezium.time.MicroTime

Represents the number of microseconds past midnight, and does not include timezone information.

TIMESTAMP(1),TIMESTAMP(2),TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

Represents the number of milliseconds since the epoch, and does not include timezone information.

TIMESTAMP(4),TIMESTAMP(5),TIMESTAMP(6),TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

Represents the number of microseconds since the epoch, and does not include timezone information.

time.precision.mode=adaptive_time_microseconds

When thetime.precision.modeconfiguration property is set toadaptive_time_microseconds, the connector determines the literal type and semantic type for temporal types based on the column’s data type definition. This ensures that eventsexactlyrepresent the values in the database, except allTIMEfields are captured as microseconds.

Table 11. Mappings whentime.precision.modeisadaptive_time_microseconds
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

DATE

INT32

io.debezium.time.Date

Represents the number of days since the epoch.

TIME([P])

INT64

io.debezium.time.MicroTime

Represents the time value in microseconds and does not include timezone information. PostgreSQL allows precisionPto be in the range 0-6 to store up to microsecond precision.

TIMESTAMP(1),TIMESTAMP(2),TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

Represents the number of milliseconds past the epoch, and does not include timezone information.

TIMESTAMP(4),TIMESTAMP(5),TIMESTAMP(6),TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

Represents the number of microseconds past the epoch, and does not include timezone information.

time.precision.mode=connect

When thetime.precision.modeconfiguration property is set toconnect, the connector uses Kafka Connect logical types. This may be useful when consumers can handle only the built-in Kafka Connect logical types and are unable to handle variable-precision time values. However, since PostgreSQL supports microsecond precision, the events generated by a connector with theconnecttime precision moderesults in a loss of precisionwhen the database column has afractional second precisionvalue that is greater than 3.

Table 12. Mappings whentime.precision.modeisconnect
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

DATE

INT32

org.apache.kafka.connect.data.Date

Represents the number of days since the epoch.

TIME([P])

INT64

org.apache.kafka.connect.data.Time

Represents the number of milliseconds since midnight, and does not include timezone information. PostgreSQL allowsPto be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision whenPis greater than 3.

TIMESTAMP([P])

INT64

org.apache.kafka.connect.data.Timestamp

Represents the number of milliseconds since the epoch, and does not include timezone information. PostgreSQL allowsPto be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision whenPis greater than 3.

TIMESTAMP type

TheTIMESTAMP类型代表没有时区的时间戳信息rmation. Such columns are converted into an equivalent Kafka Connect value based on UTC. For example, theTIMESTAMPvalue "2018-06-20 15:13:16.945104" is represented by anio.debezium.time.MicroTimestampwith the value "1529507596945104" whentime.precision.modeis not set toconnect

The timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.

PostgreSQL supports using+/-infinitevalues inTIMESTAMPcolumns. These special values are converted to timestamps with value9223372036825200000in case of positive infinity or-9223372036832400000in case of negative infinity. This behaviour mimics the standard behaviour of PostgreSQL JDBC driver - seeorg.postgresql.PGStatementinterface for reference.

Decimal types

The setting of the PostgreSQL connector configuration property,decimal.handling.modedetermines how the connector maps decimal types.

When thedecimal.handling.modeproperty is set toprecise, the connector uses the Kafka Connectorg.apache.kafka.connect.data.Decimallogical type for allDECIMALandNUMERICcolumns. This is the default mode.

Table 13. Mappings whendecimal.handling.modeisprecise
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

Thescaleschema parameter contains an integer representing how many digits the decimal point was shifted.

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

Thescaleschema parameter contains an integer representing how many digits the decimal point was shifted.

There is an exception to this rule. When theNUMERICorDECIMALtypes are used without scale constraints, the values coming from the database have a different (variable) scale for each value. In this case, the connector usesio.debezium.data.VariableScaleDecimal, which contains both the value and the scale of the transferred value.

Table 14. Mappings of decimal types when there are no scale constraints
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

NUMERIC

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields:scaleof typeINT32that contains the scale of the transferred value andvalueof typeBYTEScontaining the original value in an unscaled form.

DECIMAL

STRUCT

io.debezium.data.VariableScaleDecimal

Contains a structure with two fields:scaleof typeINT32that contains the scale of the transferred value andvalueof typeBYTEScontaining the original value in an unscaled form.

When thedecimal.handling.modeproperty is set todouble, the connector represents allDECIMALandNUMERICvalues as Java double values and encodes them as shown in the following table.

Table 15. Mappings whendecimal.handling.modeisdouble
PostgreSQL data type Literal type (schema type) Semantic type (schema name)

NUMERIC[(M[,D])]

FLOAT64

DECIMAL[(M[,D])]

FLOAT64

The last possible setting for thedecimal.handling.modeconfiguration property isstring。In this case, the connector representsDECIMALandNUMERICvalues as their formatted string representation, and encodes them as shown in the following table.

Table 16. Mappings whendecimal.handling.modeisstring
PostgreSQL data type Literal type (schema type) Semantic type (schema name)

NUMERIC[(M[,D])]

STRING

DECIMAL[(M[,D])]

STRING

PostgreSQL supportsNaN(not a number) as a special value to be stored inDECIMAL/NUMERICvalues when the setting ofdecimal.handling.modeisstringordouble。In this case, the connector encodesNaNas eitherDouble.NaNor the string constantNAN

HSTORE type

When thehstore.handling.modeconnector configuration property is set tojson(the default), the connector representsHSTOREvalues as string representations of JSON values and encodes them as shown in the following table. When thehstore.handling.modeproperty is set tomap, the connector uses theMAPschema type forHSTOREvalues.

Table 17. Mappings forHSTOREdata type
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

HSTORE

STRING

io.debezium.data.Json

Example: output representation using the JSON converter is{\"key\" : \"val\"}

HSTORE

MAP

n/a

Example: output representation using the JSON converter is{"key" : "val"}

Domain types

PostgreSQL supports user-defined types that are based on other underlying types. When such column types are used, Debezium exposes the column’s representation based on the full type hierarchy.

捕获的变化列使用PostgreSQL domain types requires special consideration. When a column is defined to contain a domain type that extends one of the default database types and the domain type defines a custom length or scale, the generated schema inherits that defined length or scale.

When a column is defined to contain a domain type that extends another domain type that defines a custom length or scale, the generated schema doesnotinherit the defined length or scale because that information is not available in the PostgreSQL driver’s column metadata.

Network address types

PostgreSQL has data types that can store IPv4, IPv6, and MAC addresses. It is better to use these types instead of plain text types to store network addresses. Network address types offer input error checking and specialized operators and functions.

Table 18. Mappings for network address types
PostgreSQL data type Literal type (schema type) Semantic type (schema name) and Notes

INET

STRING

n/a

IPv4 and IPv6 networks

CIDR

STRING

n/a

IPv4 and IPv6 hosts and networks

MACADDR

STRING

n/a

MAC addresses

MACADDR8

STRING

n/a

MAC addresses in EUI-64 format

PostGIS types

The PostgreSQL connector supports allPostGIS data types

Table 19. Mappings of PostGIS data types
PostGIS data type Literal type (schema type) Semantic type (schema name) and Notes

GEOMETRY
(planar)

STRUCT

io.debezium.data.geometry.Geometry

Contains a structure with two fields:

  • srid (INT32)- Spatial Reference System Identifier that defines what type of geometry object is stored in the structure.

  • wkb (BYTES)- A binary representation of the geometry object encoded in the Well-Known-Binary format.

GEOGRAPHY
(spherical)

STRUCT

io.debezium.data.geometry.Geography

Contains a structure with two fields:

  • srid (INT32)- Spatial Reference System Identifier that defines what type of geography object is stored in the structure.

  • wkb (BYTES)- A binary representation of the geometry object encoded in the Well-Known-Binary format.

Toasted values

PostgreSQL has a hard limit on the page size. This means that values that are larger than around 8 KBs need to be stored by using link::https://www.postgresql.org/docs/current/storage-toast.html[TOAST storage]. This impacts replication messages that are coming from the database. Values that were stored by using the TOAST mechanism and that have not been changed are not included in the message, unless they are part of the table’s replica identity. There is no safe way for Debezium to read the missing value out-of-bands directly from the database, as this would potentially lead to race conditions. Consequently, Debezium follows these rules to handle toasted values:

  • Tables withREPLICA IDENTITY FULL- TOAST column values are part of thebeforeandafterfields in change events just like any other column.

  • Tables withREPLICA IDENTITY DEFAULT- When receiving anUPDATEevent from the database, any unchanged TOAST column value that is not part of the replica identity is not contained in the event. Similarly, when receiving aDELETEevent, no TOAST columns, if any, are in thebeforefield. As Debezium cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property,toasted.value.placeholder

There is a problem related to Amazon RDS instances. Thewal2jsonplug-in has evolved over the time and there were releases that provided out-of-band toasted values. Amazon supports different versions of the plug-in for different PostgreSQL versions. SeeAmazon’s documentationto obtain version to version mapping. For consistent toasted values handling:

  • Use thepgoutputplug-in for PostgreSQL 10+ instances.

  • Setinclude-unchanged-toast=0for older versions of thewal2jsonplug-in by using theslot.stream.paramsconfiguration option.

Default values

If a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the Kafka schema whenever possible. Most common data types are supported, including:

  • BOOLEAN

  • Numeric types (INT,FLOAT,NUMERIC, etc.)

  • Text types (CHAR,VARCHAR,TEXT, etc.)

  • Temporal types (DATE,TIME,INTERVAL,TIMESTAMP,TIMESTAMPTZ)

  • JSON,JSONB,XML

  • UUID

Note that for temporal types, parsing of the default value is provided by PostgreSQL libraries; therefore, any string representation which is normally supported by PostgreSQL should also be supported by the connector.

In the case that the default value is generated by a function rather than being directly specified in-line, the connector will instead export the equivalent of0for the given data type. These values include:

  • FALSEforBOOLEAN

  • 0with appropriate precision, for numeric types

  • Empty string for text/XML types

  • {}for JSON types

  • 1970-01-01forDATE,TIMESTAMP,TIMESTAMPTZtypes

  • 00:00forTIME

  • EPOCHforINTERVAL

  • 00000000-0000-0000-0000-000000000000forUUID

This support currently extends only to explicit usage of functions. For example,CURRENT_TIMESTAMP(6)is supported with parentheses, butCURRENT_TIMESTAMPis not.

Support for the propagation of default values exists primarily to allow for safe schema evolution when using the PostgreSQL connector with a schema registry which enforces compatibility between schema versions. Due to this primary concern, as well as the refresh behaviours of the different plug-ins, the default value present in the Kafka schema is not guaranteed to always be in-sync with the default value in the database schema.

  • Default values may appear 'late' in the Kafka schema, depending on when/how a given plugin triggers refresh of the in-memory schema. Values may never appear/be skipped in the Kafka schema if the default changes multiple times in-between refreshes

  • Default values may appear 'early' in the Kafka schema, if a schema refresh is triggered while the connector has records waiting to be processed. This is due to the column metadata being read from the database at refresh time, rather than being present in the replication message. This may occur if the connector is behind and a refresh occurs, or on connector start if the connector was stopped for a time while updates continued to be written to the source database.

This behaviour may be unexpected, but it is still safe. Only the schema definition is affected, while the real values present in the message will remain consistent with what was written to the source database.

Set up

Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server, decide which logical decoding plug-in you intend to use. If you plannotto use the nativepgoutputlogical replication stream support, then you must install the logical decoding plug-in into the PostgreSQL server. Afterward, enable a replication slot, and configure a user with sufficient privileges to perform the replication.

If your database is hosted by a service such asHeroku Postgresyou might be unable to install the plug-in. If so, and if you are using PostgreSQL 10+, you can use thepgoutputdecoder support to capture changes in your database. If that is not an option, you are unable to use Debezium with your database.

PostgreSQL in the Cloud

PostgreSQL on Amazon RDS

It is possible to capture changes in a PostgreSQL database that is running inAmazon RDS。To do this:

  • Set the instance parameterrds.logical_replicationto1

  • Verify that thewal_levelparameter is set tologicalby running the querySHOW wal_levelas the database RDS master user. This might not be the case in multi-zone replication setups. You cannot set this option manually. It isautomatically changedwhen therds.logical_replicationparameter is set to1。If thewal_levelis not set tologicalafter you make the preceding change, it is probably because the instance has to be restarted after the parameter group change. Restarts occur during your maintenance window, or you can initiate a restart manually.

  • Set the Debeziumplugin.nameparameter towal2json。You can skip this on PostgreSQL 10+ if you plan to usepgoutputlogical replication stream support.

  • Initiate logical replication from an AWS account that has therds_replicationrole. The role grants permissions to manage logical slots and to stream data using logical slots. By default, only the master user account on AWS has therds_replicationrole on Amazon RDS. To enable a user account other than the master account to initiate logical replication, you must grant the account therds_replicationrole. For example,grant rds_replication to。You must havesuperuseraccess to grant therds_replicationrole to a user. To enable accounts other than the master account to create an initial snapshot, you must grantSELECTpermission to the accounts on the tables to be captured. For more information about security for PostgreSQL logical replication, see thePostgreSQL documentation

Ensure that you use the latest versions of PostgreSQL 9.6, 10 or 11 on Amazon RDS. Otherwise, older versions of thewal2jsonplug-in might be installed. Seethe official documentationfor the exactwal2jsonversions installed on Amazon RDS. In the case of an older version, replication messages received from the database might not contain complete information about type constraints such as length or scale orNULL/NOT NULL。This might cause creation of messages with an inconsistent schema for a short period of time when there are changes to a column’s definition.

As of January 2019, the following PostgreSQL versions on RDS come with an up-to-date version ofwal2jsonand thus should be used:

  • PostgreSQL 9.6: 9.6.10和更新

  • PostgreSQL 10: 10.5 and newer

  • PostgreSQL 11: any version

PostgreSQL on Azure

It is possible to use Debezium with Azure Database for PostgreSQL, which has support for thewal2jsonand pgoutput plug-ins, both of which are supported by Debezium as well.

Set the Azure replication support to logical。You can use theAzure CLIor theAzure Portalto configure this. For example, to use the Azure CLI, here are theaz postgres servercommands that you need to execute:

az postgres server configuration set --resource-group mygroup --server-name myserver --name azure.replication_support --value logical az postgres server restart --resource-group mygroup --name myserver

PostgreSQL on CrunchyBridge

It is possible to use Debezium withCrunchyBridge; logical replication is already turned on. Thepgoutputplugin is available. You will have to create a replication user and provide correct privileges.

While using thepgoutputplug-in, it is recommended that you configurefilteredas thepublication.autocreate.mode。If you useall_tables, which is the default value forpublication.autocreate.mode, and the publication is not found, the connector tries to create one by using CREATE PUBLICATION FOR ALL TABLES;, but this fails due to lack of permissions.

Installing the logical decoding output plug-in

SeeLogical Decoding Output Plug-in Installation for PostgreSQLfor more detailed instructions for setting up and testing logical decoding plug-ins.

As of Debezium 0.10, the connector supports PostgreSQL 10+ logical replication streaming by usingpgoutput。This means that a logical decoding output plug-in is no longer necessary and changes can be emitted directly from the replication stream by the connector.

As of PostgreSQL 9.4, the only way to read changes to the write-ahead-log is to install a logical decoding output plug-in. Plug-ins are written in C, compiled, and installed on the machine that runs the PostgreSQL server. Plug-ins use a number of PostgreSQL specific APIs, as described by thePostgreSQL documentation

The PostgreSQL connector works with one of Debezium’s supported logical decoding plug-ins to encode the changes in eitherProtobuf formatorJSONformat. See the documentation for your chosen plug-in to learn more about the plug-in’s requirements, limitations, and how to compile it.

For simplicity, Debezium also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plug-ins. You canuse this imageas an example of the detailed steps required for the installation.

The Debezium logical decoding plug-ins have been installed and tested on only Linux machines. For Windows and other operating systems, different installation steps might be required.

Plug-in differences

Plug-in behavior is not completely the same for all cases. These differences have been identified:

  • Thewal2jsonanddecoderbufsplug-ins emit events for tables without primary keys.

  • Thewal2jsonplug-in does not support special values, such asNaNorinfinity为浮点类型。

  • Thewal2jsonplug-in should be used with theschema.refresh.modeconnector configuration property set tocolumns_diff_exclude_unchanged_toast。Otherwise, when receiving a change event for a row that contains an unchangedTOASTcolumn, no field for that column is contained in the emitted change event’safterfield. This is becausewal2jsonplug-in messages do not contain a field for such a column.

    The requirement for adding this is tracked under thewal2jsonissue 98。See the documentation ofcolumns_diff_exclude_unchanged_toastfurther below for implications of using it.

  • Thepgoutputplug-in does not emit all events for tables without primary keys. It emits only events forINSERToperations.

  • While all plug-ins will refresh schema metadata from the database upon detection of a schema change during streaming, thepgoutputplug-in is somewhat more 'eager' about triggering such refreshes. For example, a change to the default value for a column will trigger a refresh withpgoutput, while other plug-ins will not be aware of this change until another change triggers a refresh (eg. addition of a new column.) This is due to the behaviour ofpgoutput, rather than Debezium itself.

All up-to-date differences are tracked in a test suiteJava class

Configuring the PostgreSQL server

If you are using alogical decoding plug-inother than pgoutput, after installing it, configure the PostgreSQL server as follows:

  1. To load the plug-in at startup, add the following to thepostgresql.conffile::

    # MODULES shared_preload_libraries = 'decoderbufs,wal2json'(1)
    1 Instructs the server to load thedecoderbufsandwal2jsonlogical decoding plug-ins at startup. The names of the plug-ins are set in theProtobufandwal2jsonmake files.
  2. To configure the replication slot regardless of the decoder being used, specify the following in thepostgresql.conffile:

    # REPLICATION wal_level = logical(1)max_wal_senders = 1(2)max_replication_slots = 1(3)
    1 instructs the server to use logical decoding with the write-ahead log.
    2 instructs the server to use a maximum of1separate process for processing WAL changes.
    3 instructs the server to allow a maximum of1replication slot to be created for streaming WAL changes.

Debezium uses PostgreSQL’s logical decoding, which uses replication slots. Replication slots are guaranteed to retain all WAL segments required for Debezium even during Debezium outages. For this reason, it is important to closely monitor replication slots to avoid too much disk consumption and other conditions that can happen such as catalog bloat if a replication slot stays unused for too long. For more information, see thePostgreSQL streaming replication documentation

If you are working with asynchronous_commitsetting other thanon, the recommendation is to setwal_writer_delayto a value such as 10 milliseconds to achieve a low latency of change events. Otherwise, its default value is applied, which adds a latency of about 200 milliseconds.

Setting up permissions

Setting up a PostgreSQL server to run a Debezium connector requires a database user that can perform replications. Replication can be performed only by a database user that has appropriate permissions and only for a configured number of hosts.

Although, by default, superusers have the necessaryREPLICATIONandLOGINroles, as mentioned inSecurity, it is best not to provide the Debezium replication user with elevated privileges. Instead, create a Debezium user that has the the minimum required privileges.

Prerequisites
  • PostgreSQL administrative permissions.

Procedure
  1. To provide a user with replication permissions, define a PostgreSQL role that hasat leasttheREPLICATIONandLOGINpermissions, and then grant that role to the user. For example:

    CREATE ROLEREPLICATION LOGIN;

Setting privileges to enable Debezium to create PostgreSQL publications when you usepgoutput

If you usepgoutputas the logical decoding plugin, Debezium must operate in the database as a user with specific privileges.

Debezium streams change events for PostgreSQL source tables frompublications创建的表。出版物续ain a filtered set of change events that are generated from one or more tables. The data in each publication is filtered based on the publication specification. The specification can be created by the PostgreSQL database administrator or by the Debezium connector. To permit the Debezium PostgreSQL connector to create publications and specify the data to replicate to them, the connector must operate with specific privileges in the database.

There are several options for determining how publications are created. In general, it is best to manually create publications for the tables that you want to capture, before you set up the connector. However, you can configure your environment in a way that permits Debezium to create publications automatically, and to specify the data that is added to them.

Debezium uses include list and exclude list properties to specify how data is inserted in the publication. For more information about the options for enabling Debezium to create publications, seepublication.autocreate.mode

For Debezium to create a PostgreSQL publication, it must run as a user that has the following privileges:

  • Replication privileges in the database to add the table to a publication.

  • CREATEprivileges on the database to add publications.

  • SELECTprivileges on the tables to copy the initial table data. Table owners automatically haveSELECTpermission for the table.

To add tables to a publication, the user be an owner of the table. But because the source table already exists, you need a mechanism to share ownership with the original owner. To enable shared ownership, you create a PostgreSQL replication group, and then add the existing table owner and the replication user to the group.

Procedure
  1. Create a replication group.

    CREATE ROLE;
  2. Add the original owner of the table to the group.

    GRANT REPLICATION_GROUP TO;
  3. 加德bezium replication user to the group.

    GRANT REPLICATION_GROUP TO;
  4. Transfer ownership of the table to

    ALTER TABLEOWNER TO REPLICATION_GROUP;

For Debezium to specify the capture configuration, the value ofpublication.autocreate.modemust be set tofiltered

Configuring PostgreSQL to allow replication with the Debezium connector host

To enable Debezium to replicate PostgreSQL data, you must configure the database to permit replication with the host that runs the PostgreSQL connector. To specify the clients that are permitted to replicate with the database, add entries to the PostgreSQL host-based authentication file,pg_hba.conf。For more information about thepg_hba.conffile, seethe PostgreSQLdocumentation.

Procedure
  • Add entries to thepg_hba.conffile to specify the Debezium connector hosts that can replicate with the database host. For example,

    pg_hba.conffile example:
    local replication  trust(1)host replication  127.0.0.1/32 trust(2)host replication  ::1/128 trust(3)
    1 Instructs the server to allow replication forlocally, that is, on the server machine.
    2 Instructs the server to allowonlocalhostto receive replication changes usingIPV4
    3 Instructs the server to allowonlocalhostto receive replication changes usingIPV6

For more information about network masks, seethe PostgreSQL documentation

Supported PostgreSQL topologies

The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.

As mentionedin the beginning, PostgreSQL (for all versions ⇐ 12) supports logical replication slots on onlyprimaryservers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the Debezium PostgreSQL connector can connect and communicate with only the primary server. Should this server fail, the connector stops. When the cluster is repaired, if the original primary server is once again promoted toprimary, you can restart the connector. However, if a different PostgreSQL serverwith the plug-in and proper configuration将提升primary, you must change the connector configuration to point to the newprimaryserver and then you can restart the connector.

WAL disk space consumption

In certain cases, it is possible for PostgreSQL disk space consumed by WAL files to spike or increase out of usual proportions. There are several possible reasons for this situation:

  • The LSN up to which the connector has received data is available in theconfirmed_flush_lsncolumn of the server’spg_replication_slotsview. Data that is older than this LSN is no longer available, and the database is responsible for reclaiming the disk space.

    Also in thepg_replication_slotsview, therestart_lsn列最古老的LSN WAL的connector might require. If the value forconfirmed_flush_lsnis regularly increasing and the value ofrestart_lsnlags then the database needs to reclaim the space.

    The database typically reclaims disk space in batch blocks. This is expected behavior and no action by a user is necessary.

  • There are many updates in a database that is being tracked but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. This situation can be easily solved with periodic heartbeat events. Set theheartbeat.interval.msconnector configuration property.

  • The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:

    • Enable periodic heartbeat record generation with theheartbeat.interval.msconnector configuration property.

    • Regularly emit change events from the database for which Debezium is capturing changes.

      In the case ofwal2jsondecoder plug-in, it is sufficient to generate empty events. This can be achieved for example by truncating an empty temporary table. For other decoder plug-ins, the recommendation is to create a supplementary table for which Debezium is not capturing changes.

    A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of theheartbeat.action.queryconnector configuration property.

For users on AWS RDS with PostgreSQL, a situation similar to the high traffic/low traffic scenario can occur in an idle environment. AWS RDS causes writes to its own system tables to be invisible to clients on a frequent basis (5 minutes). Again, regularly emitting events solves the problem.

Deployment

To deploy a Debezium PostgreSQL connector, you install the Debezium PostgreSQL connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.

Prerequisites
Procedure
  1. Download the DebeziumPostgreSQL connector plug-in archive

  2. Extract the files into your Kafka Connect environment.

  3. Add the directory with the JAR files toKafka Connect’splugin.path

  4. Restart your Kafka Connect process to pick up the new JAR files.

If you are working with immutable containers, seeDebezium’s Container imagesfor Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already installed and ready to run. You can alsorun Debezium on Kubernetes and OpenShift

Connector configuration example

Following is an example of the configuration for a PostgreSQL connector that connects to a PostgreSQL server on port 5432 at 192.168.99.100, whose logical name isfulfillment。Typically, you configure the Debezium PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.

You can choose to produce events for a subset of the schemas and tables in a database. Optionally, you can ignore, mask, or truncate columns that contain sensitive data, are larger than a specified size, or that you do not need.

{ "name": "fulfillment-connector",(1)"config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector",(2)"database.hostname": "192.168.99.100",(3)"database.port": "5432",(4)"database.user": "postgres",(5)"database.password": "postgres",(6)"database.dbname" : "postgres",(7)"database.server.name": "fulfillment",(8)"table.include.list": "public.inventory"(9)} }
1 The name of the connector when registered with a Kafka Connect service.
2 The name of this PostgreSQL connector class.
3 The address of the PostgreSQL server.
4 The port number of the PostgreSQL server.
5 The name of the PostgreSQL user that has therequired privileges
6 The password for the PostgreSQL user that has therequired privileges
7 The name of the PostgreSQL database to connect to
8 The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
9 A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.

See thecomplete list of PostgreSQL connector propertiesthat can be specified in these configurations.

You can send this configuration with aPOSTcommand to a running Kafka Connect service. The service records the configuration and starts one connector task that performs the following actions:

  • Connects to the PostgreSQL database.

  • Reads the transaction log.

  • Streams change event records to Kafka topics.

Adding connector configuration

To run a Debezium PostgreSQL connector, create a connector configuration and add the configuration to your Kafka Connect cluster. .Prerequisites

Procedure
  1. Create a configuration for the PostgreSQL connector.

  2. Use theKafka Connect REST APIto add that connector configuration to your Kafka Connect cluster.

Results

When the connector starts, itperforms a consistent snapshotof the PostgreSQL server databases that the connector is configured for. The connector then starts generating data change events for row-level operations and streaming change event records to Kafka topics.

Connector configuration properties

The Debezium PostgreSQL connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:

The following configuration properties arerequiredunless a default value is available.

Table 20. Required connector configuration properties
Property Default Description

Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.

The name of the Java class for the connector. Always use a value ofio.debezium.connector.postgresql.PostgresConnectorfor the PostgreSQL connector.

1

The maximum number of tasks that should be created for this connector. The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable.

decoderbufs

The name of the PostgreSQLlogical decoding plug-ininstalled on the PostgreSQL server.

Supported values aredecoderbufs,wal2json,wal2json_rds,wal2json_streaming,wal2json_rds_streamingandpgoutput

If you are using awal2jsonplug-in and transactions are very large, the JSON batch event that contains all transaction changes might not fit into the hard-coded memory buffer, which has a size of 1 GB. In such cases, switch to a streaming plug-in, by setting theplugin-nameproperty towal2json_streamingorwal2json_rds_streaming。With a streaming plug-in, PostgreSQL sends the connector a separate message for each change in a transaction.

开云体育官方注册网址

The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the Debezium connector that you are configuring.

Slot names must conform toPostgreSQL replication slot naming rules, which state:"Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."

false

是否要删除逻辑复制lot when the connector stops in a graceful, expected way. The default behavior is that the replication slot remains configured for the connector when the connector stops. When the connector restarts, having the same replication slot enables the connector to start processing where it left off.

Set totruein only testing or development environments. Dropping the slot allows the database to discard WAL segments. When the connector restarts it performs a new snapshot or it can continue from a persistent offset in the Kafka Connect offsets topic.

dbz_publication

The name of the PostgreSQL publication created for streaming changes when usingpgoutput

This publication is created at start-up if it does not already exist and it includesall tables。Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time.

If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.

IP address or hostname of the PostgreSQL database server.

5432

Integer port number of the PostgreSQL database server.

Name of the PostgreSQL database user for connecting to the PostgreSQL database server.

Password to use when connecting to the PostgreSQL database server.

The name of the PostgreSQL database from which to stream the changes.

Logical name that identifies and provides a namespace for the particular PostgreSQL database server or cluster in which Debezium is capturing changes. Only alphanumeric characters, hyphens, dots and underscores must be used in the database server logical name. The logical name should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector.

一个可选,以逗号分隔的正则表达ssions that match names of schemas for which youwantto capture changes. Any schema name not included inschema.include.listis excluded from having its changes captured. By default, all non-system schemas have their changes captured. Do not also set theschema.exclude.listproperty.

一个可选,以逗号分隔的正则表达ssions that match names of schemas for which youdo notwant to capture changes. Any schema whose name is not included inschema.exclude.listhas its changes captured, with the exception of system schemas. Do not also set theschema.include.listproperty.

一个可选,以逗号分隔的正则表达ssions that match fully-qualified table identifiers for tables whose changes you want to capture. Any table not included intable.include.listdoes not have its changes captured. Each identifier is of the formschemaNametableName。By default, the connector captures changes in every non-system table in each schema whose changes are being captured. Do not also set thetable.exclude.listproperty.

一个可选,以逗号分隔的正则表达ssions that match fully-qualified table identifiers for tables whose changes youdo notwant to capture. Any table not included intable.exclude.listhas it changes captured. Each identifier is of the formschemaNametableName。Do not also set thetable.include.listproperty.

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the formschemaNametableNamecolumnName。Do not also set thecolumn.exclude.listproperty.

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of columns that should be excluded from change event record values. Fully-qualified names for columns are of the formschemaNametableNamecolumnName。Do not also set thecolumn.include.listproperty.

adaptive

Time, date, and timestamps can be represented with different kinds of precision:

adaptivecaptures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type.

adaptive_time_microseconds抓住了日期,日期时间和时间戳值exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column’s type. An exception isTIMEtype fields, which are always captured as microseconds.

connectalways represents time and timestamp values by using Kafka Connect’s built-in representations forTime,Date, andTimestamp, which use millisecond precision regardless of the database columns' precision. See时间的价值s

precise

Specifies how the connector should handle values forDECIMALandNUMERICcolumns:

preciserepresents values by usingjava.math.BigDecimalto represent values in binary form in change events.

doublerepresents values by usingdoublevalues, which might result in a loss of precision but which is easier to use.

stringencodes values as formatted strings, which are easy to consume but semantic information about the real type is lost. SeeDecimal types

map

Specifies how the connector should handle values forhstorecolumns:

maprepresents values by usingMAP

jsonrepresents values by usingjson string。This setting encodes values as formatted strings such as{"key" : "val"}。SeePostgreSQLHSTOREtype

numeric

Specifies how the connector should handle values forintervalcolumns:

numericrepresents intervals using approximate number of microseconds.

stringrepresents intervals exactly by using the string pattern representationPYMDTHMS。For example:P1Y2M3DT4H5M6.78S。SeePostgreSQL basic types

disable

Whether to use an encrypted connection to the PostgreSQL server. Options include:

disableuses an unencrypted connection.

requireuses a secure (encrypted) connection, and fails if one cannot be established.

verify-cabehaves likerequirebut also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.

verify-fullbehaves likeverify-cabut also verifies that the server certificate matches the host to which the connector is trying to connect. Seethe PostgreSQL documentationfor more information.

The path to the file that contains the SSL certificate for the client. Seethe PostgreSQL documentationfor more information.

The path to the file that contains the SSL private key of the client. Seethe PostgreSQL documentationfor more information.

The password to access the client private key from the file specified bydatabase.sslkey。Seethe PostgreSQL documentationfor more information.

The path to the file that contains the root certificate(s) against which the server is validated. Seethe PostgreSQL documentationfor more information.

true

Enable TCP keep-alive probe to verify that the database connection is still alive. Seethe PostgreSQL documentationfor more information.

true

Controls whether adeleteevent is followed by a tombstone event.

true- a delete operation is represented by adeleteevent and a subsequent tombstone event.

false- only adeleteevent is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in caselog compactionis enabled for the topic.

n/a

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the formschemaNametableNamecolumnName。In change event records, values in these columns are truncated if they are longer than the number of characters specified bylengthin the property name. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer, for example,+column.truncate.to.20.chars

n/a

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the formschemaNametableNamecolumnName。In change event values, the values in the specified table columns are replaced withlengthnumber of asterisk (*) characters. You can specify multiple properties with different lengths in a single configuration. Length must be a positive integer or zero. When you specify zero, the connector replaces a value with an empty string.

n/a

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the formschemaNametableNamecolumnName。In the resulting change event record, the values for the specified columns are replaced with pseudonyms.

A pseudonym consists of the hashed value that results from applying the specifiedhashAlgorithmandsalt。Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in theMessageDigest sectionof the Java Cryptography Architecture Standard Algorithm Name Documentation.

In the following example,CzQMA0cB5Kis a randomly selected salt.

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts.

Depending on thehashAlgorithmused, thesaltselected, and the actual data set, the resulting data set might not be completely masked.

n/a

一个可选,以逗号分隔的正则表达ssions that match the fully-qualified names of columns. Fully-qualified names for columns are of the formdatabaseNametableNamecolumnName, ordatabaseNameschemaNametableNamecolumnName

For each specified column, the connector adds the column’s original type and original length as parameters to the corresponding field schemas in the emitted change records. The following added schema parameters propagate the original type name and also the original length for variable-width types:

__debezium.source.column.type+__debezium.source.column.length+__debezium.source.column.scale

This property is useful for properly sizing corresponding columns in sink databases.

n/a

一个可选,以逗号分隔的正则表达ssions that match the database-specific data type name for some columns. Fully-qualified data type names are of the formdatabaseNametableNametypeName, ordatabaseNameschemaNametableNametypeName

For these data types, the connector adds parameters to the corresponding field schemas in emitted change records. The added parameters specify the original type and length of the column:

__debezium.source.column.type+__debezium.source.column.length+__debezium.source.column.scale

These parameters propagate a column’s original type name and length, for variable-width types, respectively. This property is useful for properly sizing corresponding columns in sink databases.

See thelist of PostgreSQL-specific data type names

empty string

A list of expressions that specify the columns that the connector uses to form custom message keys for change event records that it publishes to the Kafka topics for specified tables.

By default, Debezium uses the primary key column of a table as the message key for records that it emits. In place of the default, or to specify a key for tables that lack a primary key, you can configure custom message keys based on one or more columns.

To establish a custom message key for a table, list the table, followed by the columns to use as the message key. Each list entry takes the following format:

:__,

To base a table key on multiple column names, insert commas between the column names.

Each fully-qualified table name is a regular expression in the following format:



The property can include entries for multiple tables. Use a semicolon to separate table entries in the list.

The following example sets the message key for the tablesinventory.customersandpurchase.orders:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

For the tableinventory.customer, the columnspk1andpk2are specified as the message key. For thepurchaseorderstables in any schema, the columnspk3andpk4server as the message key.

There is no limit to the number of columns that you use to create custom message keys. However, it’s best to use the minimum number that are required to specify a unique key.

all_tables

Applies only when streaming changes by usingthepgoutputplug-in。The setting determines how creation of apublicationshould work. Possible settings are:

all_tables- If a publication exists, the connector uses it. If a publication does not exist, the connector creates a publication for all tables in the database for which the connector is capturing changes. This requires that the database user that has permission to perform replications also has permission to create a publication. This is granted withCREATE PUBLICATION FOR ALL TABLES;

disabledd -连接器oes not attempt to create a publication. A database administrator or the user configured to perform replications must have created the publication before running the connector. If the connector cannot find the publication, the connector throws an exception and stops.

filtered- If a publication exists, the connector uses it. If no publication exists, the connector creates a new publication for tables that match the current filter configuration as specified by thedatabase.exclude.list,schema.include.list,schema.exclude.list, andtable.include.listconnector configuration properties. For example:CREATE PUBLICATION FOR TABLE

bytes

Specifies how binary (bytea) columns should be represented in change events:

bytesrepresents binary data as byte array.

base64represents binary data as base64-encoded strings.

hexrepresents binary data as hex-encoded (base16) strings.

bytes

Specifies how whetherTRUNCATEevents should be propagated or not (only available when using thepgoutput插件与Postgres 11或更高版本):

skipcauses those event to be omitted (the default).

includecauses those events to be included.

For information about the structure oftruncateevents and about their ordering semantics, seetruncateevents

The followingadvancedconfiguration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.

Table 21. Advanced connector configuration properties
Property Default Description

initial

Specifies the criteria for performing a snapshot when the connector starts:

initial- The connector performs a snapshot only when no offsets have been recorded for the logical server name.

always- The connector performs a snapshot each time the connector starts.

never- The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN in the Kafka offsets topic, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. Theneversnapshot mode is useful only when you know all data of interest is still reflected in the WAL.

initial_only- The connector performs an initial snapshot and then stops, without processing any subsequent changes.

exported- deprecated

custom- The connector performs a snapshot according to the setting for thesnapshot.custom.classproperty, which is a custom implementation of theio.debezium.connector.postgresql.spi.Snapshotterinterface.

Thereference table for snapshot mode settingshas more details.

A full Java class name that is an implementation of theio.debezium.connector.postgresql.spi.Snapshotterinterface. Required when thesnapshot.modeproperty is set tocustom。Seecustom snapshotter SPI

All tables specified intable.include.list

一个可选,以逗号分隔的正则表达ssions that match names of schemas specified intable.include.listfor which youwantto take the snapshot when thesnapshot.modeis notnever

10000

正整数的值指定的最大值amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If the connector cannot acquire table locks in this time interval, the snapshot fails.How the connector performs snapshotsprovides details.

Controls which table rows are included in snapshots. This property affects snapshots only. It does not affect events that are generated by the logical decoding plug-in. Specify a comma-separated list of fully-qualified table names in the formdatabaseName.tableName

For each table that you specify, also specify another configuration property:snapshot.select.statement.overrides.DB_NAMETABLE_NAME, for example:snapshot.select.statement.overrides.inventory.orders。Set this property to aSELECTstatement that obtains only the rows that you want in the snapshot. When the connector performs a snapshot, it executes thisSELECTstatement to retrieve data from that table.

A possible use case for setting these properties is large, append-only tables. You can specify aSELECTstatement that sets a specific point for where to start a snapshot, or where to resume a snapshot if a previous snapshot was interrupted.

fail

Specifies how the connector should react to exceptions during processing of events:

failpropagates the exception, indicates the offset of the problematic event, and causes the connector to stop.

warnlogs the offset of the problematic event, skips that event, and continues processing.

skipskips the problematic event and continues processing.

20240

Positive integer value for the maximum size of the blocking queue. The connector places change events received from streaming replication in the blocking queue before writing them to Kafka. This queue can provide backpressure when, for example, writing records to Kafka is slower that it should be or Kafka is not available.

10240

正整数的值指定的最大值size of each batch of events that the connector processes.

0

Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default, it will be active if it’s set with a positive long value.

1000

Positive integer value that specifies the number of milliseconds the connector should wait for new change events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second.

false

Specifies connector behavior when the connector encounters a field whose data type is unknown. The default behavior is that the connector omits the field from the change event and logs a warning.

Set this property totrueif you want the change event to contain an opaque binary representation of the field. This lets consumers decode the field. You can control the exact representation by setting thebinary handling modeproperty.

当消费者风险向后兼容性问题include.unknown.datatypesis set totrue。Not only may the database-specific binary representation change between releases, but if the data type is eventually supported by Debezium, the data type will be sent downstream in a logical type, which would require adjustments by consumers. In general, when encountering unsupported data types, create a feature request so that support can be added.

A semicolon separated list of SQL statements that the connector executes when it establishes a JDBC connection to the database. To use a semicolon as a character and not as a delimiter, specify two consecutive semicolons,;;

The connector may establish JDBC connections at its own discretion. Consequently, this property is useful for configuration of session parameters only, and not for executing DML statements.

The connector does not execute these statements when it creates a connection for reading the transaction log.

10000

Frequency for sending replication connection status updates to the server, given in milliseconds.
The property also controls how frequently the database status is checked to detect a dead connection in case the database was shut down.

0

Controls how frequently the connector sends heartbeat messages to a Kafka topic. The default behavior is that the connector does not send heartbeat messages.

Heartbeat messages are useful for monitoring whether the connector is receiving change events from the database. Heartbeat messages might help decrease the number of change events that need to be re-sent when a connector restarts. To send heartbeat messages, set this property to a positive integer, which indicates the number of milliseconds between heartbeat messages.

Heartbeat messages are needed when there are many updates in a database that is being tracked but only a tiny number of updates are related to the table(s) and schema(s) for which the connector is capturing changes. In this situation, the connector reads from the database transaction log as usual but rarely emits change records to Kafka. This means that no offset updates are committed to Kafka and the connector does not have an opportunity to send the latest retrieved LSN to the database. The database retains WAL files that contain events that have already been processed by the connector. Sending heartbeat messages enables the connector to send the latest retrieved LSN to the database, which allows the database to reclaim disk space being used by no longer needed WAL files.

__debezium-heartbeat

Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern:



For example, if the database server name isfulfillment, the default topic name is__debezium-heartbeat.fulfillment

指定连接器t上执行一个查询he source database when the connector sends a heartbeat message.

This is useful for resolving the situation described inWAL disk space consumption, where capturing changes from a low-traffic database on the same host as a high-traffic database prevents Debezium from processing WAL records and thus acknowledging WAL positions with the database. To address this situation, create a heartbeat table in the low-traffic database, and set this property to a statement that inserts records into that table, for example:

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

This allows the connector to receive changes from the low-traffic database and acknowledge their LSNs, which prevents unbounded WAL growth on the database host.

columns_diff

Specify the conditions that trigger a refresh of the in-memory schema for a table.

columns_diffis the safest mode. It ensures that the in-memory schema stays in sync with the database table’s schema at all times.

columns_diff_exclude_unchanged_toastinstructs the connector to refresh the in-memory schema cache if there is a discrepancy with the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.

This setting can significantly improve connector performance if there are frequently-updated tables that have TOASTed data that are rarely part of updates. However, it is possible for the in-memory schema to become outdated if TOASTable columns are dropped from the table.

An interval in milliseconds that the connector should wait before performing a snapshot when the connector starts. If you are starting multiple connectors in a cluster, this property is useful for avoiding snapshot interruptions, which might cause re-balancing of connectors.

10240

During a snapshot, the connector reads table content in batches of rows. This property specifies the maximum number of rows in a batch.

Semicolon separated list of parameters to pass to the configured logical decoding plug-in. For example,add-tables=public.table,public.table2;include-lsn=true

If you are using thewal2jsonplug-in, this property is useful for enabling server-side table filtering. Allowed values depend on the configured plug-in.

trueif connector configuration sets thekey.converterorvalue.converterproperty to the Avro converter.

falseif not.

Indicates whether field names are sanitized to adhere toAvro naming requirements

6

If connecting to a replication slot fails, this is the maximum number of consecutive attempts to connect.

10000(10 seconds)

The number of milliseconds to wait between retry attempts when the connector fails to connect to a replication slot.

__debezium_unavailable_value

Specifies the constant that the connector provides to indicate that the original value is a toasted value that is not provided by the database. If the setting oftoasted.value.placeholderstarts with thehex:prefix it is expected that the rest of the string represents hexadecimally encoded octets. Seetoasted valuesfor additional details.

false

Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specifytrueif you want the connector to do this. SeeTransaction metadatafor details.

10000 (10 seconds)

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

comma-separated list of operation types that will be skipped during streaming. The operations include:cfor inserts/create,ufor updates, anddfor deletes. By default, no operations are skipped.

Fully-qualified name of the data collection that is used to sendsignalsto the connector. The name format isschema-name.table-name

1024

The number of rows fetched from the database for each incremental snapshot iteration.

Pass-through connector configuration properties

The connector also supportspass-throughconfiguration properties that are used when creating the Kafka producer and consumer.

Be sure to consult theKafka documentationfor all of the configuration properties for Kafka producers and consumers. The PostgreSQL connector does use thenew consumer configuration properties

Monitoring

The Debezium PostgreSQL connector provides two types of metrics that are in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.

  • Snapshot metricsprovide information about connector operation while performing a snapshot.

  • Streaming metricsprovide information about connector operation when the connector is capturing changes and streaming change event records.

Debezium monitoring documentationprovides details for how to expose these metrics by using JMX.

Snapshot metrics

TheMBeanisdebezium.postgres:type=connector-metrics,context=snapshot,server=

Snapshot metrics are not exposed unless a snapshot operation is active, or if a snapshot has occurred since the last connector start.

The following table lists the shapshot metrics that are available.

Attributes Type Description

string

The last snapshot event that the connector has read.

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since last started or reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

MonitoredTables
Deprecated and scheduled for removal in a future release; use theCapturedTablesmetric instead.

string[]

The list of tables that are monitored by the connector.

string[]

The list of tables that are captured by the connector.

int

The length the queue used to pass events between the snapshotter and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the snapshotter and the main Kafka Connect loop.

int

The total number of tables that are being included in the snapshot.

int

The number of tables that the snapshot has yet to copy.

boolean

Whether the snapshot was started.

boolean

Whether the snapshot was aborted.

boolean

Whether the snapshot completed.

long

The total number of seconds that the snapshot has taken so far, even if not complete.

Map

Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table.

long

The maximum buffer of the queue in bytes. It will be enabled ifmax.queue.size.in.bytesis passed with a positive long value.

long

The current data of records in the queue in bytes.

Streaming metrics

TheMBeanisdebezium.postgres:type=connector-metrics,context=streaming,server=

The following table lists the streaming metrics that are available.

Attributes Type Description

string

The last streaming event that the connector has read.

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since last started or reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

MonitoredTables
Deprecated and scheduled for removal in a future release; use the 'CapturedTables' metric instead

string[]

The list of tables that are monitored by the connector.

string[]

The list of tables that are captured by the connector.

int

The length the queue used to pass events between the streamer and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.

boolean

Flag that denotes whether the connector is currently connected to the database server.

long

The number of milliseconds between the last change event’s timestamp and the connector processing it. The values will incoporate any differences between the clocks on the machines where the database server and the connector are running.

long

The number of processed transactions that were committed.

Map

The coordinates of the last received event.

string

Transaction identifier of the last processed transaction.

long

The maximum buffer of the queue in bytes.

long

The current data of records in the queue in bytes.

Behavior when things go wrong

Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully then Debezium providesexactly oncedelivery of every change event record.

If a fault does happen then the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, Debezium, like Kafka, providesat least oncedelivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:

  • The connector’s configuration is invalid.

  • The connector cannot successfully connect to PostgreSQL by using the specified connection parameters.

  • The connector is restarting from a previously-recorded position in the PostgreSQL WAL (by using the LSN) and PostgreSQL no longer has that history available.

In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the PostgreSQL problem, restart the connector.

PostgreSQL becomes unavailable

When the connector is running, the PostgreSQL server that it is connected to could become unavailable for any number of reasons. If this happens, the connector fails with an error and stops. When the server is available again, restart the connector.

The PostgreSQL connector externally stores the last processed offset in the form of a PostgreSQL LSN. After a connector restarts and connects to a server instance, the connector communicates with the server to continue streaming from that particular offset. This offset is available as long as the Debezium replication slot remains intact. Never drop a replication slot on the primary server or you will lose data. See the next section for failure cases in which a slot has been removed.

Cluster failures

As of release 12, PostgreSQL allows logical replication slotsonly on primary servers。这意味着您可以点Debezium Postgre开云体育官方注册网址SQL connector to only the active primary server of a database cluster. Also, replication slots themselves are not propagated to replicas. If the primary server goes down, a new primary must be promoted.

The new primary must have thelogical decoding plug-ininstalled and a replication slot that is configured for use by the plug-in and the database for which you want to capture changes. Only then can you point the connector to the new server and restart the connector.

There are important caveats when failovers occur and you should pause Debezium until you can verify that you have an intact replication slot that has not lost data. After a failover:

  • There must be a process that re-creates the Debezium replication slot before allowing the application to write to thenewprimary. This is crucial. Without this process, your application can miss change events.

  • You might need to verify that Debezium was able to read all changes in the slotbefore the old primary failed

恢复和验证wh可靠的方法之一ether any changes were lost is to recover a backup of the failed primary to the point immediately before it failed. While this can be administratively difficult, it allows you to inspect the replication slot for any unconsumed changes.

There are discussions in the PostgreSQL community around a feature calledfailover slotsthat would help mitigate this problem, but as of PostgreSQL 12, they have not been implemented. However, there is active development for PostgreSQL 13 to support logical decoding on standbys, which is a major requirement to make failover possible. You can find more about this in this link:"0w50sRagcs+jrktBXuJAWGZQdSTMa57CCY+Dh-xbg@mail.gmail.com"">community thread。

More about the concept of failover slots is inthis blog post

Kafka Connect process stops gracefully

Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

Kafka Connect process crashes

If the Kafka Connector process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, PostgreSQL connectors resume from the last offset that wasrecordedby the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

因为有一个机会,一些事件可能be duplicated during a recovery from failure, consumers should always anticipate some duplicate events. Debezium changes are idempotent, so a sequence of events always results in the same state.

In each change event record, Debezium connectors insert source-specific information about the origin of the event, including the PostgreSQL server’s time of the event, the ID of the server transaction, and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information, especially the LSN, to determine whether an event is a duplicate.

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the database can continue to be used. Any changes are recorded in the PostgreSQL WAL. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.

A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.