开云体育官方注册网址Debezium连接器PostgreSQL
开云体育官方注册网址Debezium的PostgreSQL连接器捕获PostgreSQL数据库模式中的行级更改。开云体育电动老虎机PostgreSQL支持9.6、10、11、12版本。
当连接器第一次连接到PostgreSQL服务器或集群时,它会获取所有模式的一致快照。快照完成后,连接器将持续捕获行级更改,这些更改插入、更新和删除数据库内容,并提交给PostgreSQL数据库。开云体育电动老虎机连接器生成数据更改事件记录,并将其传输到Kafka主题。对于每个表,默认行为是连接器将所有生成的事件流到该表的单独Kafka主题中。应用程序和服务使用来自该主题的数据更改事件记录。
概述
PostgreSQL连接器包含两个主要部分,它们一起工作来读取和处理数据库更改:开云体育电动老虎机
逻辑解码输出插件。您可能需要安装您选择使用的输出插件。在运行PostgreSQL服务器之前,必须配置一个使用所选输出插件的复制插槽。该插件可以是以下插件之一:
decoderbufs
它基于Protobuf,由Debezium社区维护。开云体育官方注册网址wal2json
基于JSON,由wal2json社区维护。pgoutput
是PostgreSQL 10+中标准的逻辑解码输出插件。它由PostgreSQL社区维护,并由PostgreSQL自己用于逻辑复制.该插件始终存在,因此不需要安装其他库。Debe开云体育官方注册网址zium连接器将原始复制事件流直接解释为更改事件。
读取所选逻辑解码输出插件产生的更改的Java代码(实际的Kafka Connect连接器)。它使用PostgreSQL的流复制协议,通过PostgreSQLJDBC驱动程序
连接器将生成更改事件为每个被捕获的行级插入、更新和删除操作,并在单独的Kafka主题中为每个表发送更改事件记录。客户端应用程序读取与感兴趣的数据库表对应的Kafka主题,并可以对从这些主题接收到的每一个行级事件做出反应。开云体育电动老虎机
PostgreSQL通常会在一段时间后清除write-ahead log (WAL)段。这意味着连接器不具有对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当PostgreSQL连接器第一次连接到一个特定的PostgreSQL数据库时,它首先执行一个开云体育电动老虎机一致的快照每个数据库模式的。开云体育电动老虎机在连接器完成快照之后,它继续从创建快照的确切位置开始流化更改。通过这种方式,连接器以所有数据的一致视图开始,并且不会忽略在拍摄快照时所做的任何更改。
这个连接器容错能力强。当连接器读取更改并产生事件时,它会记录每个事件的WAL位置。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,重新启动时连接器将继续读取它上次停止的WAL。这包括快照。如果连接器在快照期间停止,则连接器在重新启动时开始一个新的快照。
该连接器依赖并反映了PostgreSQL逻辑解码特性,该特性有以下限制:
出现问题时的行为描述出现问题时连接器的操作。 |
开云体育官方注册网址Debezium目前只支持UTF-8字符编码的数据库。开云体育电动老虎机使用单字节字符编码,不可能正确处理包含扩展ASCII码字符的字符串。 |
连接器如何工作
为了优化配置和运行Debezium PostgreSQL连接器,了开云体育官方注册网址解连接器如何执行快照、流更改事件、确定Kafka主题名称以及使用元数据是很有帮助的。
快照
大多数PostgreSQL服务器被配置为不保留WAL段中数据库的完整历史。开云体育电动老虎机这意味着PostgreSQL连接器将无法通过只读取WAL来查看数据库的整个历史。开云体育电动老虎机因此,连接器第一次启动时执行初始化一致的快照数据库的。开云体育电动老虎机执行快照的默认行为包括以下步骤。属性可以更改此行为snapshot.mode
连接器配置属性到一个值以外最初的
.
属性启动事务可序列化,只读,可延迟隔离级别,以确保此事务中的后续读取针对数据的单个一致版本。任何数据的变化,由于后续
插入
,更新
,删除
其他客户端的操作对该事务不可见。获得一个
访问共享模式
锁定正在跟踪的每个表,以确保在快照发生时,任何表都不会发生结构更改。这些锁不会阻止表插入
,更新
而且删除
快照期间发生的操作。时,省略此步骤
snapshot.mode
设置为出口
,它允许连接器执行无锁快照.读取服务器事务日志中的当前位置。
扫描数据库表和模式开云体育电动老虎机,生成一个
读
事件,并将该事件写入相应的特定于表的Kafka主题。提交事务。
在连接器偏移中记录快照的成功完成。
如果连接器失败,重新平衡,或者在步骤1开始后但在步骤6完成之前停止,则重新启动连接器后开始一个新的快照。在连接器完成它的初始快照之后,PostgreSQL连接器继续从它在第3步中读取的位置进行流处理。这确保连接器不会错过任何更新。如果连接器由于任何原因再次停止,在重新启动时,连接器将继续从之前停止的地方传输更改。
强烈建议您配置一个PostgreSQL连接器来设置 |
设置 | 描述 |
---|---|
|
连接器在启动时总是执行快照。快照完成后,连接器继续按照上述顺序中的步骤3对更改进行流式处理。这种模式在以下情况下很有用:
|
|
连接器从不执行快照。以这种方式配置连接器时,其启动时的行为如下所示。如果Kafka偏移主题中有先前存储的LSN,连接器将从该位置继续流化更改。如果没有存储LSN,连接器将从在服务器上创建PostgreSQL逻辑复制插槽的时间点开始流化更改。的 |
|
连接器执行数据库快照,并在流任何更改事件记录之前停开云体育电动老虎机止。如果连接器已经启动,但在停止之前没有完成快照,则连接器将重新启动快照进程,并在快照完成时停止。 |
|
连接器根据创建复制插槽的时间点执行数据库快照。开云体育电动老虎机这种模式是一种以无锁方式执行快照的好方法。 |
|
的 |
自定义快照SPI
类的实现用于更高级的用途io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter
接口。该接口允许控制连接器如何执行快照的大部分方面。这包括是否获取快照、打开快照事务的选项以及是否获取锁。
下面是该接口的完整API。所有内置快照模式都实现此接口。
/** *此接口用于确定关于快照过程的细节:**即:* -是否应该发生快照* -是否应该发生流* -应该使用哪些查询来快照**虽然Debezium提供了许多默认的快照模式,*实现者可以提供该接口的自定义实现,其中*可以提供更高级的功能,例如部分快照。开云体育官方注册网址* *实现必须为{@link #shouldSnapshot()}或{@link #shouldStream()}中的任何一个返回true,或者两者都返回true。*/ @孵化公共接口快照{void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState SlotState);/** * @如果快照需要快照则返回true */ boolean shouldSnapshot();/** * @返回true如果快照在拍摄快照后应该流*/ boolean shouldStream();/** * @return true如果在创建插槽时需要导出快照,则*可作为获取锁的替代*/默认布尔exportSnapshot() {return false;} /** *为指定的表生成一个有效的postgres查询字符串,或一个空的{@link可选}*跳过快照该表(但该表仍将从)** @param tableId表生成一个查询* @返回一个有效的查询字符串,或不跳过快照该表*/可选< string > buildSnapshotQuery(tableId tableId);** @param newSlotInfo如果为快照创建了一个新的slow,它包含来自* the create_replication_slot命令的信息*/ default string snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo){//我们正在使用与pg_backup相同的隔离级别Return " set transaction isolation level SERIALIZABLE, READ ONLY, deferable;";} /** *返回一个SQL语句,用于在快照期间锁定给定的表,如果特定的snapshot *实现需要的话。*/ default可选 snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) {String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot // this does not prevent writes to the table, but prevents changes to the table's schema.... // DBZ-298 Quoting name in case it has been quoted originally; it does not do harm if it has not been quoted tableIds.forEach(tableId -> statements.append("LOCK TABLE ") .append(tableId.toDoubleQuotedString()) .append(" IN ACCESS SHARE MODE;") .append(lineSeparator)); return Optional.of(statements.toString()); } }
流的变化
PostgreSQL连接器通常将绝大多数时间用于从所连接的PostgreSQL服务器传输更改。这个机制依赖于PostgreSQL的复制协议.该协议使客户端能够在服务器事务日志的特定位置(称为日志序列号(log Sequence Numbers, lns))提交更改时接收来自服务器的更改。
每当服务器提交事务时,一个单独的服务器进程都会从逻辑解码插件.该函数处理来自事务的更改,将其转换为特定的格式(在Debezium插件的情况下是Protobuf或JSON),并将其写入输出流,然后供客户端使用。开云体育官方注册网址
Debe开云体育官方注册网址zium PostgreSQL连接器充当PostgreSQL客户端。当连接器接收到更改时,它将事件转换为Debezium开云体育官方注册网址创建,更新,或删除事件,包括该事件的LSN。PostgreSQL连接器将这些记录中的更改事件转发给运行在同一进程中的Kafka Connect框架。Kafka Connect进程异步地将更改事件记录按照它们生成的相同顺序写入相应的Kafka主题。
Kafka Connect会定期记录最新的数据抵消在另一个卡夫卡的话题。偏移量表示Debezium包含在每个事件中的特定于源的位置信息。开云体育官方注册网址对于PostgreSQL连接器,记录在每个更改事件中的LSN就是偏移量。
当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件记录刷新到Kafka,并记录从每个连接器接收到的最后偏移量。当Kafka Connect重新启动时,它会读取每个连接器的最后记录偏移量,并在其最后记录偏移量处启动每个连接器。当连接器重新启动时,它向PostgreSQL服务器发送一个请求,以发送刚好在该位置之后开始的事件。
PostgreSQL连接器作为逻辑解码插件发送的事件的一部分检索模式信息。但是,连接器不检索关于哪些列组成主键的信息。连接器从JDBC元数据(侧通道)获得此信息。如果表的主键定义发生了变化(通过添加、删除或重命名主键列),来自JDBC的主键信息与逻辑解码插件生成的更改事件会有一小段时间不同步。在这段很短的时间内,可以使用不一致的密钥结构创建消息。为了防止这种不一致,更新主键结构如下:
|
PostgreSQL 10+逻辑解码支持(pgoutput
)
从PostgreSQL 10+开始,有一种逻辑复制流模式,称为pgoutput
PostgreSQL原生支持的。这意味着Debezium Postg开云体育官方注册网址reSQL连接器可以使用复制流,而不需要额外的插件。这对于不支持或不允许安装插件的环境尤其有价值。
看到设置PostgreSQL欲知详情。
主题名称
PostgreSQL连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。默认情况下,Kafka主题名为serverName.schemaName.的表地点:
serverName连接器的逻辑名称是否与
开云体育电动老虎机database.server.name
连接器配置属性。schemaName发生操作的数据库模式的名称。开云体育电动老虎机
的表发生操作的数据库表的名称。开云体育电动老虎机
例如,假设实现
在PostgreSQL安装中捕获更改的连接器的配置中的逻辑服务器名是postgres
开云体育电动老虎机数据库和库存
模式,包含四个表:产品
,products_on_hand
,客户
,订单
.连接器将流记录到以下四个Kafka主题:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
现在假设这些表不是特定模式的一部分,而是在默认模式中创建的公共
PostgreSQL模式。卡夫卡主题的名称是:
fulfillment.public.products
fulfillment.public.products_on_hand
fulfillment.public.customers
fulfillment.public.orders
元信息
除了开云体育电动老虎机数据库更改事件,由PostgreSQL连接器产生的每条记录都包含一些元数据。元数据包括事件发生在服务器上的位置,源分区的名称,Kafka主题的名称和事件应该去的分区,例如:
"sourcePartition": {"server": "fulfillment"}, "sourceOffset": {"lsn": "24023128", "txId": "555", "ts_ms": "1482918357011"}, "kafkaPartition": null
sourcePartition
的设置始终默认为开云体育电动老虎机database.server.name
连接器配置属性。sourceOffset
包含有关发生事件的服务器位置的信息:lsn
表示PostgreSQL日志序列号或抵消
在事务日志中。txId
表示引起事件的服务器事务的标识符。ts_ms
表示提交事务的服务器时间,格式为自epoch以来的毫秒数。
kafkaPartition
设置为零
意味着连接器不使用特定的Kafka分区。PostgreSQL连接器只使用一个Kafka Connect分区,它将生成的事件放置到一个Kafka分区中。
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务边界的事件,并丰富数据更改事件消息。每笔交易开始
而且结束
, 开云体育官方注册网址Debezium生成一个包含以下字段的事件:
状态
-开始
或结束
id
唯一事务标识符的字符串表示event_count
(结束
事件)——事务发出的事件总数data_collections
(结束
事件)-对的数组data_collection
而且event_count
它提供了由来自给定数据收集的更改所发出的事件的数量
{"status": "BEGIN", "id": "571", "event_count": null, "data_collections": null} {"status": "END", "id": "571", "event_count": 2, "data_collections": [{"data_collection": "s1. 1. "A ", "event_count": 1}, {"data_collection": "s2. "A ", "event_count": 1}]}
事务事件被写入指定的主题开云体育电动老虎机database.server.name.transaction
.
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一个消息的例子:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:“1580390884335”,“交易”:{" id ": " 571 "、“total_order”:“1”,“data_collection_order”:" 1 "}}
数据变更事件
Debe开云体育官方注册网址zium PostgreSQL连接器为每个行级生成一个数据更改事件插入
,更新
,删除
操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。
下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式
字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器,并配置它来生成所有四个基本的变更事件部分,则变更事件具有以下结构:
{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
项 | 字段名 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3. |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同.
从Kafka 0.10开始,Kafka可以选择记录事件键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。 |
PostgreSQL连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名中的每个剩余字符、模式名和表名中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名、模式名或表名包含无效字符,并且唯一区分名称的字符无效,因此用下划线替换,则可能导致意外的冲突。 |
更改事件键
对于给定的表,更改事件的键具有一个结构,该结构在创建事件时包含表主键中每一列的字段。或者,如果表有副本的身份
设置为完整的
或使用索引
每个唯一键约束都有一个字段。
考虑一个客户
表中定义的公共
开云体育电动老虎机数据库模式和该表的更改事件键的示例。
CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
如果开云体育电动老虎机database.server.name
属性的值PostgreSQL_server
,每变事件为客户
表,虽然它有这个定义有相同的键结构,在JSON中看起来像这样:
{"schema": {(1)"type": "struct", "name": "PostgreSQL_server.public.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){" name ": " id ", "指数":" 0 ","模式":{“类型”:“INT32”、“可选”:“假的 " } } ] }, " 有效载荷":{(5)id: "1"},}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容 |
2 |
|
定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name.开云体育电动老虎机数据库名称.表名.
|
3. |
|
指示事件键是否必须在其中包含值 |
4 |
|
属性中期望的每个字段 |
5 |
|
包含为其生成此更改事件的行的键。在本例中,键包含一个单键 |
虽然 |
如果表没有主键或唯一键,则更改事件的键为空。表中没有主键或唯一键约束的行不能被唯一标识。 |
更改事件值
change事件中的值比键稍微复杂一些。和键一样,值也有模式
Section和a有效载荷
部分。的模式
类的模式信封
的结构有效载荷
节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。
考虑用于显示更改事件键示例的同一个示例表:
CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
表的更改的更改事件的值部分根据副本的身份
设置和事件要用于的操作。
副本的身份
副本的身份是一个特定于postgresql的表级设置,它决定了逻辑解码插件可用的信息量更新
而且删除
事件。更具体地说,是副本的身份
控件可以为所涉及的表列的先前值提供哪些(如果有)信息更新
或删除
事件发生时。
有4个可能的值副本的身份
:
默认的
—默认为此更新
而且删除
如果表有主键,则事件包含该表的主键列的先前值。对于一个更新
事件时,只显示值已更改的主键列。如果表没有主键,则连接器不会发出
更新
或删除
该表的事件。对于没有主键的表,连接器只发出创建事件。通常,没有主键的表用于将消息追加到表的末尾,这意味着更新
而且删除
事件没有用处。没有什么
-为更新
而且删除
操作不包含关于任何表列上一个值的任何信息。完整的
-为更新
而且删除
操作包含表中所有列的先前值。指数
索引名-为更新
而且删除
操作包含指定索引中包含的列的先前值。更新
事件还包含已更新值的索引列。
创建事件
对象中创建数据的操作所生成的更改事件的值部分,示例如下客户
表:
{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value",(2)"field": "before"}, {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers. "值”、“场”:“后”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:"schema"}, {"type": "string", "optional": false, "field": "table"}, {"type": "int64", "optional": true, "field": "txId"}, {"type": "int64", "optional": true, "field": "lsn"}, {"type": "int64", "optional": true, "field": "xmin"}], "optional": false, "name": "io. debezum .开云体育官方注册网址connector.postgresql. source ",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "PostgreSQL_server.inventory.customers.Envelope"(4)}, "有效载荷":{(5)“之前”:空,(6)"后":{(7)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{(8)“版本”:“1.2.5。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null}, "op": "c",(9)“ts_ms”:1559033904863(10)}}
项 | 字段名 | 描述 | ||
---|---|---|---|---|
1 |
|
值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。 |
||
2 |
|
在 |
||
3. |
|
|
||
4 |
|
|
||
5 |
|
该值为实际数据。这是变更事件提供的信息。 |
||
6 |
|
可选字段,指定事件发生之前行的状态。当
|
||
7 |
|
可选字段,指定事件发生后行的状态。在本例中, |
||
8 |
|
描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:
|
||
9 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中,
|
||
10 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新事件
样例中更新的更改事件的值客户
表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户
表:
{"schema":{…},“有效载荷”:{“前”:{/ / < 1 >“id”:1},“后”:{/ / < 2 >“id”:1、“first_name”:“安妮玛丽”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{/ / < 3 >“版本”:“1.2.5。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null}, "op": "u", // <4> "ts_ms": 1465584025523 // <5>}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,其中包含数据库提交前行的值。开云体育电动老虎机在本例中,只有主键列, |
2 |
|
可选字段,指定事件发生后行的状态。在本例中, |
3. |
|
描述事件的源元数据的必填字段。的
|
4 |
|
返回string,描述操作类型。在一个更新事件值,则 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新一行的主键/唯一键的列将更改该行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件: |
主键更新
一个更新
更改行的主键字段的操作称为主键更改。对于主键更改,代替发送更新
事件记录时,连接器发送一个删除
事件记录的旧键和创建
新的(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
的
删除
事件记录有__开云体育官方注册网址debezium.newkey
作为消息头。此标头的值是更新行的新主键。的
创建
事件记录有__开云体育官方注册网址debezium.oldkey
作为消息头。此标头的值是已更新行的前一个(旧的)主键。
删除事件
的值删除改变事件有相同之处模式
部分为创建而且更新同一表的事件。的有效载荷
的一部分删除事件。客户
表是这样的:
{"schema":{…}, "payload": {"before": {(1)"id": 1}, "after": null,(2)“源”:{(3)“版本”:“1.2.5。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": null, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "d",(4)“ts_ms”:1465581902461(5)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
可选字段,指定事件发生之前行的状态。在一个删除事件值,则 |
2 |
|
可选字段,指定事件发生后行的状态。在一个删除事件值,则 |
3. |
|
描述事件的源元数据的必填字段。在一个删除事件值,则
|
4 |
|
返回string,描述操作类型。的 |
5 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
一个删除更改事件记录为使用者提供了处理删除该行所需的信息。
让消费者能够处理删除为没有主键的表生成的事件,请设置表的主键 |
PostgreSQL连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为零
.为了实现这一点,PostgreSQL连接器遵循一个删除特别活动墓碑上事件,具有相同的键,但零
价值。
数据类型映射
PostgreSQL连接器用事件表示行更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该值在事件中如何表示取决于列的PostgreSQL数据类型。本节描述这些映射。
基本类型
下表描述了连接器如何将基本PostgreSQL数据类型映射到文字类型和一个语义类型在事件字段中。
文字类型描述了如何使用Kafka Connect模式类型逐字表示值:
INT8
,INT16
,INT32
,INT64
,FLOAT32
,FLOAT64
,布尔
,字符串
,字节
,数组
,地图
,结构体
.语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
的 |
|
|
|
的 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
|
|
|
|
带有时区信息的时间戳的字符串表示形式,其中时区为GMT。 |
|
|
|
带时区信息的时间值的字符串表示形式,其中时区为GMT。 |
|
|
|
类型的时间间隔的大约微秒数 |
|
|
|
遵循模式的间隔值的字符串表示形式 |
|
|
N/A |
要么是原始字节(默认值),要么是base64编码的字符串,要么是十六进制编码的字符串,取决于连接器二进制处理模式设置。 |
|
|
|
包含JSON文档、数组或标量的字符串表示形式。 |
|
|
|
包含XML文档的字符串表示形式。 |
|
|
|
包含PostgreSQL UUID值的字符串表示形式。 |
|
|
|
包含包含两个的结构 |
|
|
|
包含PostgreSQL LTREE值的字符串表示形式。 |
|
|
N/A |
|
|
|
N/A |
|
|
|
N/A |
整数范围。 |
|
|
N/A |
范围的 |
|
|
N/A |
范围的 |
|
|
N/A |
包含不带时区的时间戳范围的字符串表示形式。 |
|
|
N/A |
包含带有本地系统时区的时间戳范围的字符串表示形式。 |
|
|
N/A |
包含日期范围的字符串表示形式。它总是有一个排他的上界。 |
|
|
|
包含PostgreSQL的字符串表示形式 |
时间类型
除了PostgreSQL的TIMESTAMPTZ
而且TIMETZ
属性的值决定了数据类型(包含时区信息)如何映射时态类型time.precision.mode
连接器配置属性。下面几节描述这些映射:
time.precision.mode =自适应
当time.precision.mode
属性设置为自适应
,则连接器根据列的数据类型定义确定文字类型和语义类型。这确保了事件完全表示数据库中的值。开云体育电动老虎机
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自纪元以来的天数。 |
|
|
|
表示午夜过后的毫秒数,不包括时区信息。 |
|
|
|
表示午夜过后的微秒数,不包括时区信息。 |
|
|
|
表示从纪元开始的毫秒数,不包括时区信息。 |
|
|
|
表示自纪元以来的微秒数,不包括时区信息。 |
time.precision.mode = adaptive_time_microseconds
当time.precision.mode
配置属性设置为adaptive_time_microseconds
,连接器根据列的数据类型定义确定时态类型的文字类型和语义类型。这确保了事件完全表示数据库中的值,除了所有值开云体育电动老虎机时间
字段以微秒为单位捕获。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示以微秒为单位的时间值,不包括时区信息。PostgreSQL允许精度 |
|
|
|
表示过去纪元的毫秒数,不包括时区信息。 |
|
|
|
表示经过epoch的微秒数,不包括时区信息。 |
time.precision.mode =连接
当time.precision.mode
配置属性设置为连接
,连接器使用Kafka Connect逻辑类型。当消费者只能处理内置的Kafka Connect逻辑类型而不能处理可变精度的时间值时,这可能很有用。然而,由于PostgreSQL支持微秒精度,由连接器生成的事件使用连接
时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度大于3的值。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
表示自epoch以来的天数。 |
|
|
|
表示从午夜开始的毫秒数,不包括时区信息。PostgreSQL允许 |
|
|
|
表示从纪元开始的毫秒数,不包括时区信息。PostgreSQL允许 |
时间戳
类型
的时间戳
类型表示不包含时区信息的时间戳。这些列将转换为基于UTC的等效Kafka Connect值。例如,时间戳
取值“2018-06-20 15:13:16.945104”由io.开云体育官方注册网址debezium.time.MicroTimestamp
值为“1529507596945104”time.precision.mode
未设置为连接
.
运行Kafka Connect和Debezium的JVM的时区不会影响这种转换。开云体育官方注册网址
十进制类型
PostgreSQL连接器配置属性的设置,decimal.handling.mode
确定连接器如何映射十进制类型。
当decimal.handling.mode
属性设置为精确的
,连接器使用Kafka Connectorg.apache.kafka.connect.data.Decimal
所有的逻辑类型小数
而且数字
列。这是默认模式。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
的 |
|
|
|
的 |
这条规则有一个例外。当数字
或小数
类型的使用没有比例限制,来自数据库的值对于每个值都有不同的(变量)比例。开云体育电动老虎机在本例中,连接器使用io.开云体育官方注册网址debezium.data.VariableScaleDecimal
,其中包含转移值的值和比例。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含两个字段的结构: |
|
|
|
包含两个字段的结构: |
当decimal.handling.mode
属性设置为双
,连接器表示所有小数
而且数字
将值作为Java的双精度值,并对它们进行编码,如下表所示。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
的最后一种可能设置decimal.handling.mode
配置属性为字符串
.在本例中,连接器表示小数
而且数字
值作为它们的格式化字符串表示,并按照下表所示对它们进行编码。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
||
|
|
PostgreSQL支持南
(不是数字)作为存储的特殊值小数
/数字
的设置时的值decimal.handling.mode
是字符串
或双
.在本例中,连接器进行编码南
作为翻倍。南
或者字符串常数南
.
HSTORE
类型
当hstore.handling.mode
连接器配置属性设置为json
(默认值),连接器表示HSTORE
值作为JSON值的字符串表示,并按照下表所示对它们进行编码。当hstore.handling.mode
属性设置为地图
时,连接器使用地图
的模式类型HSTORE
值。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
示例:使用JSON转换器的输出表示是 |
|
|
示例:使用JSON转换器的输出表示是 |
域类型
PostgreSQL支持基于其他底层类型的用户定义类型。当使用此类列类型时,Debezium将基于完整的类型层次结构公开列开云体育官方注册网址的表示形式。
捕获使用PostgreSQL域类型的列中的更改需要特别考虑。当将列定义为包含扩展默认数据库类型之一的域类型,并且域类型定义了自定义长度或比例时,生成的模式将继承已定义的长度或比例。开云体育电动老虎机 当将列定义为包含扩展另一个定义自定义长度或规模的域类型的域类型时,生成的模式就会这样做不继承已定义的长度或规模,因为这些信息在PostgreSQL驱动的列元数据中不可用。 |
网络地址类型
PostgreSQL拥有可以存储IPv4、IPv6和MAC地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址。网络地址类型提供输入错误检查和专门的操作符和函数。
PostgreSQL数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
IPv4和IPv6组网 |
|
|
|
IPv4和IPv6主机和网络 |
|
|
|
MAC地址 |
|
|
|
MAC地址为EUI-64格式 |
PostGIS类型
PostgreSQL连接器支持所有PostGIS数据类型.
PostGIS数据类型 | 文字类型(模式类型) | 语义类型(模式名) | 笔记 |
---|---|---|---|
|
|
|
包含两个字段的结构:
格式请参见开放地理空间联盟简单特性访问规范. |
|
|
|
包含两个字段的结构:
格式请参见开放地理空间联盟简单特性访问规范. |
烤的价值观
PostgreSQL对页面大小有硬性限制。这意味着需要使用link::https://www.postgresql.org/docs/current/storage-toast.html[TOAST存储]来存储大于约8 kb的值。这会影响来自数据库的复制消息。开云体育电动老虎机使用TOAST机制存储且未被更改的值不包含在消息中,除非它们是表的复制标识的一部分。对于Debezium来说,没有安全的方法可以直接从开云体育官方注册网址数据库中读取带外缺失的值,因为这可能会导致竞争条件。开云体育电动老虎机因此,Debezium遵循以开云体育官方注册网址下规则来处理烘烤值:
表与
副本标识已满
- TOAST列值是之前
而且后
更改事件中的字段与任何其他列一样。表与
副本标识默认
-当收到更新
事件中,不属于副本标识的任何未开云体育电动老虎机更改的TOAST列值都不包含在事件中。类似地,当收到一个删除
事件中没有TOAST列(如果有的话)之前
字段。由于D开云体育官方注册网址ebezium在这种情况下不能安全地提供列值,连接器返回一个由连接器配置属性定义的占位符值,toasted.value.placeholder
.
与Amazon RDS实例相关的问题。的
|
设置
在使用PostgreSQL连接器监视PostgreSQL服务器上提交的更改之前,请确定您打算使用哪个逻辑解码插件。如果你计划不要使用本机pgoutput
逻辑复制流支持,则必须将逻辑解码插件安装到PostgreSQL服务器中。然后,启用复制槽位,并为用户配置足够的权限来执行复制。
如果数据库由服务开云体育电动老虎机托管,例如Heroku Postgres您可能无法安装插件。如果是这样,如果你正在使用PostgreSQL 10+,你可以使用pgoutput
解码器支持,以捕获数据库中的更改。开云体育电动老虎机如果不能这样做,则无法将Debezium与数据库一起使用。开云体育官方注册网址开云体育电动老虎机
云中的PostgreSQL
PostgreSQL在亚马逊RDS
捕获正在运行的PostgreSQL数据库中的更改是可能的开云体育电动老虎机Amazon RDS.这样做:
设置实例参数
rds.logical_replication
来1
.验证
wal_level
参数设置为逻辑
通过运行查询显示wal_level
作为数据库主用开云体育电动老虎机户。在多区域复制设置中可能不是这样。不能手动设置此选项。它是自动改变当rds.logical_replication
参数设置为1
.如果wal_level
不是逻辑
在上述更改之后,可能是因为参数组更改导致实例必须重新启动。这取决于您的维护窗口,也可以手动执行。设置Debezi开云体育官方注册网址um
plugin.name
参数wal2json
.如果你打算使用PostgreSQL 10+,你可以跳过这个步骤pgoutput
支持逻辑复制流。使用数据库主帐户开云体育电动老虎机进行复制,因为RDS目前不支持设置
复制
另一个帐户的特权。
确保您在Amazon RDS上使用最新版本的PostgreSQL 9.6、10或11。否则,旧版本的 截至2019年1月,RDS上的以下PostgreSQL版本都带有最新版本的
|
Azure上的PostgreSQL
Debezium可以与 一起使用开云体育官方注册网址Azure数开云体育电动老虎机据库PostgreSQL,支持wal2json
和 pgoutput
插件,Debezium也支持这两个插件。开云体育官方注册网址
将Azure复制支持设置为 逻辑
.您可以使用Azure CLI或者是Azure门户来配置这个。例如,要使用Azure CLI,下面是Az postgres服务器
需要执行的命令:
Az postgres服务器配置集——resource-group mygroup——server-name myserver——name azure。replication_support --value logical az postgres server restart --resource-group mygroup --name myserver
在使用 |
安装逻辑解码输出插件
看到PostgreSQL逻辑解码输出插件安装有关设置和测试逻辑解码插件的更详细说明。 |
从Debez开云体育官方注册网址ium 0.10开始,连接器支持PostgreSQL 10+逻辑复制流 |
从PostgreSQL 9.4开始,读取预写日志更改的唯一方法是安装逻辑解码输出插件。插件是用C语言编写、编译并安装在运行PostgreSQL服务器的机器上的。插件使用许多特定于PostgreSQL的api,如PostgreSQL的文档.
PostgreSQL连接器与Debezium支持的逻辑解码插件之一一起工作,对其中的更开云体育官方注册网址改进行编码Protobuf格式或JSON格式。请参阅所选插件的文档,以了解关于插件的需求、限制以及如何编译它的更多信息。
为了简单起见,Debezium还开云体育官方注册网址提供了一个基于普通PostgreSQL服务器映像的Docker映像,并在其上编译和安装插件。你可以使用这张图片作为安装所需详细步骤的示例。
Debe开云体育官方注册网址zium逻辑解码插件只在Linux机器上安装和测试过。对于Windows和其他操作系统,可能需要不同的安装步骤。 |
插件的差异
插件行为在所有情况下并不完全相同。已经确定了这些差异:
的
wal2json
插件无法处理带引号的标识符(问题)。的
wal2json
而且decoderbufs
插件为没有主键的表发出事件。的
wal2json
插件不支持特殊值,例如南
或∞
,用于浮点类型。的
wal2json
插件应该与schema.refresh.mode
连接器配置属性设置为columns_diff_exclude_unchanged_toast
.否则,当接收到包含未更改项的行的更改事件时烤面包
列,则发出的更改事件的字段中不包含该列的字段后
字段。这是因为wal2json
插件消息不包含用于此类列的字段。添加此功能的需求在
wal2json
发行98.参见columns_diff_exclude_unchanged_toast
下面将进一步说明使用它的含义。的
pgoutput
插件不会对没有主键的表发出所有事件。它只发出事件插入
操作。
在测试套件中跟踪所有最新的差异Java类.
配置PostgreSQL服务器
如果您正在使用其中一个受支持的逻辑解码插件也就是说,不是pgoutput
,并且已经安装,按照如下步骤配置PostgreSQL服务器:
要在启动时加载插件,请将以下内容添加到
postgresql.conf
文件:# MODULES shared_preload_libraries = 'decoderbufs,wal2json'(1)
若要配置复制插槽而不考虑所使用的解码器,请在
postgresql.conf
文件:#复制wal_level = logical(1)Max_wal_senders = 1(2)Max_replication_slots = 1(3)
1 指示服务器对预写日志使用逻辑解码。 2 指示服务器使用的最大值 1
处理WAL变更的单独流程。3. 指示服务器允许的最大值 1
为流式WAL更改而创建的复制插槽。
开云体育官方注册网址Debezium使用PostgreSQL的逻辑解码,它使用复制插槽。复制插槽保证保留Debezium所需的所有WAL段,即使在Debezium中断期间。开云体育官方注册网址因此,密切监视复制插槽非常重要,以避免过多的磁盘消耗和其他可能发生的情况,如复制插槽长时间未使用时的编目膨胀。有关更多信息,请参见PostgreSQL流复制文档.
如果你在和synchronous_commit
其他设置在
,建议设置为wal_writer_delay
到一个值,例如10毫秒,以实现更改事件的低延迟。否则,应用它的默认值,这会增加大约200毫秒的延迟。
阅读和理解关于PostgreSQL预写日志机制和配置的PostgreSQL文档强烈推荐。 |
设置权限
设置PostgreSQL服务器以运行Debezium连接器需要一个能够执行复制的开云体育官方注册网址数据库用户。开云体育电动老虎机复制只能由具有相应权限的数据库用户执行,且只能对配置的主机数量进行复制。开云体育电动老虎机此外,您必须配置PostgreSQL服务器,以允许在服务器机器和运行PostgreSQL连接器的主机之间进行复制。
PostgreSQL管理权限。
若要赋予用户复制权限,请定义具有复制权限的PostgreSQL角色至少的
复制
而且登录
权限。例如:创建角色名
默认情况下,超级用户同时拥有上述两种角色。
配置PostgreSQL服务器,允许在服务器机器和运行PostgreSQL连接器的主机之间进行复制。
pg_hba.conf
文件的例子:本地复制
信任(1)主机复制 127.0.0.1/32 trust(2)主机复制 ::1/128 trust(3) 1 指示服务器允许复制 <对于>
本地,即在服务器机器上。2 指示服务器允许 <对于>
在本地主机
接收复制更改使用IPV4
.3. 指示服务器允许 <对于>
在本地主机
接收复制更改使用IPV6
.
有关网络掩码的详细信息,请参见PostgreSQL文档. |
支持PostgreSQL拓扑
PostgreSQL连接器可以与独立的PostgreSQL服务器一起使用,也可以与PostgreSQL服务器集群一起使用。
正如前面提到的一开始, PostgreSQL(所有版本⇐12)只支持逻辑复制插槽主要的
服务器。这意味着PostgreSQL集群中的副本不能配置为逻辑复制,因此Debezium PostgreSQL连接器只能与主服务器连接和通信。开云体育官方注册网址如果此服务器失败,连接器将停止。当群集修复时,如果原始主服务器再次提升到主要的
,可以重新启动连接器。但是,如果是不同的PostgreSQL服务器使用插件和适当的配置被提升为主要的
时,必须更改连接器配置以指向新的主要的
服务器,然后可以重新启动连接器。
WAL磁盘空间消耗
在某些情况下,WAL文件所消耗的PostgreSQL磁盘空间可能会激增或超出通常的比例。造成这种情况的原因可能有以下几种:
连接器已接收到的数据的LSN在
confirmed_flush_lsn
服务器的列pg_replication_slots
视图。比这个LSN更老的数据不再可用,数据库负责回收磁盘空间。开云体育电动老虎机同样在
pg_replication_slots
看来,restart_lsn
列包含连接器可能需要的最老WAL的LSN。如果的值为confirmed_flush_lsn
是有规律地增加和价值的restart_lsn
滞后,数据库需要回收空间。开云体育电动老虎机数据库通开云体育电动老虎机常以批处理块的形式回收磁盘空间。这是预期的行为,用户不需要做任何操作。
数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正开云体育电动老虎机在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置
heartbeat.interval.ms
连接器配置属性。PostgreSQL实例包含多个数据库,其中一个是高流量数据库。开云体育电动老虎机开云体育官方注册网址Debezium捕获与另一个数据库相比流量较低的另一个数据库中的更改。开云体育电动老虎机开云体育官方注册网址然后Debezium不能确认LSN,因为复制插槽工作在每个数据库,并且Debezium没有被调用。开云体育电动老虎机由于WAL由所有数据库共享,因此使用的数量趋于增开云体育电动老虎机长,直到Debezium为其捕获更改的数据库触发事件。开云体育官方注册网址为了克服这一问题,有必要:
命令启用周期心跳记录生成
heartbeat.interval.ms
连接器配置属性。定期从数据库发出更改事件,Debezium正在为其捕获更改。开云体育官方注册网址开云体育电动老虎机
在这种情况下
wal2json
解码器插件,它足以生成空事件。例如,可以通过截断空临时表来实现这一点。对于其他解码器插件,建议创建Debezium不捕获更改的补充表。开云体育官方注册网址
然后,单独的进程通过插入新行或重复更新同一行定期更新表。然后PostgreSQL调用Debezium, 开云体育官方注册网址Debezium确认最新的LSN并允许数据库回收WAL空间。开云体育电动老虎机的方法可以自动执行此任务
heartbeat.action.query
连接器配置属性。
对于使用PostgreSQL的AWS RDS的用户,在空闲环境中可能会出现类似于高流量/低流量场景的情况。AWS RDS导致对其自己的系统表的写入在频繁的基础上(5分钟)对客户端不可见。同样,定期释放事件可以解决问题。 |
部署
与动物园管理员,卡夫卡,卡夫卡连接安装后,部署Debezium PostgreSQL连接器的其余任务是下载开云体育官方注册网址连接器的插件存档,将JAR文件解压缩到Kafka Connect环境中,并将JAR文件的目录添加到卡夫卡连接的plugin.path
.然后需要重新启动Kafka Connect进程来获取新的JAR文件。
如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka, PostgreSQL和Kafka连接已经安装和准备运行的PostgreSQL连接器。你也可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
连接器配置示例
以下是PostgreSQL连接器的配置示例,该连接器连接到PostgreSQL服务器,端口为5432,地址为192.168.99.100,逻辑名为fullfillment
.通常情况下,您可以在配置Debezium PostgreS开云体育官方注册网址QL连接器. json
文件中使用连接器可用的配置属性。
您可以选择为模式和表的一个子集生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。
{"name": "inventory-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.postgresql. postgresconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“5432”,(4)“开云体育电动老虎机数据库。user": "postgres",(5)“开云体育电动老虎机数据库。密码”:“postgres”,(6)“开云体育电动老虎机数据库。dbname" : "postgres",(7):开云体育电动老虎机“database.server.name fullfillment”,(8)”表。白名单”:“public.inventory”(9)}}
1 | 在Kafka Connect服务中注册连接器时的名称。 |
2 | 这个PostgreSQL连接器类的名称。 |
3. | PostgreSQL服务器地址。 |
4 | PostgreSQL服务器的端口号。 |
5 | 属性的PostgreSQL用户的名称需要的特权. |
6 | 属性的PostgreSQL用户的密码需要的特权. |
7 | 要连接的PostgreSQL数据库的名称开云体育电动老虎机 |
8 | PostgreSQL服务器/集群的逻辑名称,它形成一个命名空间,用于连接器写入的所有Kafka主题的名称,Kafka Connect模式名称,以及使用Avro转换器时对应的Avro模式的名称空间。 |
9 | 连接器将监视的由该服务器托管的所有表的列表。这是可选的,还有其他属性用于列出要从监视中包含或排除的模式和表。 |
看到PostgreSQL连接器属性的完整列表可以在这些配置中指定。
您可以使用帖子
命令到正在运行的Kafka Connect服务。该服务记录配置并启动连接到PostgreSQL数据库的连接器任务,并将更改事件记录流到Kafka主题。开云体育电动老虎机
添加连接器配置
要开始运行PostgreSQL连接器,请创建一个连接器配置,并将该配置添加到Kafka Connect集群中。
的PostgreSQL服务器配置为支持逻辑复制。
的逻辑解码插件安装。
完成PostgreSQL连接器的安装。
为PostgreSQL连接器创建一个配置。
使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。
当连接器启动时,它将执行一致性快照配置连接器的PostgreSQL服务器数据库。开云体育电动老虎机然后连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。
监控
Debe开云体育官方注册网址zium PostgreSQL连接器提供了两种类型的指标,除了Zookeeper、Kafka和Kafka Connect提供的内置JMX指标支持之外。
开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.postgres: type = connector-metrics上下文=快照,server =<开云体育电动老虎机 database.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
流指标
的MBean是开云体育官方注册网址debezium.postgres: type = connector-metrics、上下文=流媒体服务器=<开云体育电动老虎机 database.server.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
连接器配置属性
Debe开云体育官方注册网址zium PostgreSQL连接器具有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。所有Kafka Connect连接器都需要这个属性。 |
||
连接器的Java类的名称。始终使用值 |
||
|
应该为此连接器创建的最大任务数。PostgreSQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。 |
|
|
PostgreSQL的名称逻辑解码插件安装在PostgreSQL服务器上。 支持的值为 如果你正在使用 |
|
|
PostgreSQL逻辑解码槽的名称,它是为特定数据库/模式的特定插件的流更改而创建的。开云体育电动老虎机服务器使用这个插槽将事件流传输到您正在配置的Debezium连接器。开云体育官方注册网址 槽位名称必须符合PostgreSQL复制槽命名规则,这些州:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线。” |
|
|
当连接器以预期的优雅方式停止时,是否删除逻辑复制插槽。默认行为是,当连接器停止时,复制插槽仍然为连接器配置。当连接器重新启动时,具有相同的复制插槽可以使连接器从停止的地方开始处理。 设置为 |
|
|
使用时为流更改而创建的PostgreSQL发布的名称 如果该发布还不存在,则在启动时创建所有的表.开云体育官方注册网址然后Debezium应用它自己的白名单/黑名单过滤(如果配置了的话),以限制发布更改感兴趣的特定表的事件。连接器用户必须具有超级用户权限才能创建此发布,因此通常最好在第一次启动连接器之前创建发布。 如果发布已经存在,无论是针对所有表还是配置了表的一个子集,Debezium将使用定义好的发布。开云体育官方注册网址 |
|
PostgreSQL数据库服务器的IP地址或主机名。开云体育电动老虎机 |
||
|
PostgreSQL数据库服务器的整型端口号。开云体育电动老虎机 |
|
连接PostgreSQL数据库服务器的用户名。开云体育电动老虎机 |
||
连接PostgreSQL数据库服务器时使用的密码。开云体育电动老虎机 |
||
用于流化更改的PostgreSQL数据库的名称。开云体育电动老虎机 |
||
逻辑名称,用于标识特定PostgreSQL数据库服务器或Debezium在其中捕获更改的集群,并为其提供名称空间。开云体育官方注册网址开云体育电动老虎机在数据库服务器逻辑名中只能使用字母数字字符和下划线。开云体育电动老虎机逻辑名在所有其他连接器中应该是唯一的,因为它被用作从该连接器接收记录的所有Kafka主题的主题名前缀。 |
||
一个可选的、以逗号分隔的正则表达式列表,该列表与您要使用的模式名称匹配想要捕获变更。任何没有包含在白名单中的模式名都不会被捕获其更改。默认情况下,捕获所有非系统模式的更改。不也定了吗 |
||
一个可选的、以逗号分隔的正则表达式列表,该列表与您要使用的模式名称匹配不想要捕捉变化。黑名单中没有包含任何模式的名称都将捕获其更改,系统模式除外。不也定了吗 |
||
一个可选的、以逗号分隔的正则表达式列表,它匹配您想要捕获其更改的表的完全限定表标识符。没有包含在白名单中的任何表的更改都不会被捕获。每个标识符都是这样的schemaName.的表.默认情况下,连接器捕获每个模式中每个非系统表中的更改,这些表的更改正在被捕获。不也定了吗 |
||
一个可选的、以逗号分隔的正则表达式列表,它为您更改的表匹配完全限定的表标识符不想要捕捉。没有包含在黑名单中的任何表的更改都被捕获。每个标识符都是这样的schemaName.的表.不也定了吗 |
||
一个可选的、以逗号分隔的正则表达式列表,它与应该包含在变更事件记录值中的列的完全限定名称相匹配。列的完全限定名的格式为schemaName.的表.columnName.不也定了吗 |
||
一个可选的、以逗号分隔的正则表达式列表,它匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名的格式为schemaName.的表.columnName.不也定了吗 |
||
|
时间、日期和时间戳可以用不同的精度表示: |
|
|
指定连接器应如何处理的值 |
|
|
指定连接器应如何处理的值 |
|
|
指定连接器应如何处理的值 |
|
|
是否使用加密连接到PostgreSQL服务器。选项包括: |
|
包含客户端SSL证书的文件的路径。看到PostgreSQL文档获取更多信息。 |
||
包含客户端SSL私钥的文件的路径。看到PostgreSQL文档获取更多信息。 |
||
从指定的文件中访问客户端私钥的密码 |
||
包含用于验证服务器的根证书的文件的路径。看到PostgreSQL文档获取更多信息。 |
||
|
启用TCP保持连接探测,以验证数据库连接是否仍然活跃。开云体育电动老虎机看到PostgreSQL文档获取更多信息。 |
|
|
属性之后是否生成墓碑事件删除事件。 |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为schemaName.的表.columnName.在更改事件记录中,如果这些列中的值长于指定的字符数,则将被截断长度属性名。您可以在单个配置中指定具有不同长度的多个属性。长度必须是正整数,例如, |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为schemaName.的表.columnName.在更改事件值中,指定表列中的值将被替换为长度星号数目( |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为schemaName.的表.columnName.在更改事件值中,指定列中的值将替换为假名。 |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,它与列的完全限定名匹配。列的完全限定名的格式为开云体育电动老虎机数据库名.的表.columnName,或开云体育电动老虎机数据库名.schemaName.的表.columnName. |
|
N/A |
一个可选的、以逗号分隔的正则表达式列表,用于匹配某些列的特定于数据库的数据类型名称。开云体育电动老虎机完全限定数据类型名称的格式为开云体育电动老虎机数据库名.的表.typeName,或开云体育电动老虎机数据库名.schemaName.的表.typeName. |
|
空字符串 |
使用正则表达式匹配表列名的分号分隔的表列表。连接器将匹配列中的值映射到它发送给Kafka主题的更改事件记录中的关键字段。当一个表没有主键,或者当你想根据一个不是主键的字段来排序Kafka主题中的更改事件记录时,这是很有用的。 |
|
all_tables |
仅当流通过使用发生变化时应用的 |
|
字节 |
指定二进制( |
以下先进的配置属性具有在大多数情况下都适用的默认值,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 | ||
---|---|---|---|---|
|
指定连接器启动时执行快照的条件: |
|||
类的实现的完整Java类名 |
||||
|
正整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。连接器如何执行快照提供细节。 |
|||
控制快照中包含哪些表行。此属性仅影响快照。它不会影响逻辑解码插件生成的事件。在表单中指定以逗号分隔的全限定表名列表开云体育电动老虎机databaseName.tableName. |
||||
|
指定连接器在事件处理过程中应该如何对异常做出反应: |
|||
|
阻塞队列的最大大小为正整数值。连接器将从流复制接收到的更改事件放在阻塞队列中,然后再将它们写入Kafka。这个队列可以提供反压力,例如,写入Kafka的记录比它应该的要慢,或者Kafka不可用。 |
|||
|
正整数值,指定连接器处理的每批事件的最大大小。 |
|||
|
正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|||
|
指定连接器遇到数据类型未知的字段时的连接器行为。默认行为是连接器从更改事件中省略该字段并记录警告。
|
|||
一个以分号分隔的SQL语句列表,连接器在建立到数据库的JDBC连接时执行这些语句。开云体育电动老虎机要使用分号作为字符而不是分隔符,请指定两个连续的分号, |
||||
|
控制连接器向Kafka主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。 |
|||
|
控制连接器向其发送心跳消息的主题的名称。主题名称有这样的模式: |
|||
指定当连接器发送心跳消息时连接器在源数据库上执行的查询。开云体育电动老虎机 |
||||
|
指定触发表的内存模式刷新的条件。 |
|||
在连接器启动时执行快照之前,连接器应等待的毫秒间隔。如果在集群中启动多个连接器,此属性有助于避免快照中断,因为快照中断可能导致连接器的重新平衡。 |
||||
|
在快照期间,连接器以行为单位读取表内容。此属性指定批处理中的最大行数。 |
|||
以分号分隔的参数列表,以传递给配置的逻辑解码插件。例如, 如果您正在使用 |
||||
|
指示是否对字段名进行清除以遵守Avro命名要求. |
|||
|
如果连接到复制槽位失败,这是连续尝试连接的最大次数。 |
|||
|
当连接器连接到复制插槽失败时,重试尝试之间等待的毫秒数。 |
|||
|
指定连接器提供的常量,以指示原始值是数据库不提供的烘烤值。开云体育电动老虎机如果设置为 |
|||
|
确定连接器是否生成具有事务边界的事件,并使用事务元数据丰富更改事件信封。指定 |
连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。
出现问题时的行为
开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。
如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
在以下情况下,连接器在尝试启动时失败,在日志中报告错误/异常,并停止运行:
连接器的配置无效。
连接器无法通过指定的连接参数成功连接到PostgreSQL。
连接器正在从PostgreSQL WAL中先前记录的位置重新启动(通过使用LSN),并且PostgreSQL不再有可用的历史。
在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。在纠正配置或解决PostgreSQL问题后,重新启动连接器。
PostgreSQL不可用
当连接器正在运行时,它所连接的PostgreSQL服务器可能由于各种原因变得不可用。如果发生这种情况,连接器将失败并报错并停止。当服务器再次可用时,重新启动连接器。
PostgreSQL连接器在外部以PostgreSQL LSN的形式存储最后处理的偏移量。在连接器重新启动并连接到服务器实例之后,连接器与服务器通信以继续从该特定偏移量进行流处理。只要Debezium复制插槽保持不变,这个偏移量就可用。开云体育官方注册网址永远不要删除主服务器上的复制插槽,否则将丢失数据。有关插槽被移除的故障案例,请参见下一节。
集群的失败
从发行版12开始,PostgreSQL允许逻辑复制插槽仅在主服务器上.这意味着您可以将Debezium PostgreSQL连接器仅指开云体育官方注册网址向数据库集群的活动主服务器。开云体育电动老虎机而且,复制槽本身不会传播到副本。如果主服务器宕机,则必须提升一个新的主服务器。
新的主要必须有逻辑解码插件已安装和一个复制槽,该复制槽已配置为供插件和您希望捕获更改的数据库使用。开云体育电动老虎机只有这样,您才能将连接器指向新服务器并重新启动连接器。
当发生故障转移时,有一些重要的注意事项,您应该暂停Debezium,直到您可以验证您有一个完整的复制插槽,没有丢失数据。开云体育官方注册网址故障转移后:
在允许应用程序写入数据之前,必须有一个进程重新创建Debezium复制插槽开云体育官方注册网址新主要的这一点至关重要。如果没有这个过程,您的应用程序可能会错过更改事件。
您可能需要验证Debezium是否能够读取插槽中的所有更改开云体育官方注册网址在旧的初选失败之前.
恢复和验证是否丢失任何更改的一种可靠方法是将失败的主服务器的备份恢复到它发生故障之前的位置。虽然这在管理上很困难,但它允许您检查复制插槽是否有任何未使用的更改。
在PostgreSQL社区中有关于一个叫做 关于故障转移插槽概念的更多信息已经出现这篇博文. |
Kafka Connect进程优雅地停止
假设Kafka Connect正在以分布式模式运行,并且Kafka Connect进程优雅地停止。在关闭该进程之前,Kafka Connect会将该进程的连接器任务迁移到该组中的另一个Kafka Connect进程。新的连接器任务开始处理之前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,任何正在运行的连接器任务都将终止,而不记录最近处理的偏移量。当Kafka Connect以分布式模式运行时,Kafka Connect会重新启动其他进程上的连接器任务。然而,PostgreSQL连接器从上次偏移量开始恢复记录通过早期的过程。这意味着新的替换任务可能会生成一些在崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。
由于在故障恢复过程中可能会出现重复事件,因此使用者应该始终预测到一些重复事件。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。
在每个更改事件记录中,Debezium连接器插入有关事件起开云体育官方注册网址源的源特定信息,包括PostgreSQL服务器的事件时间、服务器事务的ID以及写入事务更改的预写日志中的位置。使用者可以跟踪此信息,特别是LSN,以确定某个事件是否重复。
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。定期地,在Kafka Connect配置中指定的频率,Kafka Connect会记录在这些更改事件中出现的最新偏移量。如果Kafka代理变得不可用,运行连接器的Kafka Connect进程会反复尝试重新连接到Kafka代理。换句话说,连接器任务将暂停,直到重新建立连接,此时连接器将从停止的位置恢复。
连接器停止一段时间
如果连接器正常停止,则可以继续使用数据库。开云体育电动老虎机任何更改都记录在PostgreSQL WAL中。当连接器重新启动时,它将从中断的地方恢复流更改。也就是说,它为连接器停止时所做的所有数据库更改生成更改事件记录。开云体育电动老虎机
一个正确配置的Kafka集群能够处理大量的吞吐量。Kafka Connect是根据Kafka最佳实践编写的,如果有足够的资源,Kafka Connect连接器也可以处理大量的数据库更改事件。开云体育电动老虎机因此,在停止一段时间后,当Debezium连接器重新启动时,它很可能会赶上在停止时所做的数据库更改。开云体育官方注册网址开云体育电动老虎机这个过程有多快取决于Kafka的能力和性能,以及对PostgreSQL中的数据所做的修改量。