开云体育官方注册网址Debezium连接器的PostgreSQL
开云体育官方注册网址Debezium的PostgreSQL连接器可以监视和记录PostgreSQL数据库模式中的行级更改。开云体育电动老虎机
当它第一次连接到PostgreSQL服务器/集群时,它会读取所有模式的一致快照。当快照完成时,连接器会连续地传输提交到PostgreSQL 9.6或更高版本的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的Kafka主题中,应用程序和服务可以很容易地使用它们。
概述
PostgreSQL的逻辑解码特性首次在9.4版中引入,它是一种机制,允许提取提交到事务日志中的更改,并通过类的帮助以用户友好的方式处理这些更改输出插件.这个输出插件必须在运行PostgreSQL服务器之前安装,并与复制插槽一起启用,以便客户端能够使用这些更改。
PostgreSQL连接器包含两个不同的部分,它们一起工作,以便能够读取和处理服务器的更改:
逻辑解码输出插件,必须在PostgreSQL服务器上安装和配置
decoderbufs(由Debezium社区维护,基于Pr开云体育官方注册网址otoBuf)
wal2json(由wal2json社区维护,基于JSON)
pgoutput, PostgreSQL 10+中的标准逻辑解码插件(由Postgres社区维护,由Postgres自己用于逻辑复制);该插件始终存在,这意味着不需要安装其他库,Debezium连接器将直接将原始复制事件流解释为更改事件。开云体育官方注册网址
Java代码(实际的Kafka Connect连接器),它读取所选插件产生的更改,使用PostgreSQL的流复制协议,通过PostgreSQLJDBC驱动程序
连接器然后生成一个更改事件对于接收到的每个行级插入、更新和删除操作,将每个表的所有更改事件记录在单独的Kafka主题中。您的客户端应用程序读取对应于它们感兴趣的数据库表的Kafka主题,并对在这些主题中看到的每个行级事件做出反应。开云体育电动老虎机
PostgreSQL通常会在一段时间后清除WAL段。这意味着连接器不会拥有对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当PostgreSQL连接器第一次连接到一个特定的PostgreSQL数据库时,它首先执行一个开云体育电动老虎机一致的快照每个数据库模式的。开云体育电动老虎机在连接器完成快照之后,它继续从创建快照的确切位置开始流化更改。通过这种方式,我们从所有数据的一致视图开始,然后继续读取,而不会丢失快照发生时所做的任何更改。
该连接器还具有故障容忍度。当连接器读取更改并产生事件时,它将记录每个事件在预写日志中的位置。如果连接器由于任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它只是在上次停止的地方继续读取WAL。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。
连接器的功能依赖于PostgreSQL的逻辑解码特性。请注意连接器也反映了以下限制:
|
开云体育官方注册网址Debezium目前只支持UTF-8字符编码的数据库。开云体育电动老虎机使用单字节字符编码,不可能正确处理包含扩展ASCII码字符的字符串。 |
设置PostgreSQL
在使用PostgreSQL连接器监视在PostgreSQL服务器上提交的更改之前,首先确定您打算使用哪种逻辑解码器方法。如果你计划不要使用原生pgoutput逻辑复制流支持,则需要将逻辑解码插件安装到PostgreSQL服务器中。然后启用复制槽位,并为用户配置足够的权限来执行复制。
请注意,如果您的数据库由服务托管,例开云体育电动老虎机如Heroku Postgres您可能无法安装插件。如果是这样,如果你使用的是PostgreSQL 10+,你可以使用pgoutput解码器支持来监控你的数据库。开云体育电动老虎机如果没有这个选项,您将无法使用Debezium监视数据库。开云体育官方注册网址开云体育电动老虎机
PostgreSQL在亚马逊RDS
可以监控PostgreSQL数据库的运行情况开云体育电动老虎机Amazon RDS.要让它运行,必须满足以下条件
实例参数
rds.logical_replication
设置为1
.验证
wal_level
参数设置为逻辑
通过运行查询显示wal_level
作为数据库主用户;在多区域复制设置中可能不是这样。您不能手动设置此选项,它是(自动改变)当rds.logical_replication
设置为1
.如果wal_level
不是逻辑
在上述更改之后,可能是因为参数组更改导致实例必须重新启动,这将根据维护窗口发生,也可以手动完成。集
plugin.name
开云体育官方注册网址Debezium参数为wal2json
.如果你想使用pgoutput逻辑复制流支持,你可以在PostgreSQL 10+上跳过这个步骤。使用数据开云体育电动老虎机库主帐户进行复制,因为RDS目前不支持设置
复制
另一个帐户的特权。
您应该确保在Amazon RDS上使用最新版本的Postgres 9.6、10或11。否则,可能会安装旧版本的wal2json插件(请参阅官方文件获取安装在Amazon RDS上的确切wal2json版本)。在这种情况下,从数据库接收的复制消息可能不携带关于类型约束的完整信息,如长度或规模或开云体育电动老虎机 截至2019年1月,RDS上的以下Postgres版本附带了wal2json的最新版本,因此应该使用:
|
安装逻辑解码输出插件
也看到PostgreSQL逻辑解码输出插件安装有关设置和测试逻辑解码插件的更详细说明。 |
从Debez开云体育官方注册网址ium 0.10开始,连接器支持使用PostgreSQL 10+逻辑复制流pgoutput.这意味着不再需要逻辑解码输出插件,连接器可以直接从复制流发出更改。 |
从PostgreSQL 9.4开始,读取write-ahead-log更改的唯一方法是首先安装一个逻辑解码输出插件。插件用C语言编写,编译并安装在运行PostgreSQL服务器的机器上。插件使用许多特定于PostgreSQL的api,如PostgreSQL的文档.
PostgreSQL连接器与Debezium支持的逻辑解码插件之一一起工作,对开云体育官方注册网址其中任何一个的更改进行编码Protobuf格式或JSON格式。请参阅所选插件的文档(protobuf,wal2json),以了解插件的需求、限制以及如何编译它。
为了简单起见,Debezium还开云体育官方注册网址提供了一个基于普通PostgreSQL服务器映像的Docker映像,并在其上编译和安装插件。我们建议使用此图像作为安装所需详细步骤的示例。
Debe开云体育官方注册网址zium逻辑解码插件只被安装和测试过Linux机器。对于Windows和其他操作系统,可能需要不同的安装步骤。 |
插件之间的区别
插件的行为在所有情况下都不完全相同。到目前为止,已经确定了这些差异:
Wal2json插件无法处理带引号的标识符(问题)
Wal2json插件不会对没有主键的表发出事件
Wal2json插件不支持特殊值(
南
或∞
)用于浮点类型Wal2json应该与设置
schema.refresh.mode
连接器选项columns_diff_exclude_unchanged_toast
;否则,当接收到包含未更改TOAST列的行的更改事件时,该列的字段将不会包含在发出的更改事件的字段中后
结构。这是因为wal2json的消息不会包含该列的字段。
添加这个的需求在wal2json中被跟踪发行98.参见columns_diff_exclude_unchanged_toast
下面将进一步说明使用它的含义。
Pgoutput插件不会为没有主键的表发出事件
在测试套件中跟踪所有最新的差异Java类.
配置PostgreSQL服务器
如果您正在使用其中一个受支持的逻辑解码插件(即不是pgoutput)并且它已经安装,配置服务器在启动时加载插件:
# MODULES shared_preload_libraries = 'decoderbufs,wal2json'(1)
接下来是配置复制槽,而不管使用的解码器:
#复制wal_level = logical(1)Max_wal_senders = 1(2)Max_replication_slots = 1(3)
1 | 告诉服务器它应该对预写日志使用逻辑解码 |
2 | 告诉服务器它应该使用的最大值1 处理WAL变更的独立进程 |
3. | 告诉服务器它应该允许的最大值1 为流化WAL更改而创建的复制插槽 |
开云体育官方注册网址Debezium使用PostgreSQL的逻辑解码,它使用复制插槽。即使在Debezium中断期间,复制插槽也保证保留Debezium所需的所有WAL。开云体育官方注册网址因此,密切监视复制插槽非常重要,以避免过多的磁盘消耗和其他可能发生的情况,如复制插槽长时间未使用时的编目膨胀。有关更多信息,请参阅官方Postgres文档这个主题.
以防你和一个synchronous_commit
其他设置在
,建议设置wal_writer_delay
到一个值,例如10毫秒,以实现较低的更改事件延迟。否则,它的默认值将被应用,这将增加大约200毫秒的延迟。
我们强烈建议阅读和理解官方文件关于PostgreSQL预写日志的机制和配置。 |
设置权限
接下来,配置一个可以执行复制的数据库开云体育电动老虎机用户。
复制只能由具有相应权限的数据库用户执行,且只能对配置的主机数量进行复制。开云体育电动老虎机
为了赋予用户复制权限,定义一个PostgreSQL角色至少的复制
而且登录
权限。例如:
创建角色名
超级用户默认拥有上述两种角色。 |
最后,配置PostgreSQL服务器,允许在服务器机器和运行PostgreSQL连接器的主机之间进行复制:
本地复制信任(1)主机复制 127.0.0.1/32 trust(2)主机复制::1/128 trust(3)
1 | 告诉服务器允许复制<对于> 本地(即在服务器机器上) |
2 | 告诉服务器允许<对于> 在本地主机 接收复制更改使用IPV4 |
3. | 告诉服务器允许<对于> 在本地主机 接收复制更改使用IPV6 |
看到PostgreSQL文档有关网络掩码的更多信息。 |
支持的PostgreSQL拓扑
PostgreSQL连接器可以与独立的PostgreSQL服务器一起使用,也可以与PostgreSQL服务器集群一起使用。
正如前面提到的一开始, PostgreSQL(所有版本⇐12)只支持逻辑复制槽上主要的
服务器。这意味着PostgreSQL集群中的副本不能配置为逻辑复制,因此Debezium PostgreSQL连接器只能与主服务器连接和通信。开云体育官方注册网址如果此服务器失败,连接器将停止。当群集修复时,如果原始主服务器再次提升到主要的
,连接器可以简单地重新启动。但是,如果是不同的PostgreSQL服务器使用插件和适当的配置被提升为主要的
时,连接器配置必须更改为指向新的主要的
服务器,然后可以重新启动。
WAL磁盘空间消耗
在某些情况下,WAL文件所消耗的PostgreSQL磁盘空间可能会出现峰值或超出正常比例。有三个潜在的原因可以解释这种情况:
开云体育官方注册网址Debezium定期向数据库确认已处理事件的LSN。开云体育电动老虎机这是可见的
confirmed_flush_lsn
在pg_replication_slots
槽表。数据库负开云体育电动老虎机责回收磁盘空间,WAL大小可以从restart_lsn
同一张桌子。如果confirmed_flush_lsn
有规律地增加restart_lsn
滞后时,数据库确实需要回收空开云体育电动老虎机间。磁盘空间通常在批处理块中回收,因此这是预期的行为,用户方面不需要任何操作。被监控的数据库中有很多更新,但只有很少一部分与被监控的表和/或模式相关。开云体育电动老虎机这种情况可以通过使用启用周期性心跳事件来轻松解决
heartbeat.interval.ms
配置选项。PostgreSQL实例包含多个数据库,其中一个是高流量数据库。开云体育电动老虎机开云体育官方注册网址Debezium监视与另一个数据开云体育电动老虎机库相比流量较低的另一个数据库。开云体育官方注册网址然后Debezium不能确认LSN,因为复制插槽工作在每个数据库,并且Debezium没有被调用。开云体育电动老虎机由于WAL由所有数据库共享,它倾向于增长,直到D开云体育电动老虎机ebezium监视的数据库触发事件。开云体育官方注册网址
要克服第三个原因,就必须
使能周期心跳记录生成
heartbeat.interval.ms
配置选项定期从Debezium跟踪的数据库发出更改事件开云体育官方注册网址开云体育电动老虎机
在这种情况下
wal2json
解码器插件,它足以生成空事件。例如,可以通过截断空临时表来实现这一点。对于其他解码器插件,建议创建一个Debezium不监视的补充表。开云体育官方注册网址
然后,一个单独的进程将定期更新表(要么插入一个新事件,要么全部更新同一行)。然后PostgreSQL将调用Debezium, Deb开云体育官方注册网址ezium将确认最新的LSN,并允许数据库回收WAL空间。开云体育电动老虎机
对于使用Postgres的AWS RDS上的用户,在空闲环境中可能会出现与第三个原因类似的情况,因为AWS RDS对自己的系统表的写入对用户来说是不可见的(5分钟)。同样,定期发射事件将解决问题。 |
PostgreSQL连接器是如何工作的
快照
大多数PostgreSQL服务器被配置为不保留WAL段中数据库的完整历史,因此PostgreSQL连接器将无法通过简单地读取WAL来查看数据库的整个历史。开云体育电动老虎机因此,默认情况下,连接器将在第一次启动时执行初始化一致的快照数据库的。开云体育电动老虎机每个快照由以下步骤组成(当使用内置快照模式时,自定义快照模式可能会覆盖这个):
属性启动事务可序列化,只读,可延迟隔离级别,以确保此事务中的所有后续读取都是针对数据的单个一致版本完成的。任何数据的变化,由于后续
插入
,更新
,删除
其他客户端的操作对该事务不可见。获得一个
访问共享模式
锁定每个被监视的表,以确保在快照发生时,任何表都不会发生结构更改。注意,这些锁不阻止表插入
,更新
而且删除
在行动中发生。如果使用导出的快照模式,可以使用无锁快照,则不需要执行此步骤.读取服务器事务日志中的当前位置。
扫描所有数据库表和模式,并生成一开云体育电动老虎机个
读
事件,并将该事件写入相应的特定于表的Kafka主题。提交事务。
在连接器偏移中记录快照的成功完成。
如果连接器失败,重新平衡,或者在步骤1开始后但在步骤6完成之前停止,重新启动连接器将开始一个新的快照。一旦连接器完成了它的初始快照,然后PostgreSQL连接器在步骤3中继续从位置读取流,确保它不会错过任何更新。如果连接器由于任何原因再次停止,在重新启动时,它将从之前停止的地方继续流式传输更改。
第二个快照模式允许连接器执行快照总是.这个行为告诉连接器总是在快照启动时执行快照,并在快照完成后继续按照上述顺序执行步骤3中的更改。这种模式可用于以下情况:已知某些WAL段已被删除且不再可用,或者在新主服务器升级后发生集群故障,这样连接器就不会错过任何可能发生的更改,这些更改可能发生在新主服务器升级后,但在新主服务器上重新启动连接器之前。
第三个快照模式指示连接器从来没有执行快照。当一个新的连接器以这种方式配置时,if将继续从以前存储的偏移量流更改,或者它将从服务器上第一次创建PostgreSQL逻辑复制插槽的时间点开始。注意,只有当您知道所有感兴趣的数据仍然反映在WAL中时,这种模式才有用。
第四种快照模式,最初只有,将执行数据库快照,然后在流处理任开云体育电动老虎机何其他更改之前停止。如果连接器已经启动,但在停止之前没有完成快照,则连接器将重新启动快照进程,并在快照完成后停止。
第五种快照模式,出口,将根据创建复制槽时的时间点执行数开云体育电动老虎机据库快照。这种模式是一种以无锁方式执行快照的好方法。
最后一个快照模式,自定义,允许用户注入自己的实现io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter
接口通过snapshot.custom.class
配置属性,类在你的Kafka Connect集群的类路径(或包含在JAR中,如果使用EmbeddedEngine
).有关详细信息,请参见定制的快照部分。
自定义快照SPI
类的实现可用于更高级的用法io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter
接口。该接口允许控制快照操作的大部分方面,例如是否拍摄快照,以及用于打开快照事务或拍摄锁的选项的方式。
接口的完整API可以在这里看到:
** *快照进程的详细信息。**即:*——应该发生在一个快照* - *应该流发生快照查询应该使用什么* *虽然很多默认快照模式提供Debezium *自定义的实现这个接口可以提供的实现者,*可以提供更高级的功能,如部分快照* *实现者的必须返回true {@link # shouldSnapshot()}或{@link # shouldStream()} *开云体育官方注册网址或适用于两者。*/ @孵化公共接口快照{void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState SlotState);/** * @如果快照需要快照则返回true */ boolean shouldSnapshot();/** * @返回true如果快照在拍摄快照后应该流*/ boolean shouldStream();/** * @return true如果在创建插槽时需要导出快照,则*可作为获取锁的替代*/默认布尔exportSnapshot() {return false;} /** *为指定的表生成一个有效的postgres查询字符串,或一个空的{@link可选}*跳过快照该表(但该表仍将从)** @param tableId表生成一个查询* @返回一个有效的查询字符串,或不跳过快照该表*/可选< string > buildSnapshotQuery(tableId tableId);** @param newSlotInfo如果为快照创建了一个新的slow,它包含来自* the create_replication_slot命令的信息*/ default string snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo){//我们正在使用与pg_backup相同的隔离级别Return " set transaction isolation level SERIALIZABLE, READ ONLY, deferable;";} /** *返回一个SQL语句,用于在快照期间锁定给定的表,如果特定的snapshot *实现需要的话。*/ default可选 snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) {String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot // this does not prevent writes to the table, but prevents changes to the table's schema.... // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted tableIds.forEach(tableId -> statements.append("LOCK TABLE ") .append(tableId.toDoubleQuotedString()) .append(" IN ACCESS SHARE MODE;") .append(lineSeparator)); return Optional.of(statements.toString()); } }
所有内置快照模式都是根据这个接口实现的。
流的变化
PostgreSQL连接器通常会花费大量的时间来处理它所连接的PostgreSQL服务器的更改。这个机制依赖于PostgreSQL的复制协议当服务器的事务日志在特定位置(也称为?)提交更改时,客户机可以从服务器接收更改日志序列号
或者简称为LSNs)。
每当服务器提交事务时,一个单独的服务器进程都会从逻辑解码插件.这个函数处理来自事务的更改,将它们转换为特定的格式(在Debezium插件的情况下是Protobuf或JSON),并将它们写入输出流,然后由客户端使用。开云体育官方注册网址
PostgreSQL连接器充当PostgreSQL客户端,当它接收到这些更改时,它将事件转换为Debezium开云体育官方注册网址创建,更新,或删除事件,包括事件的LSN位置。PostgreSQL连接器将这些更改事件转发给Kafka Connect框架(在同一个进程中运行),然后Kafka Connect框架以相同的顺序异步地将它们写入相应的Kafka主题。Kafka Connect使用了这个术语抵消对于Debezium包含在每个事件中的源特定位置信息,Kafka Connect定期记录另一个Kaf开云体育官方注册网址ka主题中最近的偏移量。
当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件刷新到Kafka,并记录从每个连接器接收到的最后偏移量。在重新启动时,Kafka Connect读取每个连接器的最后记录偏移量,并从该点启动连接器。PostgreSQL连接器使用记录在每个更改事件中的LSN作为偏移量,因此在重新启动时,连接器请求PostgreSQL服务器将事件发送到该位置之后。
PostgreSQL连接器检索模式信息,作为逻辑解码器插件发送的事件的一部分。唯一的例外是关于哪些列组成主键的信息,因为该信息是从JDBC元数据(侧通道)获得的。如果一个表的主键定义发生了变化(通过添加、删除或重命名PK列),那么就会存在一个轻微的风险,即来自JDBC的主键信息不会与逻辑解码事件中的更改数据同步,并且将创建具有不一致的键结构的少量消息。如果发生这种情况,则重新启动连接器并重新处理消息将解决问题。为了完全防止这个问题,建议用Debezium同步主键结构的更新,大致使用以下操作顺序:开云体育官方注册网址
|
PostgreSQL 10+逻辑解码支持(pgoutput)
从PostgreSQL 10+开始,引入了一种新的逻辑复制流模式,称为pgoutput.这种逻辑复制流模式是由PostgreSQL原生支持的,这意味着该连接器可以使用复制流,而不需要安装额外的插件。这对于不支持或不允许安装插件的环境尤其有价值。
看到设置PostgreSQL欲知详情。
主题名称
PostgreSQL连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。默认情况下,Kafka主题名为serverName.schemaName.的表在哪里serverName连接器的逻辑名称是否与开云体育电动老虎机database.server.name
配置属性,schemaName发生操作的数据库模式的名称和开云体育电动老虎机的表发生操作的数据库表的名称。开云体育电动老虎机
例如,考虑一个PostgreSQL安装postgres
开云体育电动老虎机数据库和库存
模式,包含四个表:产品
,products_on_hand
,客户
,订单
.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
另一方面,如果表不是特定模式的一部分,而是在默认情况下创建的公共
PostgreSQL模式,那么Kafka主题的名字是:
fulfillment.public.products
fulfillment.public.products_on_hand
fulfillment.public.customers
fulfillment.public.orders
元信息
每一个记录
所产生的PostgreSQL连接器,除了开云体育电动老虎机数据库事件,一些元信息,关于事件在服务器上发生的位置,源分区的名称,Kafka主题和事件应该放置的分区的名称:
"sourcePartition": {"server": "fulfillment"}, "sourceOffset": {"lsn": "24023128", "txId": "555", "ts_ms": "1482918357011"}, "kafkaPartition": null
PostgreSQL连接器只使用1个Kafka Connect分区并将生成的事件放入1个Kafka分区。因此得名sourcePartition
将始终默认的名称开云体育电动老虎机database.server.name
属性,而kafkaPartition
有价值零
这意味着连接器不使用特定的Kafka分区。
的sourceOffset
消息的一部分包含有关事件发生的服务器位置的信息:
lsn
表示PostgreSQL日志序列号或抵消
在事务日志中txId
表示引起事件的服务器事务的标识符ts_ms
表示自Unix纪元作为事务提交的服务器时间以来的微秒数
事件
所有由PostgreSQL连接器产生的数据更改事件都有一个键和一个值,尽管键和值的结构依赖于产生更改事件的表(参见主题名称).
从Kafka 0.10开始,Kafka可以选择记录消息键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。 |
PostgreSQL连接器确保所有Kafka连接模式名是有效的Avro模式名.这意味着逻辑服务器名必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名中的其余字符以及模式和表名中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。如果不是,则所有无效字符将自动替换为下划线字符。 当逻辑服务器名、模式名和表名包含其他字符时,这可能导致意外的冲突,并且表全名之间唯一的区分字符无效,因此被下划线取代。 |
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,这些事件的结构可能会随着时间而改变。对于消费者来说,这可能很难处理,所以Kafka Connect让每个事件都是自包含的。每个消息键和值都有两部分模式而且有效载荷.模式描述有效负载的结构,而有效负载包含实际数据。
更改事件的键
对于给定的表,更改事件的键将具有一个结构,其中包含主键(或唯一键约束)中的每一列的字段副本的身份
设置为完整的
或使用索引
在创建事件时的表上)。
考虑一个客户
表中定义的公共
开云体育电动老虎机数据库模式:
CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
如果开云体育电动老虎机database.server.name
属性的值PostgreSQL_server
,每变事件为客户
表,当它有这个定义时,将具有相同的键结构,在JSON中是这样的:
{"schema": {"type": "struct", "name": "PostgreSQL_server.public.customers. "关键”、“可选”:假的,“字段”:[{“名称”:“id”、“指数”:“0”、“模式”:{“类型”:“INT32”、“可选”:“假的 " } } ] }, " 有效载荷”:{" id ": " 1 "}}
的模式
密钥部分包含一个Kafka Connect模式,描述密钥部分中的内容,在我们的例子中,这意味着有效载荷
值不是可选的,是由名为PostgreSQL_server.public.customers.Key
,并且有一个名为id
类型的int32
.如果我们看一下键的值有效载荷
字段,我们将看到它确实是一个结构(在JSON中只是一个对象)id
字段,值为1
.
因此,我们将此键解释为描述对象中的行public.customers
表(从命名的连接器输出PostgreSQL_server
),其id
主键列的值为1
.
虽然 |
如果表没有主键或唯一键,则更改事件的键将为空。这是有意义的,因为没有主键或唯一键约束的表中的行不能被唯一标识。 |
更改事件的值
更改事件消息的值稍微复杂一些。像消息键一样,它有一个模式节和有效载荷部分。PostgreSQL连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:
人事处
必选字段,包含描述操作类型的字符串值。PostgreSQL连接器的值为c
对于创建(或插入),u
对于更新,d
对于删除,和r
用于读取(在快照的情况下)。之前
是否是可选字段,如果存在则包含行状态之前事件发生了。该结构将由PostgreSQL_server.public.customers.Value
Kafka Connect模式,其中PostgreSQL_server
对象中的所有行使用连接器public.customers
表格
该字段是否可用在很大程度上取决于副本的身份每个桌子的设置 |
后
是否是可选字段,如果存在则包含行状态后事件发生了。结构的描述是相同的PostgreSQL_server.public.customers.Value
中使用的Kafka Connect模式之前
.源
是一个必填字段,包含描述事件源元数据的结构,在PostgreSQL中包含几个字段:Debezium版本、连接器名称、受影响的数据库名称、模式和表、事件是否是正在进行的快照的一部分以及记录中的相同字段开云体育官方注册网址开云体育电动老虎机元信息部分ts_ms
是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。
当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。
副本的身份
副本的身份是一个特定于PostgreSQL的表级设置,它决定了可用信息的数量逻辑解码
如果更新
而且删除
事件。更具体地说,这控制了无论何时发生上述事件之一,有关所涉及的表列的先前值的可用信息(如果有的话)。
有4个可能的值副本的身份
:
默认的,
更新
而且删除
事件将仅包含表的主键列的先前值更新
只有主列的值发生了变化没什么- - - - - -
更新
而且删除
事件将不包含关于任何表列上一个值的任何信息完整的,
更新
而且删除
事件将包含表中所有列的先前值指数
索引名称
-更新
而且删除
事件将包含命名的索引定义中包含的列的先前值索引名称
,以防更新
只会显示值已更改的索引列
创建事件
我们来看看a创建事件值可能看起来像客户
表:
{"schema": {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": " postgresql_server .目录。值","field": "before"}, {"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers。值”、“场”:“后”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:“模式”},{“类型”:“弦”、“可选”:假的,“场”:“表”},{“类型”:“int64”、“可选”:真的,“场”:“txId”},{“类型”:“int64”、“可选”:真的,“场”:“lsn”},{“类型”:“int64”、“可选”:真的,“场”:“xmin”}],“可选”:假的,“名字”:“io.debezium.connector.postgresql.Source”、“字段”:“源”},{“类型”:“弦”、“可选”:假的,“场”:“人事处”},{“类型”:“int64”、“可选”:真的,“场”:“ts_ms”}],“可选”:假开云体育官方注册网址的,“名字”:“PostgreSQL_server.inventory.customers。信封”},“有效载荷”:{“前”:空,“后”:{" id ": 1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null}, "op": "c", "ts_ms": 1559033904863}}
如果我们看模式
事件的一部分价值的模式信封的模式源
结构(特定于PostgreSQL连接器并在所有事件中重用),以及表特定的模式之前
而且后
字段。
的模式名称 这意味着当使用Avro转换器时,所产生的Avro模式每个表在每一个逻辑源有自己的演变和历史。 |
如果我们看有效载荷
事件的一部分价值,我们可以看到事件中的信息,即它正在描述创建的行(sinceop = c
),以及后
字段值包含新插入行的'id
,first_name
,last_name
,电子邮件
列。
事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式和有效载荷部分信息。 这是可能的,甚至建议使用Avro转换器来大幅减少写入Kafka主题的实际消息的大小。 |
更新事件
的值更新这个表上的Change事件实际上会有完全相同的结果模式,其有效负载的结构相同,但将持有不同的值。这里有一个例子:
{"schema":{…},“有效载荷”:{“前”:{" id ": 1},“后”:{" id ": 1、“first_name”:“安妮玛丽”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null}, "op": "u", "ts_ms": 1465584025523}}
当我们把这个和插入事件中,我们看到了一些不同有效载荷
部分:
的
人事处
字段值现在是u
,表示该行因更新而更改的
之前
字段现在拥有数据库提交前包含值的行状态,但仅用于主键列开云体育电动老虎机id
.这是因为副本的身份这是默认的默认的
.
如果我们想查看该行所有列的先前值,我们必须更改 |
的
后
字段现在已经更新了行状态,这里可以看到first_name
价值就是现在安妮玛丽
.的
源
字段结构具有与以前相同的字段,但值不同,因为该事件来自WAL中的不同位置。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
通过观察,我们可以学到很多东西有效载荷
部分。我们可以比较之前
而且后
结构来确定由于提交而在该行中实际更改的内容。的源
结构告诉我们关于这个变化的PostgreSQL记录的信息(提供可追溯性),但更重要的是,我们可以将这个信息与这个和其他主题中的其他事件进行比较,以了解这个事件是在之前发生,之后发生,还是作为其他事件的一部分发生在同一个PostgreSQL提交中。
当一行的主键/唯一键的列更新时,行键的值也发生了变化,因此Debezium将输出开云体育官方注册网址三个事件: |
删除事件
到目前为止,我们已经看到了样本创建而且更新事件。现在,我们来看看a的值删除事件。再一次,模式
的部分值将与创建而且更新事件:
{"schema":{…},“有效载荷”:{“前”:{" id ": 1},“后”:空,“源”:{“版本”:“1.0.3。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "d", "ts_ms": 1465581902461}}
如果我们看有效载荷
部分,我们看到了一些不同的比较创建或更新事件的有效载荷:
的
人事处
字段值现在是d
,表示该行已被删除的
之前
字段现在拥有在数据库提交时删除的行状态。开云体育电动老虎机属性,因此只包含主键列副本的身份设置的
后
字段为空,表示该行不再存在的
源
字段结构具有许多与以前相同的值,除了ts_ms
,lsn
而且txId
领域发生了变化的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
此事件向使用者提供了用于处理删除该行的各种信息。
请注意没有PK的表,任何来自副本id DEFAULT的表的删除消息将没有 |
PostgreSQL连接器的事件被设计用来处理Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除一些旧消息。这允许Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除上面列出的event值仍然适用于日志压缩,因为Kafka仍然可以删除所有使用相同键的早期消息。但仅当消息值为时零
卡夫卡会知道它可以移除吗所有消息用同样的钥匙。为了实现这一点,PostgreSQL连接器总是遵循删除特别活动墓碑上具有相同but键的事件零
价值。
数据类型
如上所述,PostgreSQL连接器用事件表示行更改,事件的结构类似于行所在的表。事件包含每个列值的字段,该值在事件中如何表示取决于列的PostgreSQL数据类型。本节描述此映射。
下表描述了连接器如何将每个PostgreSQL数据类型映射到文字类型而且语义类型在事件字段中。
在这里,文字类型描述了如何使用Kafka Connect模式类型逐字表示值,即INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
.
的语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
的 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
带有时区信息的时间戳的字符串表示形式,其中时区为GMT |
|
|
|
带时区信息的时间值的字符串表示形式,其中时区为GMT |
|
|
|
类型的时间间隔的大约微秒数 |
|
|
|
遵循模式的间隔值的字符串表示形式 |
|
|
N/A |
|
|
|
|
包含JSON文档、数组或标量的字符串表示形式。 |
|
|
|
包含XML文档的字符串表示形式 |
|
|
|
包含PostgreSQL UUID值的字符串表示形式 |
|
|
|
包含一个带2的结构 |
|
|
|
包含PostgreSQL LTREE值的字符串表示形式 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
整数范围 |
|
|
N/A |
bigint的范围 |
|
|
N/A |
数值范围 |
|
|
N/A |
包含不带时区的时间戳范围的字符串表示形式。 |
|
|
N/A |
包含具有(本地系统)时区的时间戳范围的字符串表示形式。 |
|
|
N/A |
包含日期范围的字符串表示形式。它总是有一个排他的上界。 |
|
|
|
包含PostgreSQL ENUM值的字符串表示形式。允许的值集在名为的模式参数中维护 |
其他数据类型映射将在下面几节中描述。
时间值
除了PostgreSQL的TIMESTAMPTZ
而且TIMETZ
数据类型(包含时区信息),其他时态类型取决于time.precision.mode
配置属性。当time.precision.mode
配置属性设置为自适应
(默认值),那么连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值:开云体育电动老虎机
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示午夜过后的毫秒数,不包括时区信息。 |
|
|
|
表示午夜过后的微秒数,不包括时区信息。 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为adaptive_time_microseconds
,然后连接器将根据列的数据类型定义确定时态类型的文字类型和语义类型,以便事件完全表示数据库中的值,除了所有的TIME字段将被捕获为微秒:开云体育电动老虎机
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示以微秒为单位的时间值,不包括时区信息。PostgreSQL允许精度 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
当time.precision.mode
配置属性设置为连接
,那么连接器将使用预定义的Kafka Connect逻辑类型。当用户只知道内置的Kafka Connect逻辑类型而无法处理可变精度的时间值时,这可能很有用。另一方面,由于PostgreSQL支持微秒精度,由连接器生成的事件使用连接
时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度取值大于3:
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示从午夜开始的毫秒数,不包括时区信息。PostgreSQL允许 |
|
|
|
表示自epoch以来的毫秒数,不包括时区信息。PostgreSQL允许 |
时间戳
值
的时间戳
类型表示不包含时区信息的时间戳。这些列将转换为基于UTC的等效Kafka Connect值。例如时间戳
值“2018-06-20 15:13:16.945104”将由io.开云体育官方注册网址debezium.time.MicroTimestamp
值为“1529507596945104”(假设time.precision.mode
未设置为连接
).
注意,运行Kafka Connect和Debezium的JVM的时区不会影响这种转换。开云体育官方注册网址
十进制值
当decimal.handling.mode
配置属性设置为精确的
,那么连接器将使用预定义的Kafka Connectorg.apache.kafka.connect.data.Decimal
所有的逻辑类型小数
而且数字
列。这是默认模式。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
的 |
|
|
|
的 |
这条规则有一个例外。当数字
或小数
类型的使用没有任何比例限制,这意味着来自数据库的值对于每个值都有不同的(变量)比例。开云体育电动老虎机在本例中是类型io.开云体育官方注册网址debezium.data.VariableScaleDecimal
使用,并且它包含传输值的值和规模。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含两个字段的结构: |
|
|
|
包含两个字段的结构: |
然而,当decimal.handling.mode
配置属性设置为双
,则连接器将表示所有小数
而且数字
值是Java的双精度值,并按如下方式编码:
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
最后一个选项decimal.handling.mode
配置属性为字符串
.在这种情况下,连接器将表示所有小数
而且数字
值作为它们的格式化字符串表示,并按照如下方式编码它们:
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
PostgreSQL支持南
(不是数字)的特殊值小数
/数字
值。只有字符串
而且双
模式能够处理这样的值,将它们编码为任意一种翻倍。南
或者字符串常量南
.
HStore值
当hstore.handling.mode
配置属性设置为json
(默认值),连接器将表示所有HSTORE
字符串化的JSON值,并按如下方式编码:
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
示例:使用JSON转换器的输出表示是 |
当hstore.handling.mode
配置属性设置为地图
,则连接器将使用地图
所有模式类型HSTORE
列。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
示例:使用JSON转换器的输出表示是 |
域类型
PostgreSQL还支持基于其他底层类型的用户定义类型的概念。当使用此类列类型时,Debezium将基于完整的类型层次结构公开列开云体育官方注册网址的表示形式。
监视使用域类型的列时应该特别注意。 当使用扩展默认数据库类型之一的域类型定义列,并且域类型定义了自定义长度/比例时,生成的模式将继承已定义的长度/比例。开云体育电动老虎机 当使用扩展另一个定义自定义长度/比例的域类型的域类型定义列时,生成的模式将不继承已定义的长度/规模,因为PostgreSQL驱动程序的列元数据实现。 |
网络地址类型
PostgreSQL也有可以存储IPv4、IPv6和MAC地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址,因为这些类型提供了输入错误检查和专门的操作符和函数。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
IPv4和IPv6组网 |
|
|
|
IPv4和IPv6主机和网络 |
|
|
|
MAC地址 |
|
|
|
MAC地址为EUI-64格式 |
PostGIS类型
PostgreSQL连接器也完全支持所有的PostGIS数据类型
PostGIS数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含一个具有2个字段的结构
|
|
|
|
包含一个具有2个字段的结构
|
烤的价值观
PostgreSQL对页面大小有硬性限制。这意味着大于约8 KB的值需要使用吐司存储.这将影响来自数据库的复制消息,因为使用TOAST机制存储且未被更改的值不会包含在消息中,除非开云体育电动老虎机它们是表的复制标识的一部分。Debezium没有安全的方法直接从数据库中读取带开云体育官方注册网址外缺失的值,因为这可能会导致竞争条件。开云体育电动老虎机开云体育官方注册网址Debezium因此遵循以下规则来处理烘烤的值:
表与
副本标识已满
: TOAST列值是之前
而且后
更改事件块与任何其他列一样表与
副本标识默认
:当收到更新
任何不属于副本标识的TOAST开云体育电动老虎机列值都不会成为该事件的一部分;类似地,当收到一个删除
事件中,任何这样的TOAST列都不会是之前
块。由于D开云体育官方注册网址ebezium在这种情况下不能安全地提供列值,所以它返回配置选项中定义的占位符值toasted.value.placeholder
.
有一个与Amazon RDS实例相关的特定问题。
|
部署PostgreSQL连接器
如果您已经安装了动物园管理员,卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的PostgreSQL连接器就很容易了。只需下载连接器插件存档,将jar文件解压到Kafka Connect环境中,并添加jar文件所在的目录Kafka Connect的plugin.path.重新启动Kafka Connect进程以获取新的jar。
如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的Docker图片对于Zookeeper, Kafka, PostgreSQL和Kafka Connect, PostgreSQL连接器已经预安装并准备好了。你甚至可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
示例配置
使用连接器为特定的PostgreSQL服务器或集群产生变更事件:
安装逻辑解码插件
配置PostgreSQL服务器支持逻辑复制
为PostgreSQL连接器创建一个配置文件。
当连接器启动时,它将获取PostgreSQL服务器中数据库的一致快照,并开始流化更改,为每个插入、更新和删除的行生成事件。开云体育电动老虎机您还可以选择为模式和表的一个子集生成事件。可选地忽略、屏蔽或截断敏感、过大或不需要的列。
下面是一个PostgreSQL连接器的配置示例,该连接器监视位于192.168.99.100上5432端口的PostgreSQL服务器fullfillment
.通常情况下,您可以在配置Debezium PostgreS开云体育官方注册网址QL连接器. json
文件中使用连接器可用的配置属性。
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.postgresql. postgresconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“5432”,(4)“开云体育电动老虎机数据库。使用r": "postgres",(5)“开云体育电动老虎机数据库。密码”:“postgres”,(6)“开云体育电动老虎机数据库。dbname" : "postgres",(7):开云体育电动老虎机“database.server.name fullfillment”,(8)”表。白名单”:“public.inventory”(9)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | 这个PostgreSQL连接器类的名称。 |
3. | PostgreSQL服务器地址。 |
4 | PostgreSQL服务器的端口号。 |
5 | 属性的PostgreSQL用户的名称需要的特权. |
6 | 属性的PostgreSQL用户的密码需要的特权. |
7 | 要连接的PostgreSQL数据库的名称开云体育电动老虎机 |
8 | PostgreSQL服务器/集群的逻辑名称,它形成一个命名空间,用于连接器写入的所有Kafka主题的名称,Kafka Connect模式名称,以及使用Avro连接器时对应的Avro模式的名称空间。 |
9 | 连接器将监视的由该服务器托管的所有表的列表。这是可选的,还有其他属性用于列出要从监视中包含或排除的模式和表。 |
看到连接器属性的完整列表可以在这些配置中指定。
这个配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到PostgreSQL数据库并记录Kafka主题的事件。开云体育电动老虎机
监控
Kafka、Zookeeper和Kafka Connect都内置了对JMX指标的支持。PostgreSQL连接器还发布了许多关于连接器活动的度量,这些活动可以通过JMX进行监视。连接器有两种类型的指标。快照度量可以帮助您监视快照活动,并且在连接器执行快照时可用。流度量帮助您在连接器处理逻辑复制流时监视进度和活动。
快照指标
MBean: 开云体育官方注册网址debezium.postgres: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
属性名称 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
流指标
MBean: 开云体育官方注册网址debezium.postres: type = connector-metrics、上下文=流媒体服务器=<开云体育电动老虎机 database.server.name >
属性名称 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
|
|
连接器的Java类的名称。始终使用值 |
|
|
|
应该为此连接器创建的最大任务数。PostgreSQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。 |
|
|
邮差的名字逻辑解码插件安装在服务器上。支持的值为 当处理的事务非常大时,可能会出现 |
|
|
Postgres逻辑解码槽的名称,用于从插件和数据库实例中流化更改。开云体育电动老虎机价值观必须符合Postgres复制槽位命名规则状态:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线。” |
|
|
当连接器有序完成时是否删除逻辑复制插槽。应该只设置为 |
|
|
使用时为流更改而创建的PostgreSQL发布的名称 如果该发布还不存在,则在启动时创建所有的表.开云体育官方注册网址然后Debezium将使用它自己的白/黑名单过滤功能,将更改事件限制到特定的表(如果配置了的话)。注意,连接器用户必须具有超级用户权限才能创建该发布,因此通常最好先创建发布。 如果发布已经存在(对于所有表或配置了一个表子集),Debezium将使用定义的发布。开云体育官方注册网址 |
|
PostgreSQL数据库服务器的IP地址或主机名。开云体育电动老虎机 |
|
|
|
PostgreSQL数据库服务器的整型端口号。开云体育电动老虎机 |
|
连接到PostgreSQL数据库服务器时使用的开云体育电动老虎机PostgreSQL数据库的名称。 |
|
|
连接PostgreSQL数据库服务器时使用的密码。开云体育电动老虎机 |
|
|
用于流化更改的PostgreSQL数据库的名称开云体育电动老虎机 |
|
|
逻辑名称,用于标识并提供被监控的特定PostgreSQL数据库服务器/集群的名称空间。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作来自该连接器的所有Kafka主题名的前缀。只能使用字母数字字符和下划线。 |
|
|
一个可选的、以逗号分隔的正则表达式列表,它匹配要监控的模式名;任何没有包含在白名单中的模式名都将被排除在监控之外。默认情况下,将监视所有非系统模式。不得与 |
|
|
可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的模式名称;黑名单中没有包含的任何模式名都将被监控,系统模式除外。不得与 |
|
|
一个可选的逗号分隔的正则表达式列表,它匹配要监控的表的完全限定表标识符;任何不在白名单中的表都将被排除在监控之外。每个标识符都是这样的schemaName.的表.默认情况下,连接器将监视每个被监视模式中的每个非系统表。不得与 |
|
|
一个可选的逗号分隔的正则表达式列表,它与要排除在监视之外的表的完全限定表标识符匹配;任何不在黑名单中的表都将被监控。每个标识符都是这样的schemaName.的表.不得与 |
|
|
一个以逗号分隔的可选正则表达式列表,该列表与应从更改事件消息值中排除的列的完全限定名称匹配。列的完全限定名的格式为schemaName.的表.columnName |
|
|
|
时间、日期和时间戳可以用不同的精度表示,包括: |
|
|
指定连接器应如何处理的值 |
|
|
指定连接器应如何处理的值 |
|
|
指定连接器应如何处理的值 |
|
|
是否使用加密连接到PostgreSQL服务器。选项包括:禁用(默认值)使用未加密的连接;需要使用安全(加密)连接,如果无法建立则失败;verify-ca就像 |
|
包含客户端SSL证书的文件的路径。看到PostgreSQL文档获取更多信息。 |
|
|
包含客户端SSL私钥的文件的路径。看到PostgreSQL文档获取更多信息。 |
|
|
从指定的文件中访问客户端私钥的密码 |
|
|
包含对服务器进行验证的根证书的文件的路径。看到PostgreSQL文档获取更多信息。 |
|
|
启用TCP保持连接探测以验证数据库连接是否仍然活跃。开云体育电动老虎机(默认启用)。看到PostgreSQL文档获取更多信息。 |
|
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与基于字符的列的完全限定名匹配,如果字段值长于指定的字符数,则这些列的值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管每个属性的长度必须为正整数。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName,或开云体育电动老虎机数据库名.schemaName.的表.columnName. |
|
N/A |
一个以逗号分隔的可选正则表达式列表,该列表与基于字符的列的完全限定名称匹配,这些列的值应在更改事件消息值中被替换为由指定数量的星号( |
|
N/A |
一个以逗号分隔的可选正则表达式列表,它与列的完全限定名称相匹配,这些列的原始类型和长度应该作为参数添加到发出的更改消息中的相应字段模式中。模式参数 |
|
空字符串 |
与完全限定的表和列匹配以映射主键的正则表达式的分号列表。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 | ||
---|---|---|---|---|
|
|
指定在连接器启动时运行快照的条件。默认为最初的,并指定只有在没有为逻辑服务器名记录偏移量时连接器才能运行快照。的总是选项指定连接器在每次启动时运行一个快照。的从来没有选项指定连接永远不应该使用快照,并且在使用逻辑服务器名首次启动时,连接器应该从它最后停止的位置(最后的LSN位置)读取,或者从逻辑复制插槽的视图开始从头读取。的initial_only选项指定连接器应该只获取初始快照,然后停止,而不处理任何后续更改。的出口选项指定数据库快照将基于创建复制槽时的时间点,这是一开云体育电动老虎机种以无锁方式执行快照的好方法。最后,如果设置为自定义然后用户还必须设置 |
||
|
类的实现的完整java类名 |
|||
|
|
正整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,则快照将失败。看到快照 |
||
|
控制快照中将包括表中的哪些行。 |
|||
|
|
指定连接器在事件处理期间应如何对异常作出反应。 |
||
|
|
正整数值,指定通过流复制接收到的更改事件写入Kafka之前放入的阻塞队列的最大大小。例如,当写入Kafka较慢或Kafka不可用时,该队列可以提供反压力。 |
||
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。 |
||
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
||
|
|
当Debe开云体育官方注册网址zium遇到数据类型未知的字段时,默认情况下,该字段将从更改事件中忽略,并记录警告。在某些情况下,包含字段并将其以不透明的二进制表示形式发送到下游客户端,以便客户端自己解码可能更可取。设置为
|
||
|
当建立到数据库的JDBC连接(不是事务日志读取连接)时,将执行的SQL语句的分号列表。开云体育电动老虎机使用双分号(';;')将分号用作字符而不是分隔符。
|
|||
|
|
控制心跳消息的发送频率。 |
||
|
|
控制要向其发送心跳消息的主题的命名。 |
||
|
|
指定触发表的内存模式刷新的条件。
如果存在频繁更新的表,其中的TOASTed数据很少出现在这些更新中,则此设置可以显著提高连接器性能。但是,如果从表中删除TOASTable列,则内存模式可能会过时。 |
||
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
|||
|
|
指定在进行快照时应一次性从每个表读取的最大行数。连接器将以这个大小的多个批次读取表内容。默认值为10240。 |
||
|
要传递给已配置逻辑解码插件的可选参数列表。例如,在使用wal2json插件时,可以用于启用服务器端表过滤。允许的值取决于所选插件,用分号分隔。例如, |
|||
|
|
是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。 |
||
|
6 |
当尝试失败时,重试连接复制插槽的次数。 |
||
|
10000(10秒) |
当连接器连接到复制插槽失败时,重试尝试之间等待的毫秒数。 |
||
|
|
指定Debezium将提供的常量,以指示原始值是一个烘烤值,而不是数据库提供的值。开云体育官方注册网址开云体育电动老虎机If以 |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。
PostgreSQL常见问题
开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium就像Kafka一样提供了开云体育官方注册网址至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,当连接器不能成功地使用指定的连接参数连接到PostgreSQL时,或者当连接器从PostgreSQL WAL中先前记录的位置(通过LSN值)重新启动时,并且PostgreSQL不再有该历史可用时,连接器将停止运行。
在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经更正或PostgreSQL问题已经解决时,连接器可以重新启动。
PostgreSQL不可用
一旦连接器运行,如果它所连接的PostgreSQL服务器由于任何原因变得不可用,连接器将失败并报错,连接器将停止。当服务器可用时,只需重新启动连接器。
PostgreSQL连接器在外部存储最后处理的偏移量(以PostgreSQL的形式)日志序列号
值)。一旦连接器重新启动并连接到服务器实例,它将要求服务器继续从该特定偏移量进行流处理。只要Debezium复制插槽保持不变,这个偏移量就始终可用。开云体育官方注册网址永远不要在主服务器上删除复制插槽,否则将丢失数据。有关插槽被移除时的故障情况,请参见下一节。
集群的失败
的12
, PostgreSQL允许逻辑复制槽仅在主服务器上,这意味着PostgreSQL连接器只能指向数据库集群的活动主节点。开云体育电动老虎机而且复制槽本身不会传播到副本。如果主节点关闭,则只有在提升了一个新的主节点之后(使用逻辑解码插件已经安装)并且在那里创建了复制插槽,可以重新启动连接器并将其指向新的服务器。
关于故障转移有一些非常重要的注意事项,您应该暂停Debezium,直到您可以验证您有一个完整的复制插槽,没有丢失数据。开云体育官方注册网址在故障转移之后,您将错过更改事件,除非您的故障转移管理包括在应用程序被允许写入开云体育官方注册网址新主要的您还可能需要在故障转移情况下验证Debezium是否能够读取插槽中的所有更改开云体育官方注册网址在旧的初选失败之前.
恢复和验证任何丢失的更改的一种可靠方法(但在管理上很困难)是将失败的主服务器的备份恢复到它发生故障之前的位置,这将允许您检查复制插槽是否有任何未使用的更改。在任何情况下,在允许对新主服务器进行写操作之前,在新主服务器上重新创建复制插槽是至关重要的。
Kafka连接进程优雅地停止
如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务显然都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,PostgreSQL连接器将从上次偏移量开始恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量将取决于偏移刷新周期和崩溃前的数据更改量。
由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。 开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括PostgreSQL服务器的事件时间、服务器事务的id以及事务更改写入预写日志中的位置。消费者可以跟踪这些信息(特别是LSN位置),以了解他们是否已经看到了特定的事件。 |
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。
连接器停止一段时间
如果连接器被优雅地停止,数据库可以继续使用,任何新的更改都将被记录在PostgreSQL开云体育电动老虎机 WAL中。当连接器重新启动时,它将在上次停止的地方恢复流更改,记录连接器停止时所做的所有更改的更改事件。
一个正确配置的Kafka集群能够处理巨大的吞吐量.Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器在一段时间后重新启动时,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及对PostgreSQL中的数据所做的更改量。开云体育电动老虎机