您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

开云体育官方注册网址Debezium连接器的PostgreSQL

开云体育官方注册网址Debezium的PostgreSQL连接器可以监视和记录PostgreSQL数据库模式中的行级更改。开云体育电动老虎机

当它第一次连接到PostgreSQL服务器/集群时,它会读取所有模式的一致快照。当快照完成时,连接器会连续地传输提交到PostgreSQL 9.6或更高版本的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的Kafka主题中,应用程序和服务可以很容易地使用它们。

概述

PostgreSQL的逻辑解码特性首次在9.4版中引入,它是一种机制,允许提取提交到事务日志中的更改,并通过类的帮助以用户友好的方式处理这些更改输出插件.这个输出插件必须在运行PostgreSQL服务器之前安装,并与复制插槽一起启用,以便客户端能够使用这些更改。

PostgreSQL连接器包含两个不同的部分,它们一起工作,以便能够读取和处理服务器的更改:

  • 逻辑解码输出插件,必须在PostgreSQL服务器上安装和配置

    • decoderbufs(由Debezium社区维护,基于Pr开云体育官方注册网址otoBuf)

    • wal2json(由wal2json社区维护,基于JSON)

    • pgoutput, PostgreSQL 10+中的标准逻辑解码插件(由Postgres社区维护,由Postgres自己用于逻辑复制);该插件始终存在,这意味着不需要安装其他库,Debezium连接器将直接将原始复制事件流解释为更改事件。开云体育官方注册网址

  • Java代码(实际的Kafka Connect连接器),它读取所选插件产生的更改,使用PostgreSQL的流复制协议,通过PostgreSQLJDBC驱动程序

连接器然后生成一个更改事件对于接收到的每个行级插入、更新和删除操作,将每个表的所有更改事件记录在单独的Kafka主题中。您的客户端应用程序读取对应于它们感兴趣的数据库表的Kafka主题,并对在这些主题中看到的每个行级事件做出反应。开云体育电动老虎机

PostgreSQL通常会在一段时间后清除WAL段。这意味着连接器不会拥有对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当PostgreSQL连接器第一次连接到一个特定的PostgreSQL数据库时,它首先执行一个开云体育电动老虎机一致的快照每个数据库模式的。开云体育电动老虎机在连接器完成快照之后,它继续从创建快照的确切位置开始流化更改。通过这种方式,我们从所有数据的一致视图开始,然后继续读取,而不会丢失快照发生时所做的任何更改。

该连接器还具有故障容忍度。当连接器读取更改并产生事件时,它将记录每个事件在预写日志中的位置。如果连接器由于任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它只是在上次停止的地方继续读取WAL。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。

连接器的功能依赖于PostgreSQL的逻辑解码特性。请注意连接器也反映了以下限制:

  1. 逻辑解码不支持DDL更改:这意味着连接器无法向使用者报告DDL更改事件。

  2. 逻辑解码复制插槽仅支持在主要的服务器:这意味着当有一个PostgreSQL服务器集群时,连接器只能在活动服务器上运行主要的服务器。它不能继续运行温暖的备用副本。如果主要的服务器失败或降级,连接器将停止。一旦主要的已恢复的连接器可以简单地重新启动。如果已升级到其他PostgreSQL服务器主要的,重新启动连接器前,必须调整连接器配置。确保阅读了更多关于连接器行为的内容当事情出错时

开云体育官方注册网址Debezium目前只支持UTF-8字符编码的数据库。开云体育电动老虎机使用单字节字符编码,不可能正确处理包含扩展ASCII码字符的字符串。

设置PostgreSQL

在使用PostgreSQL连接器监视在PostgreSQL服务器上提交的更改之前,首先确定您打算使用哪种逻辑解码器方法。如果你计划要使用原生pgoutput逻辑复制流支持,则需要将逻辑解码插件安装到PostgreSQL服务器中。然后启用复制槽位,并为用户配置足够的权限来执行复制。

请注意,如果您的数据库由服务托管,例开云体育电动老虎机如Heroku Postgres您可能无法安装插件。如果是这样,如果你使用的是PostgreSQL 10+,你可以使用pgoutput解码器支持来监控你的数据库。开云体育电动老虎机如果没有这个选项,您将无法使用Debezium监视数据库。开云体育官方注册网址开云体育电动老虎机

PostgreSQL在亚马逊RDS

可以监控PostgreSQL数据库的运行情况开云体育电动老虎机Amazon RDS.要让它运行,必须满足以下条件

  • 实例参数rds.logical_replication设置为1

  • 验证wal_level参数设置为逻辑通过运行查询显示wal_level作为数据库主用户;在多区域复制设置中可能不是这样。您不能手动设置此选项,它是(自动改变)当rds.logical_replication设置为1.如果wal_level不是逻辑在上述更改之后,可能是因为参数组更改导致实例必须重新启动,这将根据维护窗口发生,也可以手动完成。

  • plugin.name开云体育官方注册网址Debezium参数为wal2json.如果你想使用pgoutput逻辑复制流支持,你可以在PostgreSQL 10+上跳过这个步骤。

  • 使用数据开云体育电动老虎机库主帐户进行复制,因为RDS目前不支持设置复制另一个帐户的特权。

您应该确保在Amazon RDS上使用最新版本的Postgres 9.6、10或11。否则,可能会安装旧版本的wal2json插件(请参阅官方文件获取安装在Amazon RDS上的确切wal2json版本)。在这种情况下,从数据库接收的复制消息可能不携带关于类型约束的完整信息,如长度或规模或开云体育电动老虎机/非空,这反过来可能导致在列定义更改的情况下,在短时间内创建模式不一致的消息。

截至2019年1月,RDS上的以下Postgres版本附带了wal2json的最新版本,因此应该使用:

  • Postgres 9.6: 9.6.10或更新版本

  • Postgres 10: 10.5及更新版本

  • Postgres 11:任何版本

安装逻辑解码输出插件

也看到PostgreSQL逻辑解码输出插件安装有关设置和测试逻辑解码插件的更详细说明。

从Debez开云体育官方注册网址ium 0.10开始,连接器支持使用PostgreSQL 10+逻辑复制流pgoutput.这意味着不再需要逻辑解码输出插件,连接器可以直接从复制流发出更改。

从PostgreSQL 9.4开始,读取write-ahead-log更改的唯一方法是首先安装一个逻辑解码输出插件。插件用C语言编写,编译并安装在运行PostgreSQL服务器的机器上。插件使用许多特定于PostgreSQL的api,如PostgreSQL的文档

PostgreSQL连接器与Debezium支持的逻辑解码插件之一一起工作,对开云体育官方注册网址其中任何一个的更改进行编码Protobuf格式JSON格式。请参阅所选插件的文档(protobufwal2json),以了解插件的需求、限制以及如何编译它。

为了简单起见,Debezium还开云体育官方注册网址提供了一个基于普通PostgreSQL服务器映像的Docker映像,并在其上编译和安装插件。我们建议使用此图像作为安装所需详细步骤的示例。

Debe开云体育官方注册网址zium逻辑解码插件只被安装和测试过Linux机器。对于Windows和其他操作系统,可能需要不同的安装步骤。

插件之间的区别

插件的行为在所有情况下都不完全相同。到目前为止,已经确定了这些差异:

  • Wal2json插件无法处理带引号的标识符(问题

  • Wal2json插件不会对没有主键的表发出事件

  • Wal2json插件不支持特殊值()用于浮点类型

  • Wal2json应该与设置schema.refresh.mode连接器选项columns_diff_exclude_unchanged_toast;否则,当接收到包含未更改TOAST列的行的更改事件时,该列的字段将不会包含在发出的更改事件的字段中结构。这是因为wal2json的消息不会包含该列的字段。

添加这个的需求在wal2json中被跟踪发行98.参见columns_diff_exclude_unchanged_toast下面将进一步说明使用它的含义。

  • Pgoutput插件不会为没有主键的表发出事件

在测试套件中跟踪所有最新的差异Java类

配置PostgreSQL服务器

如果您正在使用其中一个受支持的逻辑解码插件(即不是pgoutput)并且它已经安装,配置服务器在启动时加载插件:

postgresql.conf
# MODULES shared_preload_libraries = 'decoderbufs,wal2json'(1)
1 告诉服务器它应该在启动时加载decoderbufs而且wal2json逻辑解码插件(插件的名称设置在Protobuf而且wal2jsonmakefile)

接下来是配置复制槽,而不管使用的解码器:

postgresql.conf
#复制wal_level = logical(1)Max_wal_senders = 1(2)Max_replication_slots = 1(3)
1 告诉服务器它应该对预写日志使用逻辑解码
2 告诉服务器它应该使用的最大值1处理WAL变更的独立进程
3. 告诉服务器它应该允许的最大值1为流化WAL更改而创建的复制插槽

开云体育官方注册网址Debezium使用PostgreSQL的逻辑解码,它使用复制插槽。即使在Debezium中断期间,复制插槽也保证保留Debezium所需的所有WAL。开云体育官方注册网址因此,密切监视复制插槽非常重要,以避免过多的磁盘消耗和其他可能发生的情况,如复制插槽长时间未使用时的编目膨胀。有关更多信息,请参阅官方Postgres文档这个主题

以防你和一个synchronous_commit其他设置,建议设置wal_writer_delay到一个值,例如10毫秒,以实现较低的更改事件延迟。否则,它的默认值将被应用,这将增加大约200毫秒的延迟。

我们强烈建议阅读和理解官方文件关于PostgreSQL预写日志的机制和配置。

设置权限

接下来,配置一个可以执行复制的数据库开云体育电动老虎机用户。

复制只能由具有相应权限的数据库用户执行,且只能对配置的主机数量进行复制。开云体育电动老虎机

为了赋予用户复制权限,定义一个PostgreSQL角色至少复制而且登录权限。例如:

创建角色名

超级用户默认拥有上述两种角色。

最后,配置PostgreSQL服务器,允许在服务器机器和运行PostgreSQL连接器的主机之间进行复制:

pg_hba.conf
本地复制信任(1)主机复制 127.0.0.1/32 trust(2)主机复制::1/128 trust(3)
1 告诉服务器允许复制<对于>本地(即在服务器机器上)
2 告诉服务器允许<对于>本地主机接收复制更改使用IPV4
3. 告诉服务器允许<对于>本地主机接收复制更改使用IPV6

看到PostgreSQL文档有关网络掩码的更多信息。

支持的PostgreSQL拓扑

PostgreSQL连接器可以与独立的PostgreSQL服务器一起使用,也可以与PostgreSQL服务器集群一起使用。

正如前面提到的一开始, PostgreSQL(所有版本⇐12)只支持逻辑复制槽上主要的服务器。这意味着PostgreSQL集群中的副本不能配置为逻辑复制,因此Debezium PostgreSQL连接器只能与主服务器连接和通信。开云体育官方注册网址如果此服务器失败,连接器将停止。当群集修复时,如果原始主服务器再次提升到主要的,连接器可以简单地重新启动。但是,如果是不同的PostgreSQL服务器使用插件和适当的配置被提升为主要的时,连接器配置必须更改为指向新的主要的服务器,然后可以重新启动。

WAL磁盘空间消耗

在某些情况下,WAL文件所消耗的PostgreSQL磁盘空间可能会出现峰值或超出正常比例。有三个潜在的原因可以解释这种情况:

  • 开云体育官方注册网址Debezium定期向数据库确认已处理事件的LSN。开云体育电动老虎机这是可见的confirmed_flush_lsnpg_replication_slots槽表。数据库负开云体育电动老虎机责回收磁盘空间,WAL大小可以从restart_lsn同一张桌子。如果confirmed_flush_lsn有规律地增加restart_lsn滞后时,数据库确实需要回收空开云体育电动老虎机间。磁盘空间通常在批处理块中回收,因此这是预期的行为,用户方面不需要任何操作。

  • 被监控的数据库中有很多更新,但只有很少一部分与被监控的表和/或模式相关。开云体育电动老虎机这种情况可以通过使用启用周期性心跳事件来轻松解决heartbeat.interval.ms配置选项。

  • PostgreSQL实例包含多个数据库,其中一个是高流量数据库。开云体育电动老虎机开云体育官方注册网址Debezium监视与另一个数据开云体育电动老虎机库相比流量较低的另一个数据库。开云体育官方注册网址然后Debezium不能确认LSN,因为复制插槽工作在每个数据库,并且Debezium没有被调用。开云体育电动老虎机由于WAL由所有数据库共享,它倾向于增长,直到D开云体育电动老虎机ebezium监视的数据库触发事件。开云体育官方注册网址

要克服第三个原因,就必须

  • 使能周期心跳记录生成heartbeat.interval.ms配置选项

  • 定期从Debezium跟踪的数据库发出更改事件开云体育官方注册网址开云体育电动老虎机

    • 在这种情况下wal2json解码器插件,它足以生成空事件。例如,可以通过截断空临时表来实现这一点。

    • 对于其他解码器插件,建议创建一个Debezium不监视的补充表。开云体育官方注册网址

然后,一个单独的进程将定期更新表(要么插入一个新事件,要么全部更新同一行)。然后PostgreSQL将调用Debezium, Deb开云体育官方注册网址ezium将确认最新的LSN,并允许数据库回收WAL空间。开云体育电动老虎机

对于使用Postgres的AWS RDS上的用户,在空闲环境中可能会出现与第三个原因类似的情况,因为AWS RDS对自己的系统表的写入对用户来说是不可见的(5分钟)。同样,定期发射事件将解决问题。

PostgreSQL连接器是如何工作的

快照

大多数PostgreSQL服务器被配置为不保留WAL段中数据库的完整历史,因此PostgreSQL连接器将无法通过简单地读取WAL来查看数据库的整个历史。开云体育电动老虎机因此,默认情况下,连接器将在第一次启动时执行初始化一致的快照数据库的。开云体育电动老虎机每个快照由以下步骤组成(当使用内置快照模式时,自定义快照模式可能会覆盖这个):

  1. 属性启动事务可序列化,只读,可延迟隔离级别,以确保此事务中的所有后续读取都是针对数据的单个一致版本完成的。任何数据的变化,由于后续插入更新,删除其他客户端的操作对该事务不可见。

  2. 获得一个访问共享模式锁定每个被监视的表,以确保在快照发生时,任何表都不会发生结构更改。注意,这些锁不阻止表插入更新而且删除在行动中发生。如果使用导出的快照模式,可以使用无锁快照,则不需要执行此步骤

  3. 读取服务器事务日志中的当前位置。

  4. 扫描所有数据库表和模式,并生成一开云体育电动老虎机个事件,并将该事件写入相应的特定于表的Kafka主题。

  5. 提交事务。

  6. 在连接器偏移中记录快照的成功完成。

如果连接器失败,重新平衡,或者在步骤1开始后但在步骤6完成之前停止,重新启动连接器将开始一个新的快照。一旦连接器完成了它的初始快照,然后PostgreSQL连接器在步骤3中继续从位置读取流,确保它不会错过任何更新。如果连接器由于任何原因再次停止,在重新启动时,它将从之前停止的地方继续流式传输更改。

第二个快照模式允许连接器执行快照总是.这个行为告诉连接器总是在快照启动时执行快照,并在快照完成后继续按照上述顺序执行步骤3中的更改。这种模式可用于以下情况:已知某些WAL段已被删除且不再可用,或者在新主服务器升级后发生集群故障,这样连接器就不会错过任何可能发生的更改,这些更改可能发生在新主服务器升级后,但在新主服务器上重新启动连接器之前。

第三个快照模式指示连接器从来没有执行快照。当一个新的连接器以这种方式配置时,if将继续从以前存储的偏移量流更改,或者它将从服务器上第一次创建PostgreSQL逻辑复制插槽的时间点开始。注意,只有当您知道所有感兴趣的数据仍然反映在WAL中时,这种模式才有用。

第四种快照模式,最初只有,将执行数据库快照,然后在流处理任开云体育电动老虎机何其他更改之前停止。如果连接器已经启动,但在停止之前没有完成快照,则连接器将重新启动快照进程,并在快照完成后停止。

第五种快照模式,出口,将根据创建复制槽时的时间点执行数开云体育电动老虎机据库快照。这种模式是一种以无锁方式执行快照的好方法。

最后一个快照模式,自定义,允许用户注入自己的实现io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口通过snapshot.custom.class配置属性,类在你的Kafka Connect集群的类路径(或包含在JAR中,如果使用EmbeddedEngine).有关详细信息,请参见定制的快照部分。

自定义快照SPI

类的实现可用于更高级的用法io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。该接口允许控制快照操作的大部分方面,例如是否拍摄快照,以及用于打开快照事务或拍摄锁的选项的方式。

接口的完整API可以在这里看到:

** *快照进程的详细信息。**即:*——应该发生在一个快照* - *应该流发生快照查询应该使用什么* *虽然很多默认快照模式提供Debezium *自定义的实现这个接口可以提供的实现者,*可以提供更高级的功能,如部分快照* *实现者的必须返回true {@link # shouldSnapshot()}或{@link # shouldStream()} *开云体育官方注册网址或适用于两者。*/ @孵化公共接口快照{void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState SlotState);/** * @如果快照需要快照则返回true */ boolean shouldSnapshot();/** * @返回true如果快照在拍摄快照后应该流*/ boolean shouldStream();/** * @return true如果在创建插槽时需要导出快照,则*可作为获取锁的替代*/默认布尔exportSnapshot() {return false;} /** *为指定的表生成一个有效的postgres查询字符串,或一个空的{@link可选}*跳过快照该表(但该表仍将从)** @param tableId表生成一个查询* @返回一个有效的查询字符串,或不跳过快照该表*/可选< string > buildSnapshotQuery(tableId tableId);** @param newSlotInfo如果为快照创建了一个新的slow,它包含来自* the create_replication_slot命令的信息*/ default string snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo){//我们正在使用与pg_backup相同的隔离级别Return " set transaction isolation level SERIALIZABLE, READ ONLY, deferable;";} /** *返回一个SQL语句,用于在快照期间锁定给定的表,如果特定的snapshot *实现需要的话。*/ default可选 snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) {String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot // this does not prevent writes to the table, but prevents changes to the table's schema.... // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted tableIds.forEach(tableId -> statements.append("LOCK TABLE ") .append(tableId.toDoubleQuotedString()) .append(" IN ACCESS SHARE MODE;") .append(lineSeparator)); return Optional.of(statements.toString()); } }

所有内置快照模式都是根据这个接口实现的。

流的变化

PostgreSQL连接器通常会花费大量的时间来处理它所连接的PostgreSQL服务器的更改。这个机制依赖于PostgreSQL的复制协议当服务器的事务日志在特定位置(也称为?)提交更改时,客户机可以从服务器接收更改日志序列号或者简称为LSNs)。

每当服务器提交事务时,一个单独的服务器进程都会从逻辑解码插件.这个函数处理来自事务的更改,将它们转换为特定的格式(在Debezium插件的情况下是Protobuf或JSON),并将它们写入输出流,然后由客户端使用。开云体育官方注册网址

PostgreSQL连接器充当PostgreSQL客户端,当它接收到这些更改时,它将事件转换为Debezium开云体育官方注册网址创建更新,或删除事件,包括事件的LSN位置。PostgreSQL连接器将这些更改事件转发给Kafka Connect框架(在同一个进程中运行),然后Kafka Connect框架以相同的顺序异步地将它们写入相应的Kafka主题。Kafka Connect使用了这个术语抵消对于Debezium包含在每个事件中的源特定位置信息,Kafka Connect定期记录另一个Kaf开云体育官方注册网址ka主题中最近的偏移量。

当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件刷新到Kafka,并记录从每个连接器接收到的最后偏移量。在重新启动时,Kafka Connect读取每个连接器的最后记录偏移量,并从该点启动连接器。PostgreSQL连接器使用记录在每个更改事件中的LSN作为偏移量,因此在重新启动时,连接器请求PostgreSQL服务器将事件发送到该位置之后。

PostgreSQL连接器检索模式信息,作为逻辑解码器插件发送的事件的一部分。唯一的例外是关于哪些列组成主键的信息,因为该信息是从JDBC元数据(侧通道)获得的。如果一个表的主键定义发生了变化(通过添加、删除或重命名PK列),那么就会存在一个轻微的风险,即来自JDBC的主键信息不会与逻辑解码事件中的更改数据同步,并且将创建具有不一致的键结构的少量消息。如果发生这种情况,则重新启动连接器并重新处理消息将解决问题。为了完全防止这个问题,建议用Debezium同步主键结构的更新,大致使用以下操作顺序:开云体育官方注册网址

  • 将数据库或应用程开云体育电动老虎机序设置为只读模式

  • 让Deb开云体育官方注册网址ezium处理所有剩余事件

  • 停止Deb开云体育官方注册网址ezium

  • 更新主键定义

  • 将数据库或应用程开云体育电动老虎机序置于读/写状态,并再次启动Debezium开云体育官方注册网址

PostgreSQL 10+逻辑解码支持(pgoutput)

从PostgreSQL 10+开始,引入了一种新的逻辑复制流模式,称为pgoutput.这种逻辑复制流模式是由PostgreSQL原生支持的,这意味着该连接器可以使用复制流,而不需要安装额外的插件。这对于不支持或不允许安装插件的环境尤其有价值。

看到设置PostgreSQL欲知详情。

主题名称

PostgreSQL连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。默认情况下,Kafka主题名为serverNameschemaName的表在哪里serverName连接器的逻辑名称是否与开云体育电动老虎机database.server.name配置属性,schemaName发生操作的数据库模式的名称和开云体育电动老虎机的表发生操作的数据库表的名称。开云体育电动老虎机

例如,考虑一个PostgreSQL安装postgres开云体育电动老虎机数据库和库存模式,包含四个表:产品products_on_hand客户,订单.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现,那么连接器将在这四个Kafka主题上产生事件:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

另一方面,如果表不是特定模式的一部分,而是在默认情况下创建的公共PostgreSQL模式,那么Kafka主题的名字是:

  • fulfillment.public.products

  • fulfillment.public.products_on_hand

  • fulfillment.public.customers

  • fulfillment.public.orders

元信息

每一个记录所产生的PostgreSQL连接器,除了开云体育电动老虎机数据库事件,一些元信息,关于事件在服务器上发生的位置,源分区的名称,Kafka主题和事件应该放置的分区的名称:

"sourcePartition": {"server": "fulfillment"}, "sourceOffset": {"lsn": "24023128", "txId": "555", "ts_ms": "1482918357011"}, "kafkaPartition": null

PostgreSQL连接器只使用1个Kafka Connect分区并将生成的事件放入1个Kafka分区。因此得名sourcePartition将始终默认的名称开云体育电动老虎机database.server.name属性,而kafkaPartition有价值这意味着连接器不使用特定的Kafka分区。

sourceOffset消息的一部分包含有关事件发生的服务器位置的信息:

  • lsn表示PostgreSQL日志序列号抵消在事务日志中

  • txId表示引起事件的服务器事务的标识符

  • ts_ms表示自Unix纪元作为事务提交的服务器时间以来的微秒数

事件

所有由PostgreSQL连接器产生的数据更改事件都有一个键和一个值,尽管键和值的结构依赖于产生更改事件的表(参见主题名称).

从Kafka 0.10开始,Kafka可以选择记录消息键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。

PostgreSQL连接器确保所有Kafka连接模式名有效的Avro模式名.这意味着逻辑服务器名必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名中的其余字符以及模式和表名中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。如果不是,则所有无效字符将自动替换为下划线字符。

当逻辑服务器名、模式名和表名包含其他字符时,这可能导致意外的冲突,并且表全名之间唯一的区分字符无效,因此被下划线取代。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,这些事件的结构可能会随着时间而改变。对于消费者来说,这可能很难处理,所以Kafka Connect让每个事件都是自包含的。每个消息键和值都有两部分模式而且有效载荷.模式描述有效负载的结构,而有效负载包含实际数据。

更改事件的键

对于给定的表,更改事件的键将具有一个结构,其中包含主键(或唯一键约束)中的每一列的字段副本的身份设置为完整的使用索引在创建事件时的表上)。

考虑一个客户表中定义的公共开云体育电动老虎机数据库模式:

CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));

如果开云体育电动老虎机database.server.name属性的值PostgreSQL_server,每变事件为客户表,当它有这个定义时,将具有相同的键结构,在JSON中是这样的:

{"schema": {"type": "struct", "name": "PostgreSQL_server.public.customers. "关键”、“可选”:假的,“字段”:[{“名称”:“id”、“指数”:“0”、“模式”:{“类型”:“INT32”、“可选”:“假的 " } } ] }, " 有效载荷”:{" id ": " 1 "}}

模式密钥部分包含一个Kafka Connect模式,描述密钥部分中的内容,在我们的例子中,这意味着有效载荷值不是可选的,是由名为PostgreSQL_server.public.customers.Key,并且有一个名为id类型的int32.如果我们看一下键的值有效载荷字段,我们将看到它确实是一个结构(在JSON中只是一个对象)id字段,值为1

因此,我们将此键解释为描述对象中的行public.customers表(从命名的连接器输出PostgreSQL_server),其id主键列的值为1

虽然column.blacklist属性允许您从事件值中删除列,主键或唯一键中的所有列始终包含在事件的键中。

如果表没有主键或唯一键,则更改事件的键将为空。这是有意义的,因为没有主键或唯一键约束的表中的行不能被唯一标识。

更改事件的值

更改事件消息的值稍微复杂一些。像消息键一样,它有一个模式节和有效载荷部分。PostgreSQL连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:

  • 人事处必选字段,包含描述操作类型的字符串值。PostgreSQL连接器的值为c对于创建(或插入),u对于更新,d对于删除,和r用于读取(在快照的情况下)。

  • 之前是否是可选字段,如果存在则包含行状态之前事件发生了。该结构将由PostgreSQL_server.public.customers.ValueKafka Connect模式,其中PostgreSQL_server对象中的所有行使用连接器public.customers表格

该字段是否可用在很大程度上取决于副本的身份每个桌子的设置

  • 是否是可选字段,如果存在则包含行状态事件发生了。结构的描述是相同的PostgreSQL_server.public.customers.Value中使用的Kafka Connect模式之前

  • 是一个必填字段,包含描述事件源元数据的结构,在PostgreSQL中包含几个字段:Debezium版本、连接器名称、受影响的数据库名称、模式和表、事件是否是正在进行的快照的一部分以及记录中的相同字段开云体育官方注册网址开云体育电动老虎机元信息部分

  • ts_ms是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。

当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。

副本的身份

副本的身份是一个特定于PostgreSQL的表级设置,它决定了可用信息的数量逻辑解码如果更新而且删除事件。更具体地说,这控制了无论何时发生上述事件之一,有关所涉及的表列的先前值的可用信息(如果有的话)。

有4个可能的值副本的身份

  • 默认的,更新而且删除事件将仅包含表的主键列的先前值更新只有主列的值发生了变化

  • 没什么- - - - - -更新而且删除事件将不包含关于任何表列上一个值的任何信息

  • 完整的,更新而且删除事件将包含表中所有列的先前值

  • 指数索引名称-更新而且删除事件将包含命名的索引定义中包含的列的先前值索引名称,以防更新只会显示值已更改的索引列

创建事件

我们来看看a创建事件值可能看起来像客户表:

{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": " postgresql_server .目录。值","field": "before"}, {"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers。值”、“场”:“后”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:“模式”},{“类型”:“弦”、“可选”:假的,“场”:“表”},{“类型”:“int64”、“可选”:真的,“场”:“txId”},{“类型”:“int64”、“可选”:真的,“场”:“lsn”},{“类型”:“int64”、“可选”:真的,“场”:“xmin”}],“可选”:假的,“名字”:“io.debezium.connector.postgresql.Source”、“字段”:“源”},{“类型”:“弦”、“可选”:假的,“场”:“人事处”},{“类型”:“int64”、“可选”:真的,“场”:“ts_ms”}],“可选”:假开云体育官方注册网址的,“名字”:“PostgreSQL_server.inventory.customers。信封”},“有效载荷”:{“前”:空,“后”:{" id ": 1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null}, "op": "c", "ts_ms": 1559033904863}}

如果我们看模式事件的一部分价值的模式信封的模式结构(特定于PostgreSQL连接器并在所有事件中重用),以及表特定的模式之前而且字段。

的模式名称之前而且字段的格式为logicalNameschemaName的表. value,因此完全独立于所有其他表的所有其他模式。

这意味着当使用Avro转换器时,所产生的Avro模式每个表在每一个逻辑源有自己的演变和历史。

如果我们看有效载荷事件的一部分价值,我们可以看到事件中的信息,即它正在描述创建的行(sinceop = c),以及字段值包含新插入行的'idfirst_namelast_name,电子邮件列。

事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式有效载荷部分信息。

这是可能的,甚至建议使用Avro转换器来大幅减少写入Kafka主题的实际消息的大小。

更新事件

的值更新这个表上的Change事件实际上会有完全相同的结果模式,其有效负载的结构相同,但将持有不同的值。这里有一个例子:

{"schema":{…},“有效载荷”:{“前”:{" id ": 1},“后”:{" id ": 1、“first_name”:“安妮玛丽”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null}, "op": "u", "ts_ms": 1465584025523}}

当我们把这个和插入事件中,我们看到了一些不同有效载荷部分:

  • 人事处字段值现在是u,表示该行因更新而更改

  • 之前字段现在拥有数据库提交前包含值的行状态,但仅用于主键列开云体育电动老虎机id.这是因为副本的身份这是默认的默认的

如果我们想查看该行所有列的先前值,我们必须更改客户表首先通过运行更改表客户的副本标识为满

  • 字段现在已经更新了行状态,这里可以看到first_name价值就是现在安妮玛丽

  • 字段结构具有与以前相同的字段,但值不同,因为该事件来自WAL中的不同位置。

  • ts_ms显示了Debezium处理此事件的时间戳。开云体育官方注册网址

通过观察,我们可以学到很多东西有效载荷部分。我们可以比较之前而且结构来确定由于提交而在该行中实际更改的内容。的结构告诉我们关于这个变化的PostgreSQL记录的信息(提供可追溯性),但更重要的是,我们可以将这个信息与这个和其他主题中的其他事件进行比较,以了解这个事件是在之前发生,之后发生,还是作为其他事件的一部分发生在同一个PostgreSQL提交中。

当一行的主键/唯一键的列更新时,行键的值也发生了变化,因此Debezium将输出开云体育官方注册网址三个事件:删除事件和墓碑上的事件使用旧的行键,后面跟着插入事件,使用该行的新键。

删除事件

到目前为止,我们已经看到了样本创建而且更新事件。现在,我们来看看a的值删除事件。再一次,模式的部分值将与创建而且更新事件:

{"schema":{…},“有效载荷”:{“前”:{" id ": 1},“后”:空,“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "d", "ts_ms": 1465581902461}}

如果我们看有效载荷部分,我们看到了一些不同的比较创建更新事件的有效载荷:

  • 人事处字段值现在是d,表示该行已被删除

  • 之前字段现在拥有在数据库提交时删除的行状态。开云体育电动老虎机属性,因此只包含主键列副本的身份设置

  • 字段为空,表示该行不再存在

  • 字段结构具有许多与以前相同的值,除了ts_mslsn而且txId领域发生了变化

  • ts_ms显示了Debezium处理此事件的时间戳。开云体育官方注册网址

此事件向使用者提供了用于处理删除该行的各种信息。

请注意没有PK的表,任何来自副本id DEFAULT的表的删除消息将没有之前部分(因为他们没有PK,这是默认身份级别的唯一字段),因此将被跳过,完全为空。为了能够在没有PK的情况下处理来自表的消息,请将REPLICA IDENTITY设置为FULL级别。

PostgreSQL连接器的事件被设计用来处理Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除一些旧消息。这允许Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。

删除一行时,删除上面列出的event值仍然适用于日志压缩,因为Kafka仍然可以删除所有使用相同键的早期消息。但仅当消息值为时卡夫卡会知道它可以移除吗所有消息用同样的钥匙。为了实现这一点,PostgreSQL连接器总是遵循删除特别活动墓碑上具有相同but键的事件价值。

数据类型

如上所述,PostgreSQL连接器用事件表示行更改,事件的结构类似于行所在的表。事件包含每个列值的字段,该值在事件中如何表示取决于列的PostgreSQL数据类型。本节描述此映射。

下表描述了连接器如何将每个PostgreSQL数据类型映射到文字类型而且语义类型在事件字段中。

在这里,文字类型描述了如何使用Kafka Connect模式类型逐字表示值,即INT8INT16INT32INT64FLOAT32FLOAT64布尔字符串字节数组地图,结构体

语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

布尔

布尔

N/A

位(1)

布尔

N/A

比特(> 1)有些不同((M))

字节

io.开云体育官方注册网址debezium.data.Bits

长度模式参数包含表示比特数的整数。由此产生的byte []将包含小端序形式的位,并将大小调整为至少包含指定位数(例如,numBytes = n/8 + (n%8== 0 ?0: 1)在哪里n是比特数)。

短整型SMALLSERIAL

INT16

N/A

整数串行

INT32

N/A

长整型数字BIGSERIAL

INT64

N/A

真正的

FLOAT32

N/A

双精度

FLOAT64

N/A

CHAR ((M))

字符串

N/A

VARCHAR ((M))

字符串

N/A

字符((M))

字符串

N/A

性格不同((M))

字符串

N/A

TIMESTAMPTZ带时区的时间戳

字符串

io.开云体育官方注册网址debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串表示形式,其中时区为GMT

TIMETZ时区时间

字符串

io.开云体育官方注册网址debezium.time.ZonedTime

带时区信息的时间值的字符串表示形式,其中时区为GMT

区间[P]

INT64

io.开云体育官方注册网址debezium.time.MicroDuration
(默认)

类型的时间间隔的大约微秒数365.25 / 12.0每月平均天数公式

区间[P]

字符串

io.开云体育官方注册网址debezium.time.Interval
(当interval.handling.mode设置为字符串

遵循模式的间隔值的字符串表示形式P <年> Y <月> M > <天DT <时间> H <分钟> M <秒>,如。P1Y2M3DT4H5M6.78S

BYTEA

字节

N/A

JSONJSONB

字符串

io.开云体育官方注册网址debezium.data.Json

包含JSON文档、数组或标量的字符串表示形式。

XML

字符串

io.开云体育官方注册网址debezium.data.Xml

包含XML文档的字符串表示形式

UUID

字符串

io.开云体育官方注册网址debezium.data.Uuid

包含PostgreSQL UUID值的字符串表示形式

结构体

io.开云体育官方注册网址debezium.data.geometry.Point

包含一个带2的结构FLOAT64字段,(x, y)-每个代表一个几何点的坐标

LTREE

字符串

io.开云体育官方注册网址debezium.data.Ltree

包含PostgreSQL LTREE值的字符串表示形式

CITEXT

字符串

N/A

INET

字符串

N/A

INT4RANGE

字符串

N/A

整数范围

INT8RANGE

字符串

N/A

bigint的范围

NUMRANGE

字符串

N/A

数值范围

TSRANGE

字符串

N/A

包含不带时区的时间戳范围的字符串表示形式。

TSTZRANGE

字符串

N/A

包含具有(本地系统)时区的时间戳范围的字符串表示形式。

DATERANGE

字符串

N/A

包含日期范围的字符串表示形式。它总是有一个排他的上界。

枚举

字符串

io.开云体育官方注册网址debezium.data.Enum

包含PostgreSQL ENUM值的字符串表示形式。允许的值集在名为的模式参数中维护允许

其他数据类型映射将在下面几节中描述。

时间值

除了PostgreSQL的TIMESTAMPTZ而且TIMETZ数据类型(包含时区信息),其他时态类型取决于time.precision.mode配置属性。当time.precision.mode配置属性设置为自适应(默认值),那么连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值:开云体育电动老虎机

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

日期

INT32

io.开云体育官方注册网址debezium.time.Date

表示自epoch以来的天数。

时间(1)时间(2)(3)

INT32

io.开云体育官方注册网址debezium.time.Time

表示午夜过后的毫秒数,不包括时区信息。

(4)时间(5)(6)

INT64

io.开云体育官方注册网址debezium.time.MicroTime

表示午夜过后的微秒数,不包括时区信息。

时间戳(1)时间戳(2)时间戳(3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp

表示过去纪元的毫秒数,不包括时区信息。

时间戳(4)时间戳(5)时间戳(6)时间戳

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp

表示经过epoch的微秒数,不包括时区信息。

time.precision.mode配置属性设置为adaptive_time_microseconds,然后连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值,除了所有的TIME字段将被捕获为微秒:开云体育电动老虎机

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

日期

INT32

io.开云体育官方注册网址debezium.time.Date

表示自epoch以来的天数。

时间([P])

INT64

io.开云体育官方注册网址debezium.time.MicroTime

表示以微秒为单位的时间值,不包括时区信息。PostgreSQL允许精度P在0-6的范围内存储到微秒精度。

时间戳(1)时间戳(2)时间戳(3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp

表示过去纪元的毫秒数,不包括时区信息。

时间戳(4)时间戳(5)时间戳(6)时间戳

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp

表示经过epoch的微秒数,不包括时区信息。

time.precision.mode配置属性设置为连接,那么连接器将使用预定义的Kafka Connect逻辑类型。当用户只知道内置的Kafka Connect逻辑类型而无法处理可变精度的时间值时,这可能很有用。另一方面,由于PostgreSQL支持微秒精度,由连接器生成的事件使用连接时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度取值大于3:

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

日期

INT32

org.apache.kafka.connect.data.Date

表示自epoch以来的天数。

时间([P])

INT64

org.apache.kafka.connect.data.Time

表示从午夜开始的毫秒数,不包括时区信息。PostgreSQL允许P在0-6范围内存储微秒精度,尽管这种模式会导致精度损失P> 3。

时间戳([P])

INT64

org.apache.kafka.connect.data.Timestamp

表示自epoch以来的毫秒数,不包括时区信息。PostgreSQL允许P在0-6范围内存储微秒精度,尽管这种模式会导致精度损失P> 3。

时间戳

时间戳类型表示不包含时区信息的时间戳。这些列将转换为基于UTC的等效Kafka Connect值。例如时间戳值“2018-06-20 15:13:16.945104”将由io.开云体育官方注册网址debezium.time.MicroTimestamp值为“1529507596945104”(假设time.precision.mode未设置为连接).

注意,运行Kafka Connect和Debezium的JVM的时区不会影响这种转换。开云体育官方注册网址

十进制值

decimal.handling.mode配置属性设置为精确的,那么连接器将使用预定义的Kafka Connectorg.apache.kafka.connect.data.Decimal所有的逻辑类型小数而且数字列。这是默认模式。

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

数字((M [D]))

字节

org.apache.kafka.connect.data.Decimal

按比例缩小的模式参数包含一个整数,表示小数点移位了多少位。

小数((M [D]))

字节

org.apache.kafka.connect.data.Decimal

按比例缩小的模式参数包含一个整数,表示小数点移位了多少位。

这条规则有一个例外。当数字小数类型的使用没有任何比例限制,这意味着来自数据库的值对于每个值都有不同的(变量)比例。开云体育电动老虎机在本例中是类型io.开云体育官方注册网址debezium.data.VariableScaleDecimal使用,并且它包含传输值的值和规模。

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

数字

结构体

io.开云体育官方注册网址debezium.data.VariableScaleDecimal

包含两个字段的结构:规模类型的INT32其中包含传输值的刻度和价值类型的字节以未缩放的形式包含原始值。

小数

结构体

io.开云体育官方注册网址debezium.data.VariableScaleDecimal

包含两个字段的结构:规模类型的INT32其中包含传输值的刻度和价值类型的字节以未缩放的形式包含原始值。

然而,当decimal.handling.mode配置属性设置为,则连接器将表示所有小数而且数字值是Java的双精度值,并按如下方式编码:

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

数字((M [D]))

FLOAT64

小数((M [D]))

FLOAT64

最后一个选项decimal.handling.mode配置属性为字符串.在这种情况下,连接器将表示所有小数而且数字值作为它们的格式化字符串表示,并按照如下方式编码它们:

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

数字((M [D]))

字符串

小数((M [D]))

字符串

PostgreSQL支持(不是数字)的特殊值小数/数字值。只有字符串而且模式能够处理这样的值,将它们编码为任意一种翻倍。南或者字符串常量

HStore值

hstore.handling.mode配置属性设置为json(默认值),连接器将表示所有HSTORE字符串化的JSON值,并按如下方式编码:

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

HSTORE

字符串

io.开云体育官方注册网址debezium.data.Json

示例:使用JSON转换器的输出表示是{\"key ": \"val "}

hstore.handling.mode配置属性设置为地图,则连接器将使用地图所有模式类型HSTORE列。

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

HSTORE

地图

示例:使用JSON转换器的输出表示是{"key": "val"}

域类型

PostgreSQL还支持基于其他底层类型的用户定义类型的概念。当使用此类列类型时,Debezium将基于完整的类型层次结构公开列开云体育官方注册网址的表示形式。

监视使用域类型的列时应该特别注意。

当使用扩展默认数据库类型之一的域类型定义列,并且域类型定义了自定义长度/比例时,生成的模式将继承已定义的长度/比例。开云体育电动老虎机

当使用扩展另一个定义自定义长度/比例的域类型的域类型定义列时,生成的模式将继承已定义的长度/规模,因为PostgreSQL驱动程序的列元数据实现。

网络地址类型

PostgreSQL也有可以存储IPv4、IPv6和MAC地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址,因为这些类型提供了输入错误检查和专门的操作符和函数。

PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

INET

字符串

IPv4和IPv6组网

CIDR

字符串

IPv4和IPv6主机和网络

MACADDR

字符串

MAC地址

MACADDR8

字符串

MAC地址为EUI-64格式

PostGIS类型

PostgreSQL连接器也完全支持所有的PostGIS数据类型

PostGIS数据类型 文字类型(模式类型) 语义类型(模式名) 笔记

几何
(平面)

结构体

io.开云体育官方注册网址debezium.data.geometry.Geometry

包含一个具有2个字段的结构

  • srid (INT32)-空间引用系统标识符,定义存储在结构中的几何对象类型

  • wkb(字节)-以众所周知的二进制格式编码的几何对象的二进制表示。请参阅开放地理空间联盟简单特性访问规范格式的详细信息。

地理位置
(球)

结构体

io.开云体育官方注册网址debezium.data.geometry.Geography

包含一个具有2个字段的结构

  • srid (INT32)-空间引用系统标识符,定义存储在结构中的地理对象类型

  • wkb(字节)-以众所周知的二进制格式编码的几何对象的二进制表示。请参阅开放地理空间联盟简单特性访问规范格式的详细信息。

烤的价值观

PostgreSQL对页面大小有硬性限制。这意味着大于约8 KB的值需要使用吐司存储.这将影响来自数据库的复制消息,因为使用TOAST机制存储且未被更改的值不会包含在消息中,除非开云体育电动老虎机它们是表的复制标识的一部分。Debezium没有安全的方法直接从数据库中读取带开云体育官方注册网址外缺失的值,因为这可能会导致竞争条件。开云体育电动老虎机开云体育官方注册网址Debezium因此遵循以下规则来处理烘烤的值:

  • 表与副本标识已满: TOAST列值是之前而且更改事件块与任何其他列一样

  • 表与副本标识默认:当收到更新任何不属于副本标识的TOAST开云体育电动老虎机列值都不会成为该事件的一部分;类似地,当收到一个删除事件中,任何这样的TOAST列都不会是之前块。由于D开云体育官方注册网址ebezium在这种情况下不能安全地提供列值,所以它返回配置选项中定义的占位符值toasted.value.placeholder

有一个与Amazon RDS实例相关的特定问题。wal2json随着时间的推移,插件不断发展,有一些版本提供了带外toast值。Amazon为不同的PostgreSQL版本支持不同版本的插件。请参考亚马逊的文档获取版本间的配套关系。对于一致的烘烤值处理,我们建议

  • 使用pgoutput插件用于PostgreSQL 10+实例

  • include-unchanged-toast = 0对于旧版本的wal2json插件使用slot.stream.params配置选项

部署PostgreSQL连接器

如果您已经安装了动物园管理员卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的PostgreSQL连接器就很容易了。只需下载连接器插件存档,将jar文件解压到Kafka Connect环境中,并添加jar文件所在的目录Kafka Connect的plugin.path.重新启动Kafka Connect进程以获取新的jar。

如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的Docker图片对于Zookeeper, Kafka, PostgreSQL和Kafka Connect, PostgreSQL连接器已经预安装并准备好了。你甚至可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium

示例配置

使用连接器为特定的PostgreSQL服务器或集群产生变更事件:

  1. 安装逻辑解码插件

  2. 配置PostgreSQL服务器支持逻辑复制

  3. 为PostgreSQL连接器创建一个配置文件。

当连接器启动时,它将获取PostgreSQL服务器中数据库的一致快照,并开始流化更改,为每个插入、更新和删除的行生成事件。开云体育电动老虎机您还可以选择为模式和表的一个子集生成事件。可选地忽略、屏蔽或截断敏感、过大或不需要的列。

下面是一个PostgreSQL连接器的配置示例,该连接器监视位于192.168.99.100上5432端口的PostgreSQL服务器fullfillment.通常情况下,您可以在配置Debezium PostgreS开云体育官方注册网址QL连接器. json文件中使用连接器可用的配置属性。

{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.postgresql. postgresconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“5432”,(4)“开云体育电动老虎机数据库。使用r": "postgres",(5)“开云体育电动老虎机数据库。密码”:“postgres”,(6)“开云体育电动老虎机数据库。dbname" : "postgres",(7):开云体育电动老虎机“database.server.name fullfillment”,(8)”表。白名单”:“public.inventory”(9)}}
1 当我们向Kafka Connect服务注册连接器时,连接器的名称。
2 这个PostgreSQL连接器类的名称。
3. PostgreSQL服务器地址。
4 PostgreSQL服务器的端口号。
5 属性的PostgreSQL用户的名称需要的特权
6 属性的PostgreSQL用户的密码需要的特权
7 要连接的PostgreSQL数据库的名称开云体育电动老虎机
8 PostgreSQL服务器/集群的逻辑名称,它形成一个命名空间,用于连接器写入的所有Kafka主题的名称,Kafka Connect模式名称,以及使用Avro连接器时对应的Avro模式的名称空间。
9 连接器将监视的由该服务器托管的所有表的列表。这是可选的,还有其他属性用于列出要从监视中包含或排除的模式和表。

看到连接器属性的完整列表可以在这些配置中指定。

这个配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到PostgreSQL数据库并记录Kafka主题的事件。开云体育电动老虎机

监控

Kafka、Zookeeper和Kafka Connect都内置了对JMX指标的支持。PostgreSQL连接器还发布了许多关于连接器活动的度量,这些活动可以通过JMX进行监视。连接器有两种类型的指标。快照度量可以帮助您监视快照活动,并且在连接器执行快照时可用。流度量帮助您在连接器处理逻辑复制流时监视进度和活动。

快照指标

MBean: 开云体育官方注册网址debezium.postgres: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
属性名称 类型 描述

LastEvent

字符串

连接器读取的最后一个快照事件。

MilliSecondsSinceLastEvent

自连接器读取并处理最近事件以来的毫秒数。

TotalNumberOfEventsSeen

自上次启动或重置以来,此连接器已看到的事件总数。

NumberOfEventsFiltered

连接器上已配置白名单或黑名单过滤规则过滤的事件个数。

MonitoredTables

string []

连接器监视的表的列表。

QueueTotalCapcity

int

用于在快照和主Kafka Connect循环之间传递事件的队列长度。

QueueRemainingCapcity

int

用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。

TotalTableCount

int

快照中包含的表的总数。

RemainingTableCount

int

快照尚未复制的表数。

SnapshotRunning

布尔

快照是否启动。

SnapshotAborted

布尔

快照是否中止。

SnapshotCompleted

布尔

快照是否完成。

SnapshotDurationInSeconds

快照到目前为止所花费的总秒数,即使没有完成。

RowsScanned

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

流指标

MBean: 开云体育官方注册网址debezium.postres: type = connector-metrics、上下文=流媒体服务器=<开云体育电动老虎机 database.server.name >
属性名称 类型 描述

LastEvent

字符串

连接器读取的最后一个流事件。

MilliSecondsSinceLastEvent

自连接器读取并处理最近事件以来的毫秒数。

TotalNumberOfEventsSeen

自上次启动或重置以来,此连接器已看到的事件总数。

NumberOfEventsFiltered

连接器上已配置白名单或黑名单过滤规则过滤的事件个数。

MonitoredTables

string []

连接器监视的表的列表。

QueueTotalCapcity

int

用于在streamer和主Kafka Connect循环之间传递事件的队列长度。

QueueRemainingCapcity

int

用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。

连接

布尔

标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机

MilliSecondsBehindSource

从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机

NumberOfCommittedTransactions

提交的已处理事务的数量。

SourceEventPosition

map < string, string >

上次接收事件的坐标。

LastTransactionId

字符串

最后处理的事务的事务标识符。

连接器属性

以下配置属性为要求除非有默认值可用。

财产 默认的 描述

的名字

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。)

connector.class

连接器的Java类的名称。始终使用值io.开云体育官方注册网址debezium.connector.postgresql.PostgresConnector用于PostgreSQL连接器。

tasks.max

1

应该为此连接器创建的最大任务数。PostgreSQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。

plugin.name

decoderbufs

邮差的名字逻辑解码插件安装在服务器上。支持的值为decoderbufswal2jsonwal2json_rdswal2json_streamingwal2json_rds_streaming而且pgoutput

当处理的事务非常大时,可能会出现JSON事务中所有更改的批处理事件将无法放入大小为1gb的硬编码内存缓冲区中。在这种情况下,可以改用所谓的流媒体当事务中的每一个变化都作为一个单独的消息从PostgreSQL发送到Debezium时。开云体育官方注册网址

slot.name

开云体育官方注册网址

Postgres逻辑解码槽的名称,用于从插件和数据库实例中流化更改。开云体育电动老虎机价值观必须符合Postgres复制槽位命名规则状态:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线。”

slot.drop.on.stop

当连接器有序完成时是否删除逻辑复制插槽。应该只设置为真正的在测试或开发环境中。丢弃插槽允许WAL段被数据库丢弃,因此在重新启动后连接器可能无法从之前中断的WAL位置恢复。开云体育电动老虎机

publication.name

dbz_publication

使用时为流更改而创建的PostgreSQL发布的名称pgoutput

如果该发布还不存在,则在启动时创建所有的表.开云体育官方注册网址然后Debezium将使用它自己的白/黑名单过滤功能,将更改事件限制到特定的表(如果配置了的话)。注意,连接器用户必须具有超级用户权限才能创建该发布,因此通常最好先创建发布。

如果发布已经存在(对于所有表或配置了一个表子集),Debezium将使用定义的发布。开云体育官方注册网址

开云体育电动老虎机database.hostname

PostgreSQL数据库服务器的IP地址或主机名。开云体育电动老虎机

开云体育电动老虎机database.port

5432

PostgreSQL数据库服务器的整型端口号。开云体育电动老虎机

开云体育电动老虎机database.user

连接到PostgreSQL数据库服务器时使用的开云体育电动老虎机PostgreSQL数据库的名称。

开云体育电动老虎机database.password

连接PostgreSQL数据库服务器时使用的密码。开云体育电动老虎机

开云体育电动老虎机database.dbname

用于流化更改的PostgreSQL数据库的名称开云体育电动老虎机

开云体育电动老虎机database.server.name

逻辑名称,用于标识并提供被监控的特定PostgreSQL数据库服务器/集群的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作来自该连接器的所有Kafka主题名的前缀。只能使用字母数字字符和下划线。

schema.whitelist

一个可选的、以逗号分隔的正则表达式列表,它匹配要监控的模式名;任何没有包含在白名单中的模式名都将被排除在监控之外。默认情况下,将监视所有非系统模式。不得与schema.blacklist

schema.blacklist

可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的模式名称;黑名单中没有包含的任何模式名都将被监控,系统模式除外。不得与schema.whitelist

table.whitelist

一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表都将被排除在监控之外。每个标识符都是这样的schemaName的表.默认情况下,连接器将监视每个被监视模式中的每个非系统表。不得与table.blacklist

table.blacklist

一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的schemaName的表.不得与table.whitelist

column.blacklist

一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为schemaName的表columnName

time.precision.mode

自适应

时间、日期和时间戳可以用不同的精度表示,包括:自适应(默认值)根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获与数据库中完全相同的时间和时间戳值;开云体育电动老虎机adaptive_time_microseconds根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获数据库中的日期、datetime和时间戳值,但TIME类型字段总是以微秒为单位捕获;开云体育电动老虎机或连接总是使用Kafka Connect内置的时间、日期和时间戳表示来表示时间和时间戳值,无论数据库列的精度如何,它都使用毫秒精度。开云体育电动老虎机看到时间值

decimal.handling.mode

精确的

指定连接器应如何处理的值小数而且数字列:精确的(默认值)精确地表示它们使用java.math.BigDecimal变更事件中以二进制形式表示的值;或表示它们使用值,这可能会导致精度的损失,但将更容易使用。字符串选项将值编码为格式化的字符串,这很容易使用,但关于真实类型的语义信息会丢失。看到十进制值

hstore.handling.mode

地图

指定连接器应如何处理的值hstore列:地图(默认值)表示使用地图;或json表示它们使用json字符串json选项将值编码为格式化的字符串,例如{"key": "val"}.看到HStore值

interval.handling.mode

数字

指定连接器应如何处理的值时间间隔列:数字(默认值)表示使用近似微秒数的间隔;字符串使用字符串模式表示法准确地表示它们P <年> Y <月> M > <天DT <时间> H <分钟> M <秒>,如。P1Y2M3DT4H5M6.78S.看到数据类型

开云体育电动老虎机database.sslmode

禁用

是否使用加密连接到PostgreSQL服务器。选项包括:禁用(默认值)使用未加密的连接;需要使用安全(加密)连接,如果无法建立则失败;verify-ca就像需要但是额外地根据配置的证书颁发机构(CA)证书验证服务器TLS证书,如果没有找到有效的匹配CA证书,则失败;verify-full就像verify-ca但是还要检查服务器证书是否与尝试连接的主机相匹配。看到PostgreSQL文档获取更多信息。

开云体育电动老虎机database.sslcert

包含客户端SSL证书的文件的路径。看到PostgreSQL文档获取更多信息。

开云体育电动老虎机database.sslkey

包含客户端SSL私钥的文件的路径。看到PostgreSQL文档获取更多信息。

开云体育电动老虎机database.sslpassword

从指定的文件中访问客户端私钥的密码开云体育电动老虎机database.sslkey.看到PostgreSQL文档获取更多信息。

开云体育电动老虎机database.sslrootcert

包含对服务器进行验证的根证书的文件的路径。看到PostgreSQL文档获取更多信息。

开云体育电动老虎机database.tcpKeepAlive

启用TCP保持连接探测以验证数据库连接是否仍然活跃。开云体育电动老虎机(默认启用)。看到PostgreSQL文档获取更多信息。

tombstones.on.delete

真正的

控制是否在删除事件之后生成墓碑事件。
真正的删除操作由删除事件和后续的墓碑事件表示。当只发送一个删除事件。
触发墓碑事件(默认行为)允许Kafka在源记录被删除后完全删除与给定键相关的所有事件。

column.truncate.to。长度.chars

N/A

一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName

column.mask.with。长度.chars

N/A

一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号()字符。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数或零。列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName

column.propagate.source.type

N/A

一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数__开云体育官方注册网址debezium.source.column.type__开云体育官方注册网址debezium.source.column.length而且__开云体育官方注册网址debezium.source.column.scale将分别用于传播原始类型名称和长度(对于变宽类型)。对于接收器数据库中相应列的适当大小很有用。开云体育电动老虎机列的完全限定名的格式为开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName

message.key.columns

空字符串

与完全限定的表和列匹配以映射主键的正则表达式的分号列表。
每个项(正则表达式)必须匹配完全限定<完全限定表>:<逗号分隔的列列表>表示自定义键。
完全限定表可以定义为DB_NAME。TABLE_NAMESCHEMA_NAME。TABLE_NAME,取决于具体的连接器。

以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。

财产 默认的 描述

snapshot.mode

最初的

指定在连接器启动时运行快照的条件。默认为最初的,并指定只有在没有为逻辑服务器名记录偏移量时连接器才能运行快照。的总是选项指定连接器在每次启动时运行一个快照。的从来没有选项指定连接永远不应该使用快照,并且在使用逻辑服务器名首次启动时,连接器应该从它最后停止的位置(最后的LSN位置)读取,或者从逻辑复制插槽的视图开始从头读取。的initial_only选项指定连接器应该只获取初始快照,然后停止,而不处理任何后续更改。的出口选项指定数据库快照将基于创建复制槽时的时间点,这是一开云体育电动老虎机种以无锁方式执行快照的好方法。最后,如果设置为自定义然后用户还必须设置snapshot.custom.class的自定义实现是io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。看到快照

snapshot.custom.class

类的实现的完整java类名io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。仅用于以下情况snapshot.mode是*自定义

snapshot.lock.timeout.ms

10000

正整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,则快照将失败。看到快照

snapshot.select.statement.overrides

控制快照中将包括表中的哪些行。
此属性包含以逗号分隔的全限定表列表(DB_NAME.TABLE_NAME).在进一步的配置属性中为各个表指定选择语句,每个表一个,由id标识snapshot.select.statement.overrides。[DB_NAME] [TABLE_NAME]。.这些属性的值是在快照期间从特定表检索数据时使用的SELECT语句。对于大型只能追加的表,一个可能的用例是设置一个特定的开始(恢复)快照的点,以防之前的快照被中断。
说明:该设置仅对快照有影响。逻辑解码器生成的事件完全不受其影响。

event.processing.failure.handling.mode

失败

指定连接器在事件处理期间应如何对异常作出反应。失败将传播异常(指示有问题事件的偏移量),导致连接器停止。
警告将导致跳过有问题的事件并记录有问题事件的偏移量。
跳过将导致跳过有问题的事件。

max.queue.size

20240

正整数值,指定通过流复制接收到的更改事件写入Kafka之前放入的阻塞队列的最大大小。例如,当写入Kafka较慢或Kafka不可用时,该队列可以提供反压力。

max.batch.size

10240

正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。

poll.interval.ms

1000

正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。

include.unknown.datatypes

当Debe开云体育官方注册网址zium遇到数据类型未知的字段时,默认情况下,该字段将从更改事件中忽略,并记录警告。在某些情况下,包含字段并将其以不透明的二进制表示形式发送到下游客户端,以便客户端自己解码可能更可取。设置为将未知数据从事件和真正的以二进制格式保存。

客户端存在向后兼容性问题。不仅数据库特定的二进制表示在不同版开云体育电动老虎机本之间可能会发生变化,而且当数据类型最终被Debezium支持时,它将以逻辑类型的形式发送到下游,这需要消费者进行调整。开云体育官方注册网址一般情况下,当遇到不支持的数据类型时,请提交特性请求,以便添加支持。

开云体育电动老虎机database.initial.statements

当建立到数据库的JDBC连接(不是事务日志读取连接)时,将执行的SQL语句的分号列表。开云体育电动老虎机使用双分号(';;')将分号用作字符而不是分隔符。

连接器可以根据自己的判断建立JDBC连接,因此这通常只用于配置会话参数,而不是用于执行DML语句。

heartbeat.interval.ms

0

控制心跳消息的发送频率。
此属性包含以毫秒为单位的间隔,该间隔定义连接器向心跳主题发送消息的频率。这可用于监视连接器是否仍在接收来自数据库的更改事件。开云体育电动老虎机在较长一段时间内只更改非捕获表中的记录的情况下,还应该利用心跳消息。在这种情况下,连接器会继续从数据库中读取日志,但不会向Kafka发出任何更改消息,这反过来意味着不会向Kafka提交偏移量更新。开云体育电动老虎机这将导致WAL文件被数据库保留的时间超过所需的时间(因为连接器实际上已经处理了它们,但从未有机会将最新检开云体育电动老虎机索到的LSN刷新到数据库),还可能导致在连接器重新启动后重新发送更多更改事件。将此参数设置为0完全不发送心跳消息。
默认禁用。

heartbeat.topics.prefix

__开云体育官方注册网址debezium-heartbeat

控制要向其发送心跳消息的主题的命名。
主题根据模式命名< heartbeat.topics.prefix >。< server.name >

schema.refresh.mode

columns_diff

指定触发表的内存模式刷新的条件。

columns_diff(默认值)是最安全的模式,可确保内存模式始终与数据库表的模式保持同步。开云体育电动老虎机

columns_diff_exclude_unchanged_toast指示连接器,如果它与从传入消息派生的模式之间存在差异,则刷新内存中的模式缓存,除非未更改的TOASTable数据完全解释了差异。

如果存在频繁更新的表,其中的TOASTed数据很少出现在这些更新中,则此设置可以显著提高连接器性能。但是,如果从表中删除TOASTable列,则内存模式可能会过时。

snapshot.delay.ms

连接器启动后在快照之前应该等待的间隔(以毫秒为单位);
可用于在启动集群中的多个连接器时避免快照中断,这可能导致连接器的重新平衡。

snapshot.fetch.size

10240

指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。默认值为10240。

slot.stream.params

要传递给已配置逻辑解码插件的可选参数列表。例如,在使用wal2json插件时,可以用于启用服务器端表过滤。允许的值取决于所选插件,用分号分隔。例如,添加表= public.table public.table2; include-lsn = true

sanitize.field.names

真正的连接器配置显式指定key.convertervalue.converter参数使用Avro,否则默认为

是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。

slot.max.retries

6

当尝试失败时,重试连接复制插槽的次数。

slot.retry.delay.ms

10000(10秒)

当连接器连接到复制插槽失败时,重试尝试之间等待的毫秒数。

toasted.value.placeholder

__开云体育官方注册网址debezium_unavailable_value

指定Debezium将提供的常量,以指示原始值是一个烘烤值,而不是数据库提供的值。开云体育官方注册网址开云体育电动老虎机If以十六进制:前缀,期望字符串的其余部分表示十六进制编码的字节。看到部分附加细节。

连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。

一定要咨询卡夫卡的文档Kafka生产者和消费者的所有配置属性。(PostgreSQL连接器使用新的消费者.)

PostgreSQL常见问题

开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium就像Kafka一样提供了开云体育官方注册网址至少一次变更事件的交付。

本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址

配置和启动错误

连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,当连接器不能成功地使用指定的连接参数连接到PostgreSQL时,或者当连接器从PostgreSQL WAL中先前记录的位置(通过LSN值)重新启动时,并且PostgreSQL不再有该历史可用时,连接器将停止运行。

在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经更正或PostgreSQL问题已经解决时,连接器可以重新启动。

PostgreSQL不可用

一旦连接器运行,如果它所连接的PostgreSQL服务器由于任何原因变得不可用,连接器将失败并报错,连接器将停止。当服务器可用时,只需重新启动连接器。

PostgreSQL连接器在外部存储最后处理的偏移量(以PostgreSQL的形式)日志序列号值)。一旦连接器重新启动并连接到服务器实例,它将要求服务器继续从该特定偏移量进行流处理。只要Debezium复制插槽保持不变,这个偏移量就始终可用。开云体育官方注册网址永远不要在主服务器上删除复制插槽,否则将丢失数据。有关插槽被移除时的故障情况,请参见下一节。

集群的失败

12, PostgreSQL允许逻辑复制槽仅在主服务器上,这意味着PostgreSQL连接器只能指向数据库集群的活动主节点。开云体育电动老虎机而且复制槽本身不会传播到副本。如果主节点关闭,则只有在提升了一个新的主节点之后(使用逻辑解码插件已经安装)并且在那里创建了复制插槽,可以重新启动连接器并将其指向新的服务器。

关于故障转移有一些非常重要的注意事项,您应该暂停Debezium,直到您可以验证您有一个完整的复制插槽,没有丢失数据。开云体育官方注册网址在故障转移之后,您将错过更改事件,除非您的故障转移管理包括在应用程序被允许写入开云体育官方注册网址主要的您还可能需要在故障转移情况下验证Debezium是否能够读取插槽中的所有更改开云体育官方注册网址在旧的初选失败之前

恢复和验证任何丢失的更改的一种可靠方法(但在管理上很困难)是将失败的主服务器的备份恢复到它发生故障之前的位置,这将允许您检查复制插槽是否有任何未使用的更改。在任何情况下,在允许对新主服务器进行写操作之前,在新主服务器上重新创建复制插槽是至关重要的。

在PostgreSQL社区中有关于一个叫做故障转移槽这将有助于缓解这个问题,但到目前为止12它们尚未得到实施。但是,Postgres 13正在积极开发支持备用逻辑解码,这是实现故障转移的主要需求。你可以在社区的线程

您可以在这里找到有关故障转移插槽概念的更多信息这篇博文

Kafka连接进程优雅地停止

如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。

Kafka连接进程崩溃

如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务显然都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,PostgreSQL连接器将从上次偏移量开始恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量将取决于偏移刷新周期和崩溃前的数据更改量。

由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。

开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括PostgreSQL服务器的事件时间、服务器事务的id以及事务更改写入预写日志中的位置。消费者可以跟踪这些信息(特别是LSN位置),以了解他们是否已经看到了特定的事件。

Kafka不可用

当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。

连接器停止一段时间

如果连接器被优雅地停止,数据库可以继续使用,任何新的更改都将被记录在PostgreSQL开云体育电动老虎机 WAL中。当连接器重新启动时,它将在上次停止的地方恢复流更改,记录连接器停止时所做的所有更改的更改事件。

一个正确配置的Kafka集群能够处理巨大的吞吐量.Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器在一段时间后重新启动时,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及对PostgreSQL中的数据所做的更改量。开云体育电动老虎机