开云体育官方注册网址Debezium连接器PostgreSQL

Debe开云体育官方注册网址zium PostgreSQL连接器捕获PostgreSQL数据库模式中的行级更改。开云体育电动老虎机有关与连接器兼容的PostgreSQL版本的信息,请参见开云体育官方注册网址Debezium发布概述

当连接器第一次连接到PostgreSQL服务器或集群时,它会获取所有模式的一致快照。快照完成后,连接器将持续捕获行级更改,这些更改插入、更新和删除数据库内容,并提交给PostgreSQL数据库。开云体育电动老虎机连接器生成数据更改事件记录,并将其传输到Kafka主题。对于每个表,默认行为是连接器将所有生成的事件流到该表的单独Kafka主题中。应用程序和服务使用来自该主题的数据更改事件记录。

概述

PostgreSQL的逻辑解码特性在9.4版中引入。它是一种机制,允许提取提交到事务日志中的更改,并在控件的帮助下以用户友好的方式处理这些更改输出插件.输出插件使客户端能够使用这些更改。

PostgreSQL连接器包含两个主要部分,它们一起工作来读取和处理数据库更改:开云体育电动老虎机

  • 逻辑解码输出插件。您可能需要安装您选择使用的输出插件。在运行PostgreSQL服务器之前,必须配置一个使用所选输出插件的复制插槽。该插件可以是以下插件之一:

    • decoderbufs它基于Protobuf,由Debezium社区维护。开云体育官方注册网址

    • pgoutput是PostgreSQL 10+中标准的逻辑解码输出插件。它由PostgreSQL社区维护,并由PostgreSQL自己用于逻辑复制.该插件始终存在,因此不需要安装其他库。Debe开云体育官方注册网址zium连接器将原始复制事件流直接解释为更改事件。

  • 读取所选逻辑解码输出插件产生的更改的Java代码(实际的Kafka Connect连接器)。它使用PostgreSQL的流复制协议,通过PostgreSQLJDBC驱动程序

连接器将生成更改事件为每个被捕获的行级插入、更新和删除操作,并在单独的Kafka主题中为每个表发送更改事件记录。客户端应用程序读取与感兴趣的数据库表对应的Kafka主题,并可以对从这些主题接收到的每一个行级事件做出反应。开云体育电动老虎机

PostgreSQL通常会在一段时间后清除write-ahead log (WAL)段。这意味着连接器不具有对数据库所做的所有更改的完整历史。开云体育电动老虎机因此,当PostgreSQL连接器第一次连接到一个特定的PostgreSQL数据库时,它首先执行一个开云体育电动老虎机一致的快照每个数据库模式的。开云体育电动老虎机在连接器完成快照之后,它继续从创建快照的确切位置开始流化更改。通过这种方式,连接器以所有数据的一致视图开始,并且不会忽略在拍摄快照时所做的任何更改。

这个连接器容错能力强。当连接器读取更改并产生事件时,它会记录每个事件的WAL位置。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,重新启动时连接器将继续读取它上次停止的WAL。这包括快照。如果连接器在快照期间停止,则连接器在重新启动时开始一个新的快照。

该连接器依赖并反映了PostgreSQL逻辑解码特性,该特性有以下限制:

  • 逻辑解码不支持DDL更改。这意味着连接器无法向使用者报告DDL更改事件。

  • 逻辑译码复制槽位仅在上支持主要的服务器。当存在一个PostgreSQL服务器集群时,连接器只能在活动服务器上运行主要的服务器。它不能继续运行温暖的备用副本。如果主要的服务器故障或降级,连接器停止。后主要的服务器已经恢复,您可以重新启动连接器。如果已升级到其他PostgreSQL服务器主要的,重新启动连接器前,请先调整连接器配置。

出现问题时的行为描述出现问题时连接器的操作。

开云体育官方注册网址Debezium目前只支持UTF-8字符编码的数据库。开云体育电动老虎机使用单字节字符编码,不可能正确处理包含扩展ASCII码字符的字符串。

连接器如何工作

为了优化配置和运行Debezium PostgreSQL连接器,了开云体育官方注册网址解连接器如何执行快照、流更改事件、确定Kafka主题名称以及使用元数据是很有帮助的。

安全

要使用Debezium开云体育官方注册网址连接器从PostgreSQL数据库传输更改,连接器必须使用数据库中的特定权限进行操作。开云体育电动老虎机尽管授予必要特权的一种方法是为用户提供超级用户特权,这样做可能会暴露你的PostgreSQL数据未经授权的访问。与其向Debezium用户授予过多的特权,不如创建一个专用的Debezium复制用户,并向其授予特开云体育官方注册网址定的特权。

有关为Debezium PostgreSQL用户配置特权的更多信息,请参见开云体育官方注册网址设置权限.有关PostgreSQL逻辑复制安全的更多信息,请参见PostgreSQL的文档

快照

大多数PostgreSQL服务器被配置为不保留WAL段中数据库的完整历史。开云体育电动老虎机这意味着PostgreSQL连接器将无法通过只读取WAL来查看数据库的整个历史。开云体育电动老虎机因此,连接器第一次启动时执行初始化一致的快照数据库的。开云体育电动老虎机执行快照的默认行为包括以下步骤。属性可以更改此行为snapshot.mode连接器配置属性到一个值以外最初的

  1. 属性启动事务可序列化,只读,可延迟隔离级别,以确保此事务中的后续读取针对数据的单个一致版本。任何数据的变化,由于后续插入更新,删除其他客户端的操作对该事务不可见。

  2. 读取服务器事务日志中的当前位置。

  3. 扫描数据库表和模式开云体育电动老虎机,生成一个事件,并将该事件写入相应的特定于表的Kafka主题。

  4. 提交事务。

  5. 在连接器偏移中记录快照的成功完成。

如果连接器失败,重新平衡,或在步骤1开始后但在步骤5完成之前停止,则重新启动连接器后开始一个新的快照。在连接器完成它的初始快照之后,PostgreSQL连接器继续从它在步骤2中读取的位置进行流处理。这确保连接器不会错过任何更新。如果连接器由于任何原因再次停止,在重新启动时,连接器将继续从之前停止的地方传输更改。

表1。的选项snapshot.mode连接器配置属性
选项 描述

总是

连接器在启动时总是执行快照。快照完成后,连接器继续按照上述顺序中的步骤3对更改进行流式处理。这种模式在以下情况下很有用:

  • 据了解,一些WAL段已被删除,不再可用。

  • 集群发生故障后,升级了一个新的主服务器。的总是快照模式确保连接器不会错过在提升新主服务器之后、在新主服务器上重新启动连接器之前所做的任何更改。

从来没有

连接器从不执行快照。以这种方式配置连接器时,其启动时的行为如下所示。如果Kafka偏移主题中有先前存储的LSN,连接器将从该位置继续流化更改。如果没有存储LSN,连接器将从在服务器上创建PostgreSQL逻辑复制插槽的时间点开始流化更改。的从来没有只有当您知道所有感兴趣的数据仍然反映在WAL中时,快照模式才有用。

initial_only

连接器执行数据库快照,并在流任何更改事件记录之前停开云体育电动老虎机止。如果连接器已经启动,但在停止之前没有完成快照,则连接器将重新启动快照进程,并在快照完成时停止。

出口

弃用,所有模式都是无锁的。

自定义

自定义快照模式允许您注入自己的实现io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。设置snapshot.custom.class在你的Kafka Connect集群的类路径中添加configuration属性,如果使用EmbeddedEngine.详情请参见自定义快照SPI

Ad hoc快照

默认情况下,连接器仅在第一次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据都只通过流处理传入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium包含了一个执行临时快照的选项。开云体育官方注册网址数据库中的以下更改可能会导致执行临时快照:开云体育电动老虎机

  • 连接器配置被修改以捕获一组不同的表。

  • Kafka主题被删除,必须重新构建。

  • 数据损坏是由于配置错误或其他问题造成的。

您可以对先前捕获快照的表重新运行快照,方法是启动所谓的特别的快照.临时快照需要使用信号表.通过向Debezium信令表发送信号请求,可以启动一个临时快照。开云体育官方注册网址

当您初始化现有表的临时快照时,连接器将内容追加到已存在的表主题。如果先前存在的主题被删除,Debezium可以自动创建一个主题开云体育官方注册网址自动创建主题启用。

临时快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,也可以仅捕获数据库中表的一个子集。开云体育电动老虎机此外,快照可以捕获数据库中表内容的一个子集。开云体育电动老虎机

指定要捕获的表execute-snapshot发送到信令表的消息。属性的类型execute-snapshot信号增量,并提供要包含在快照中的表的名称,如下表所示:

表2。ad hoc的例子execute-snapshot信号记录
默认的 价值

类型

增量

指定要运行的快照类型。
可选设置类型。目前,您只能请求增量快照。

数据收集

N/A

包含与要快照的表的完全限定名称匹配的正则表达式的数组。
名称的格式与signal.data.collection配置选项。

附加条件

N/A

一个可选字符串,它根据表的列指定一个条件,以捕获表内容的一个子集。

触发临时快照

属性添加项来初始化临时快照execute-snapshot发送到信令表的信号类型。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个表的起点和终点。根据表中条目的数量和配置的块大小,Debezium将表划分为块,然后依次对每个块进行快照,每次快照一个。开云体育官方注册网址

目前,execute-snapshot动作类型触发器增量快照只有。有关更多信息,请参见增量快照

增量快照

为了提供管理快照的灵活性,Debezium包含了一个补充的快照机制,称为开云体育官方注册网址增量快照.增量快照依赖于Debezium机制开云体育官方注册网址发送信号到Debezium连接器开云体育官方注册网址.增量快照基于DDD-3设计文档。

在增量快照中,Debezium不像在初始快照中那样一次性捕获数据库的全部状态,而是在一系列可配置的块中分阶段捕获每个表。开云体育官方注册网址开云体育电动老虎机可以指定希望快照捕获的表和每个块的大小.块大小决定了快照在数据库上的每次获取操作期间收集的行数。开云体育电动老虎机增量快照的默认chunk大小为1kb。

随着增量快照的进行,Debezium使用水印来跟踪进度,维护它捕获的每个表开云体育官方注册网址行的记录。与标准的初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:

  • 您可以在流数据捕获的同时并行运行增量快照,而不是将流数据延迟到快照完成。在整个快照过程中,连接器继续从更改日志中捕获接近实时的事件,并且两个操作都不会阻塞另一个操作。

  • 当增量快照进程中断时,可以恢复增量快照,不会丢失数据。进程恢复后,快照将从它停止的位置开始,而不是从开始重新捕获表。

  • 您可以在任何时候按需运行增量快照,并根据需要重复该过程以适应数据库更新。开云体育电动老虎机例如,在修改连接器配置以向其添加表之后,可以重新运行快照table.include.list财产。

增量快照流程

运行增量快照时,Debezium根据主键对每个表进行排序,然后根据主键将表分开云体育官方注册网址成块配置的块大小.它逐个块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个事件。该事件表示块快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。开云体育电动老虎机为了反映这些变化,插入更新,或删除操作像往常一样提交到事务日志中。类似地,正在进行的Debezium流处理继续检开云体育官方注册网址测这些更改事件,并向Kafka发出相应的更改事件记录。

Debe开云体育官方注册网址zium如何解决具有相同主键的记录之间的冲突

在某些情况下,更新删除流流程发出的事件按顺序接收。也就是说,流处理过程可能在快照捕获包含表行的块之前发出修改表行的事件事件。当快照最终发出相应的事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium采用了一种缓冲方案来解决冲突。开云体育官方注册网址只有在快照事件和流事件之间的冲突被解决之后,Debezium才会向Kafka发送事件记录。开云体育官方注册网址

快照窗口

协助解决晚到车辆之间的碰撞事件和流事件修改同一表行,Debezium采用了所谓的开云体育官方注册网址快照窗口.快照窗口定义了增量快照为指定表块捕获数据的时间间隔。在块的快照窗口打开之前,Debezium遵循其通常的行为,从事务日志中直接向下游的目标Kaf开云体育官方注册网址ka主题发送事件。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。开云体育官方注册网址

对于每个数据收集,Debezium会发出两种类型的事件,并将开云体育官方注册网址它们的记录存储在一个目的地Kafka主题中。它直接从表捕获的快照记录发出为操作。同时,随着用户继续更新数据收集中的记录,事务日志被更新以反映每次提交,Debezium就会发出开云体育官方注册网址更新删除每个更改的操作。

当快照窗口打开时,Debezium开始处理快照块,它将快照记录传递到开云体育官方注册网址内存缓冲区。的主键在快照窗口期间缓冲区中的事件与传入流事件的主键进行比较。如果没有匹配到,流事件记录将直接发送到Kafka。如果D开云体育官方注册网址ebezium检测到匹配,它将丢弃缓冲事件,并将流记录写入目标主题,因为流事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的事件。开云体育官方注册网址Debezium释放这些剩余的事件到表的Kafka主题。

连接器对每个快照块重复该过程。

在运行增开云体育官方注册网址量快照时,PostgreSQL的Debezium连接器不支持模式更改。如果执行架构更改之前增量快照开始但是发送信号然后直通配置选项开云体育电动老虎机database.autosave设置为保守的正确地处理模式更改。

触发增量快照

目前,创建增量快照的唯一方式是发送自组织快照信号到源数据库上的信令表。开云体育电动老虎机将一个信号作为SQL提交给信令表插入查询。

在Debez开云体育官方注册网址ium检测到信号表中的变化后,它读取信号,并运行所请求的快照操作。

提交的查询指定要包含在快照中的表,还可以指定快照操作的类型。目前,快照操作的唯一有效选项是默认值,增量

若要指定要包含在快照中的表,请提供数据收集列出表的数组或用于匹配表的正则表达式数组,例如
{“数据收集”:["。MyFirstTable”、“公众。MySecondTable ")}

数据收集增量快照信号阵列没有默认值。如果数据收集数组为空时,Debezium检测开云体育官方注册网址到不需要任何操作,因此不执行快照。

如果要包含在快照中的表的名称包含点()中的数据库、模式或表的名称,以将表添加到开云体育电动老虎机数据收集数组时,必须用双引号转义名称的每个部分。

类中存在的表公共Schema,它有名字我的。表格,请使用以下格式:“公共”。“My.Table”

先决条件
过程
  1. 发送一个SQL查询,将临时增量快照请求添加到信令表:

    插入< signalTable >(id, type, data)' < id > '' < snapshotType >”, '{"data-collections": ["<表>”、“<表>“,”类型”:“< snapshotType >”、“附加条件”:“<附加条件>“}”);

    例如,

    INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data)(1)值(“ad-hoc-1”,(2)“execute-snapshot”,(3)”{代码基于schema1中“数据收集”:["。表1”、“schema2.table2”),(4)“类型”:“增量”},(5)“附加条件”:“颜色=蓝色“}”);(6)

    的值id类型,数据命令中的参数对应信令表字段

    样例中参数说明如下:

    表3。向信令表发送增量快照信号的SQL命令字段说明
    价值 描述

    1

    myschema.开云体育官方注册网址debezium_signal

    指定源数据库上信令表的全限定名。开云体育电动老虎机

    2

    ad-hoc-1

    id参数指定分配为id信号请求的标识符。
    使用此字符串标识信令表中条目的日志消息。开云体育官方注册网址Debezium不使用这个字符串。相反,在快照期间,Debezium生成自己的快照开云体育官方注册网址id字符串作为水印信号。

    3.

    execute-snapshot

    指定类型参数指定信号要触发的操作。

    4

    数据收集

    的必需组件数据信号的字段,该字段指定表名或正则表达式的数组,以匹配要包含在快照中的表名。
    对象中指定连接器的信令表的名称时使用的格式相同,该数组列出了根据表的全限定名匹配表的正则表达式signal.data.collection配置属性。

    5

    增量

    一个可选的类型组成部分数据信号的字段,指定要运行的快照操作的类型。
    目前,唯一有效的选项是默认值,增量
    如果不指定值,连接器将运行增量快照。

    6

    附加条件

    一个可选字符串,它根据表的列指定一个条件,以捕获表内容的子集。有关的更多信息附加条件参数,看到的临时增量快照附加条件

的临时增量快照附加条件

如果希望快照仅包含表中内容的子集,则可以通过添加附加条件参数表示快照信号。

典型快照的SQL查询形式如下:

Select * from<表>....

通过添加附加条件参数,则追加一个在哪里输入SQL查询的条件,示例如下:

Select * from<表>在哪里<附加条件>....

下面的示例显示了一个SQL查询,该查询向信令表发送一个附加条件的临时增量快照请求:

插入< signalTable >(id, type, data)' < id > '' < snapshotType >”, '{"data-collections": ["<表>”、“<表>“,”类型”:“< snapshotType >”、“附加条件”:“<附加条件>“}”);

例如,假设你有一个产品表,包含以下列:

  • id(主键)

  • 颜色

  • 数量

的增量快照产品表中仅包含其中的数据项颜色=蓝色,可以使用下面的SQL语句触发快照:

INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1. data ", ' "data-collections": ["schema1. data ")产品”),“类型”:“增量”、“附加条件”:“颜色=蓝色"});

附加条件参数还允许您传递基于多个列的条件。例如,使用产品表中,您可以提交一个查询,该查询将触发一个增量快照,该快照仅包含那些项目的数据颜色=蓝色而且量> 10

INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1. data ", ' "data-collections": ["schema1. data ")product "],"type":"incremental", "additional-condition":"color=blue AND quantity>10"}');

下面的示例显示了连接器捕获的增量快照事件的JSON。

[示例]增量快照事件消息
{“前”:零,“后”:{“pk”:“1”,“价值”:“新数据”},“源”:{…“快照”:“增量”(1)},“人事处”:“r”,(2)"ts_ms":"1620393591654", "transaction":null}
字段名 描述

1

快照

指定要运行的快照操作的类型。
目前,唯一有效的选项是默认值,增量
指定一个类型您提交给信令表的SQL查询中的值是可选的。
如果不指定值,连接器将运行增量快照。

2

人事处

事件类型。
快照事件的值为r,表示操作。

停止增量快照

您还可以通过向源数据库上的表发送信号来停止增量快照。开云体育电动老虎机通过发送SQL向表提交停止快照信号插入查询在Debez开云体育官方注册网址ium检测到信号表中的变化后,它将读取信号,如果增量快照操作正在进行,则停止增量快照操作。

提交的查询指定的快照操作增量,以及要删除的当前运行快照的表(可选)。

先决条件
过程
  1. 向信令表发送一个SQL查询来停止临时增量快照:

    插入< signalTable >(id, type, data)' < id > ', 'stop-snapshot', '{"data-collections": ["<表>”、“<表>”,“类型”:“增量”}’);

    例如,

    INSERT INTO myschema.开云体育官方注册网址debezium_signal (id, type, data)(1)值(“ad-hoc-1”,(2)“stop-snapshot”,(3)”{代码基于schema1中“数据收集”:["。表1”、“schema2.table2”),(4)“类型”:“增量”}’);(5)

    的值id类型,数据signal命令中的参数对应信令表字段

    样例中参数说明如下:

    表4。向信令表发送停止增量快照信号的SQL命令字段说明
    价值 描述

    1

    myschema.开云体育官方注册网址debezium_signal

    指定源数据库上信令表的全限定名。开云体育电动老虎机

    2

    ad-hoc-1

    id参数指定分配为id信号请求的标识符。
    使用此字符串标识信令表中条目的日志消息。开云体育官方注册网址Debezium不使用这个字符串。

    3.

    stop-snapshot

    指定类型参数指定信号要触发的操作。

    4

    数据收集

    的可选组件数据信号的字段,该字段指定表名或正则表达式数组,以匹配要从快照中删除的表名。
    对象中指定连接器的信令表的名称时使用的格式相同,该数组列出了根据表的全限定名匹配表的正则表达式signal.data.collection配置属性。如果这个分量数据字段,则信号停止正在进行的整个增量快照。

    5

    增量

    的必需组件数据信号的字段,该字段指定要停止的快照操作的类型。
    目前,唯一有效的选项是增量
    如果没有指定类型值时,信号停止增量快照失败。

自定义快照SPI

类的实现用于更高级的用途io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。该接口允许控制连接器如何执行快照的大部分方面。这包括是否获取快照、打开快照事务的选项以及是否获取锁。

下面是该接口的完整API。所有内置快照模式都实现此接口。

/** *此接口用于确定关于快照过程的细节:**即:* -是否应该发生快照* -是否应该发生流* -应该使用哪些查询来快照**虽然Debezium提供了许多默认的快照模式,*实现者可以提供该接口的自定义实现,其中*可以提供更高级的功能,例如部分快照。开云体育官方注册网址* *实现必须为{@link #shouldSnapshot()}或{@link #shouldStream()}中的任何一个返回true,或者两者都返回true。*/ @孵化公共接口快照{void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState SlotState);/** * @如果快照需要快照则返回true */ boolean shouldSnapshot();/** * @返回true如果快照在拍摄快照后应该流*/ boolean shouldStream();/** ** @return如果流应从快照*事务开始恢复,则为true;如果连接器恢复并进行快照,则为false, *流应从之前停止的地方恢复。*/默认布尔值shouldStreamEventsStartingFromSnapshot(){返回true;} / * * *生成一个有效的postgres指定表的查询字符串,或空{@link可选}*跳过这个表快照(但这表仍将流从)* * @param tableId表生成一个查询* @param snapshotSelectColumns列中使用快照选择基于列*包括/排除过滤器* @return有效的查询字符串,或没有跳过这个表快照* /可选<字符串> buildSnapshotQuery (tableId tableId,列表<字符串> snapshotSelectColumns);** @param newSlotInfo如果为快照创建了一个新的slow,它包含来自* the create_replication_slot命令的信息*/ default string snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo){//我们正在使用与pg_backup相同的隔离级别Return " set transaction isolation level SERIALIZABLE, READ ONLY, deferable;"; } /** * Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter * implementation. */ default Optional snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { String lineSeparator = System.lineSeparator(); StringBuilder statements = new StringBuilder(); statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot // this does not prevent writes to the table, but prevents changes to the table's schema.... // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted tableIds.forEach(tableId -> statements.append("LOCK TABLE ") .append(tableId.toDoubleQuotedString()) .append(" IN ACCESS SHARE MODE;") .append(lineSeparator)); return Optional.of(statements.toString()); } /** * Lifecycle hook called once the snapshot phase is finished. */ default void snapshotCompleted() { // no operation } }

流的变化

PostgreSQL连接器通常将绝大多数时间用于从所连接的PostgreSQL服务器传输更改。这个机制依赖于PostgreSQL的复制协议.该协议使客户端能够在服务器事务日志的特定位置(称为日志序列号(log Sequence Numbers, lns))提交更改时接收来自服务器的更改。

每当服务器提交事务时,一个单独的服务器进程都会从逻辑解码插件.该函数处理来自事务的更改,将其转换为特定的格式(在Debezium插件的情况下是Protobuf或JSON),并将其写入输出流,然后供客户端使用。开云体育官方注册网址

Debe开云体育官方注册网址zium PostgreSQL连接器充当PostgreSQL客户端。当连接器接收到更改时,它将事件转换为Debezium开云体育官方注册网址创建更新,或删除事件,包括该事件的LSN。PostgreSQL连接器将这些记录中的更改事件转发给运行在同一进程中的Kafka Connect框架。Kafka Connect进程异步地将更改事件记录按照它们生成的相同顺序写入相应的Kafka主题。

Kafka Connect会定期记录最新的数据抵消在另一个卡夫卡的话题。偏移量表示Debezium包含在每个事件中的特定于源的位置信息。开云体育官方注册网址对于PostgreSQL连接器,记录在每个更改事件中的LSN就是偏移量。

当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件记录刷新到Kafka,并记录从每个连接器接收到的最后偏移量。当Kafka Connect重新启动时,它会读取每个连接器的最后记录偏移量,并在其最后记录偏移量处启动每个连接器。当连接器重新启动时,它向PostgreSQL服务器发送一个请求,以发送刚好在该位置之后开始的事件。

PostgreSQL连接器作为逻辑解码插件发送的事件的一部分检索模式信息。但是,连接器不检索关于哪些列组成主键的信息。连接器从JDBC元数据(侧通道)获得此信息。如果表的主键定义发生了变化(通过添加、删除或重命名主键列),来自JDBC的主键信息与逻辑解码插件生成的更改事件会有一小段时间不同步。在这段很短的时间内,可以使用不一致的密钥结构创建消息。为了防止这种不一致,更新主键结构如下:

  1. 将数据库或应用程开云体育电动老虎机序设置为只读模式。

  2. 让Deb开云体育官方注册网址ezium处理所有剩余事件。

  3. 停止Deb开云体育官方注册网址ezium。

  4. 更新相关表中的主键定义。

  5. 将数据库或应用程开云体育电动老虎机序设置为读/写模式。

  6. 重启Debezi开云体育官方注册网址um。

PostgreSQL 10+逻辑解码支持(pgoutput

从PostgreSQL 10+开始,有一种逻辑复制流模式,称为pgoutputPostgreSQL原生支持的。这意味着Debezium Postg开云体育官方注册网址reSQL连接器可以使用复制流,而不需要额外的插件。这对于不支持或不允许安装插件的环境尤其有价值。

看到设置PostgreSQL欲知详情。

主题名称

默认情况下,PostgreSQL连接器为所有人写入更改事件插入更新,删除对特定于该表的单个Apache Kafka主题的表中发生的操作。连接器使用以下约定来命名更改事件主题:

topicPrefix.schemaName.tableName

下面的列表提供了默认名称组件的定义:

topicPrefix

属性指定的主题前缀topic.prefix配置属性。

schemaName

发生更改事件的数据库模式的名称。开云体育电动老虎机

的表

发生更改事件的数据库表的名称。开云体育电动老虎机

例如,假设实现在PostgreSQL安装中捕获更改的连接器的配置中的逻辑服务器名是postgres开云体育电动老虎机数据库和库存模式,包含四个表:产品products_on_hand客户,订单.连接器将流记录到以下四个Kafka主题:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

现在假设这些表不是特定模式的一部分,而是在默认模式中创建的公共PostgreSQL模式。卡夫卡主题的名称是:

  • fulfillment.public.products

  • fulfillment.public.products_on_hand

  • fulfillment.public.customers

  • fulfillment.public.orders

连接器应用类似的命名约定来标记其事务元数据主题

如果默认的主题名称不满足您的需求,您可以配置自定义的主题名称。要配置自定义主题名称,需要在逻辑主题路由SMT中指定正则表达式。有关使用逻辑主题路由SMT自定义主题命名的详细信息,请参见主题的路由

事务的元数据

开云体育官方注册网址Debezium可以生成表示事务边界的事件,并丰富数据更改事件消息。

Debezium何时接收事务元开云体育官方注册网址数据的限制

开云体育官方注册网址Debezium仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

每笔交易开始而且结束, 开云体育官方注册网址Debezium生成一个包含以下字段的事件:

状态

开始结束

id

由Postgres事务ID本身和给定操作的LSN(以冒号分隔)组成的唯一事务标识符的字符串表示,即格式为txID: LSN

ts_ms

事务边界事件的时间(开始结束事件)。如果数据源没有向Debezium提供事件时间,则该字段表示Debeziu开云体育官方注册网址m处理事件的时间。

event_count(结束事件)

事务发出的事件总数。

data_collections(结束事件)

对的数组data_collection而且event_count元素,该元素指示连接器为源自数据集合的更改发出的事件数。

例子
{"status": "BEGIN", "id": "571:53195829", "ts_ms": " 1486500577125 ", "event_count": null, "data_collections": null} {"status": "END", "id": "571:53195832", "ts_ms": " 1486500577691 ", "event_count": 2, "data_collections": [{"data_collection": "s1. 1. "A ", "event_count": 1}, {"data_collection": "s2. "A ", "event_count": 1}]}

方法覆盖topic.transaction选项时,事务事件被写入指定的主题< topic.prefix >.transaction

更改数据事件丰富

启用事务元数据时,数据消息信封是充实了新的事务字段。这个字段以复合字段的形式提供关于每个事件的信息:

id

唯一事务标识符的字符串表示形式。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中的每数据收集位置。

下面是一个消息的例子:

{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:“1580390884335”,“交易”:{" id ":“571:53195832”、“total_order”:“1”,“data_collection_order”:" 1 "}}

数据变更事件

Debe开云体育官方注册网址zium PostgreSQL连接器为每个行级生成一个数据更改事件插入更新,删除操作。每个事件包含一个键和一个值。键和值的结构取决于所更改的表。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。

下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:

{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
表5所示。变更事件基本内容概述
字段名 描述

1

模式

第一个模式字段是事件键的一部分。它指定了一个Kafka Connect模式,用来描述事件键的内容有效载荷部分。换句话说,是第一个模式字段描述已更改表的主键的结构,如果表没有主键,则描述唯一键。

属性可以覆盖表的主键message.key.columns连接器配置属性.在本例中,第一个模式字段描述由该属性标识的键的结构。

2

有效载荷

第一个有效载荷字段是事件键的一部分。它具有前面所描述的结构模式字段,它包含已更改行的键。

3.

模式

第二个模式字段是事件值的一部分。它指定Kafka Connect模式,描述事件值中的内容有效载荷部分。换句话说,是第二种模式描述已更改行的结构。通常,这个模式包含嵌套的模式。

4

有效载荷

第二个有效载荷字段是事件值的一部分。它具有前面所描述的结构模式字段,它包含已更改行的实际数据。

默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始表相同

从Kafka 0.10开始,Kafka可以选择记录事件键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。

PostgreSQL连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名中的每个剩余字符、模式名和表名中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名、模式名或表名包含无效字符,并且唯一区分名称的字符无效,因此用下划线替换,则可能导致意外的冲突。

更改事件键

对于给定的表,更改事件的键具有一个结构,该结构在创建事件时包含表主键中每一列的字段。或者,如果表有副本的身份设置为完整的使用索引每个唯一键约束都有一个字段。

考虑一个客户表中定义的公共开云体育电动老虎机数据库模式和该表的更改事件键的示例。

例表
CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));
更改事件键的示例

如果topic.prefix属性的值PostgreSQL_server,每变事件为客户表,虽然它有这个定义有相同的键结构,在JSON中看起来像这样:

{"schema": {(1)"type": "struct", "name": "PostgreSQL_server.public.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){" name ": " id ", "指数":" 0 ","模式":{“类型”:“INT32”、“可选”:“假的 " } } ] }, " 有效载荷":{(5)id: "1"},}
表6所示。更改事件键的描述
字段名 描述

1

模式

密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容有效载荷部分。

2

PostgreSQL_server.inventory.customers.Key

定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。关键模式名具有该格式connector-name开云体育电动老虎机数据库名称表名关键.在这个例子中:

  • PostgreSQL_server生成此事件的连接器的名称。

  • 库存包含已更改的表开云体育电动老虎机的数据库。

  • 客户已更新的表。

3.

可选

指示事件键是否必须在其中包含值有效载荷字段。在本例中,需要键的有效负载中的值。当表没有主键时,键的有效载荷字段中的值是可选的。

4

字段

属性中期望的每个字段有效载荷,包括每个字段的名称、索引和模式。

5

有效载荷

包含为其生成此更改事件的行的键。在本例中,键包含一个单键id字段,其值为1

虽然column.exclude.list而且column.include.list连接器配置属性允许您仅捕获表列的一个子集,主键或唯一键中的所有列始终包含在事件的键中。

如果表没有主键或唯一键,则更改事件的键为空。表中没有主键或唯一键约束的行不能被唯一标识。

更改事件值

change事件中的值比键稍微复杂一些。和键一样,值也有模式Section和a有效载荷部分。的模式类的模式信封的结构有效载荷节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

考虑用于显示更改事件键示例的同一个示例表:

CREATE TABLE customers (id SERIAL, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL, PRIMARY KEY(id));

表的更改的更改事件的值部分根据副本的身份设置和事件要用于的操作。

副本的身份

副本的身份是一个特定于postgresql的表级设置,它决定了逻辑解码插件可用的信息量更新而且删除事件。更具体地说,是副本的身份控件可以为所涉及的表列的先前值提供哪些(如果有)信息更新删除事件发生时。

有4个可能的值副本的身份

  • 默认的—默认为此更新而且删除如果表有主键,则事件包含该表的主键列的先前值。对于一个更新事件时,只显示值已更改的主键列。

    如果表没有主键,则连接器不会发出更新删除该表的事件。对于没有主键的表,连接器只发出创建事件。通常,没有主键的表用于将消息追加到表的末尾,这意味着更新而且删除事件没有用处。

  • 没有什么-为更新而且删除操作不包含关于任何表列上一个值的任何信息。

  • 完整的-为更新而且删除操作包含表中所有列的先前值。

  • 指数索引名-为更新而且删除操作包含指定索引中包含的列的先前值。更新事件还包含已更新值的索引列。

创建事件

对象中创建数据的操作所生成的更改事件的值部分,示例如下客户表:

{"schema": {(1)"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "first_name"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers.Value",(2)"field": "before"}, {"type": "struct", "fields": [{"type": "int32", "optional": false, "field": "id"}, {"type": "string", "optional": false, "field": "last_name"}, {"type": "string", "optional": false, "field": "email"}], "optional": true, "name": "PostgreSQL_server.inventory.customers. "价值", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "int64", "optional": true, "field": "txId" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "int64", "optional": true, "field": "xmin" } ], "optional": false, "name": "io.debezium.connector.postgresql.Source",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "PostgreSQL_server.inventory.customers.Envelope"(4)}, "有效载荷":{(5)“之前”:空,(6)"后":{(7)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{(8)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": true, "db": "postgres", "sequence": "[\"24023119\",\"24023128\"]" schema": "public", "table": "customers", "txId": 555, "lsn": 24023128, "xmin": null}, "op": "c",(9)“ts_ms”:1559033904863(10)}}
表7所示。的描述创建事件值字段
字段名 描述

1

模式

值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。

2

的名字

模式节中,每的名字Field指定值的有效负载中字段的模式。

PostgreSQL_server.inventory.customers.Value有效负载的模式是之前而且字段。此模式特定于客户表格

的模式名称之前而且字段的格式为logicalName的表value,确保模式名在数据库中是唯一的。开云体育电动老虎机这意味着当使用Avro转换器,每个逻辑源中每个表的结果Avro模式都有自己的演变和历史。

3.

的名字

io.开云体育官方注册网址debezium.connector.postgresql.Source有效负载的模式是字段。这个模式是特定于PostgreSQL连接器的。连接器将其用于生成的所有事件。

4

的名字

PostgreSQL_server.inventory.customers.Envelope有效负载的总体结构的模式在哪里PostgreSQL_server是连接器名称,库存是数据库,和开云体育电动老虎机客户是桌子。

5

有效载荷

该值为实际数据。这是变更事件提供的信息。

事件的JSON表示形式似乎比它们描述的行要大得多。这是因为JSON表示必须包括消息的模式和有效负载部分。然而,通过使用Avro转换器,你可以显著减少连接器流到Kafka主题的消息的大小。

6

之前

可选字段,指定事件发生之前行的状态。当人事处字段是c对于create,就像在本例中一样之前字段是因为这个更改事件是针对新内容的。

该字段是否可用取决于副本的身份为每一张桌子设置。

7

可选字段,指定事件发生后行的状态。在本例中,字段包含新行的值idfirst_namelast_name,电子邮件列。

8

描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机包含新行的数据库和表

  • 附加偏移量信息的stringized JSON数组。第一个值总是最近提交的LSN,第二个值总是当前提交的LSN。任何一个值都可以是

  • 模式名

  • 如果事件是快照的一部分

  • 执行操作的事务ID

  • 数据库日志中操作的偏移量开云体育电动老虎机

  • 在数据库中进行更改的时间戳开云体育电动老虎机

9

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,c表示该操作创建了一行。有效值为:

  • c=创建

  • u=更新

  • d=删除

  • r= read(仅适用于快照)

  • t=截断

  • =消息

10

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新事件

样例中更新的更改事件的值客户表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例客户表:

{"schema":{…}, "payload": {"before": {(1)"id": 1}, "after": {(2)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 24023128, "xmin": null}, "op": "u",(4)“ts_ms”:1465584025523(5)}}
表8所示。的描述更新事件值字段
字段名 描述

1

之前

可选字段,其中包含数据库提交前行的值。开云体育电动老虎机在本例中,只有主键列,id因为桌子的原因副本的身份默认情况下,默认的.+ For a更新事件来包含该行中所有列的先前值,则必须更改客户表通过运行更改表客户的副本标识为满

2

可选字段,指定事件发生后行的状态。在本例中,first_name价值就是现在安妮玛丽

3.

描述事件的源元数据的必填字段。的字段结构中的字段与创建事件,但有些值不同。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机包含新行的数据库和表

  • 模式名

  • 如果事件是快照的一部分(总是更新事件)

  • 执行操作的事务ID

  • 数据库日志中操作的偏移量开云体育电动老虎机

  • 在数据库中进行更改的时间戳开云体育电动老虎机

4

人事处

返回string,描述操作类型。在一个更新事件值,则人事处字段值为u,表示该行因更新而更改。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新一行的主键/唯一键的列将更改该行键的值。当一个键改变时,Debezium输出开云体育官方注册网址三个事件:删除事件和墓碑上的事件使用该行的旧键,然后使用该行的新键执行一个事件。详情见下一节。

主键更新

一个更新更改行的主键字段的操作称为主键更改。对于主键更改,代替发送更新事件记录时,连接器发送一个删除事件记录的旧键和创建新的(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • 删除事件记录有__开云体育官方注册网址debezium.newkey作为消息头。此标头的值是更新行的新主键。

  • 创建事件记录有__开云体育官方注册网址debezium.oldkey作为消息头。此标头的值是已更新行的前一个(旧的)主键。

删除事件

的值删除改变事件有相同之处模式部分为创建而且更新同一表的事件。的有效载荷的一部分删除事件。客户表是这样的:

{"schema":{…}, "payload": {"before": {(1)"id": 1}, "after": null,(2)“源”:{(3)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "d",(4)“ts_ms”:1465581902461(5)}}
表9所示。的描述删除事件值字段
字段名 描述

1

之前

可选字段,指定事件发生之前行的状态。在一个删除事件值,则之前字段包含在数据库提交时删除该行之前该行中的值。开云体育电动老虎机

在本例中,之前字段只包含主键列,因为表的副本的身份设置是默认的

2

可选字段,指定事件发生后行的状态。在一个删除事件值,则字段是,表示该行已不存在。

3.

描述事件的源元数据的必填字段。在一个删除事件值,则字段结构与for相同创建而且更新同一表的事件。许多字段值也相同。在一个删除事件值,则ts_ms而且lsn字段值以及其他值可能已经更改。但是,字段在删除事件值提供相同的元数据:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机包含已删除行的数据库和表

  • 模式名

  • 如果事件是快照的一部分(总是删除事件)

  • 执行操作的事务ID

  • 数据库日志中操作的偏移量开云体育电动老虎机

  • 在数据库中进行更改的时间戳开云体育电动老虎机

4

人事处

返回string,描述操作类型。的人事处字段值为d,表示该行已被删除。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

一个删除更改事件记录为使用者提供了处理删除该行所需的信息。

让消费者能够处理删除为没有主键的表生成的事件,请设置表的主键副本的身份完整的.当一个表没有主键时副本的身份设置为默认的没有什么,一个删除事件没有之前字段。

PostgreSQL连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。

墓碑上的事件

删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为.为了实现这一点,PostgreSQL连接器遵循一个删除特别活动墓碑上事件,具有相同的键,但价值。

截断事件

一个截断更改事件表示表已被截断。消息键是在这种情况下,消息值看起来像这样:

{"schema":{…}, "payload": {"source": {(1)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "public", "table": "customers", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "t",(2)“ts_ms”:1559033904961(3)}}
表10。的描述截断事件值字段
字段名 描述

1

描述事件的源元数据的必填字段。在一个截断事件值,则字段结构与for相同创建更新,删除同一表的事件,提供以下元数据:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机包含新行的数据库和表

  • 模式名

  • 如果事件是快照的一部分(总是删除事件)

  • 执行操作的事务ID

  • 数据库日志中操作的偏移量开云体育电动老虎机

  • 在数据库中进行更改的时间戳开云体育电动老虎机

2

人事处

返回string,描述操作类型。的人事处字段值为t,表示该表被截断。

3.

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

如果一个截断语句应用于多个表截断将发出每个截断表的更改事件记录。

请注意,由于截断事件表示对整个表所做的更改,并且没有消息键,除非您使用单个分区的主题,否则与表相关的更改事件没有顺序保证(创建更新等)和截断该表的事件。例如,消费者可能会收到一个更新事件发生后,截断事件,当从不同分区读取这些事件时。

消息事件

方法支持此事件类型pgoutputPostgres 14+插件(Postgres文档

一个消息事件表示已将通用逻辑解码消息直接插入WAL,通常使用pg_logical_emit_message函数。消息键为a结构体使用一个名为前缀在本例中,携带插入消息时指定的前缀。对于事务性消息,message值如下所示:

{"schema":{…}, "payload": {"source": {(1)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "" table": "", "txId": 556, "lsn": 46523128, "xmin": null}, "op": "m",(2)“ts_ms”:1559033904961,(3)"信息":{(4)"prefix": "foo", "content": "Ymfy"}}}

与其他事件类型不同,非事务性消息没有任何关联开始结束交易活动。对于非事务性消息,message值如下所示:

{"schema":{…}, "payload": {"source": {(1)“版本”:“2.1.2。Final", "connector": "postgresql", "name": "PostgreSQL_server", "ts_ms": 1559033904863, "snapshot": false, "db": "postgres", "schema": "" table": "", "lsn": 46523128, "xmin": null}, "op": "m",(2)“ts_ms”:1559033904961(3)"信息":{(4)"prefix": "foo", "content": "Ymfy"}}
表11所示。的描述消息事件值字段
字段名 描述

1

描述事件的源元数据的必填字段。在一个消息事件值,则场结构不会有表格模式任何信息消息事件也只会有txId如果消息事件是事务性的。

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机数据库名称

  • 架构名称(总是""消息事件)

  • 表名(总是""消息事件)

  • 如果事件是快照的一部分(总是消息事件)

  • 执行操作的事务ID (对于非事务性消息事件)

  • 数据库日志中操作的偏移量开云体育电动老虎机

  • 事务性消息:消息插入WAL的时间戳

  • 事务性消息;连接器遇到消息时的时间戳

2

人事处

返回string,描述操作类型。的人事处字段值为,表示这是a消息事件。

3.

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对于事务消息事件,ts_ms属性。对象指示在事务性数据库中进行更改的时间开云体育电动老虎机消息事件。通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

对于非事务性消息事件,对象的ts_ms属性的时间消息事件,而payload.ts_ms指示连接器处理事件的时间。这种差异是由于提交时间戳没有出现在Postgres的通用逻辑消息格式中,并且非事务性逻辑消息之前没有开始事件(具有时间戳信息)。

4

消息

字段,其中包含消息元数据

数据类型映射

PostgreSQL连接器用事件表示行更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该值在事件中如何表示取决于列的PostgreSQL数据类型。下面几节描述连接器如何将PostgreSQL数据类型映射到文字类型和一个语义类型在事件字段中。

  • 文字类型描述了如何使用Kafka Connect模式类型逐字表示值:INT8INT16INT32INT64FLOAT32FLOAT64布尔字符串字节数组地图,结构体

  • 语义类型描述了Kafka Connect模式如何捕获意义该字段使用Kafka Connect模式的名称。

如果默认的数据类型转换不能满足您的需求,您可以这样做创建自定义转换器对于连接器。

基本类型

下表描述了连接器如何映射基本类型。

表12。PostgreSQL基本数据类型的映射
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

布尔

布尔

N/A

位(1)

布尔

N/A

比特(> 1)

字节

io.开云体育官方注册网址debezium.data.Bits

长度模式参数包含一个表示比特数的整数。由此产生的byte []以小端序形式包含位,并调整大小以包含指定的位数。例如,numBytes = n/8 + (n % 8 == 0 ?0: 1)在哪里n是比特数。

有些不同((M))

字节

io.开云体育官方注册网址debezium.data.Bits

长度Schema参数包含一个表示比特数的整数(2^31 - 1,如果没有给出列的长度)。由此产生的byte []以小端序形式包含位,并根据内容调整大小。指定的大小(M)的length参数中存储io.开云体育官方注册网址debezium.data.Bits类型。

短整型SMALLSERIAL

INT16

N/A

整数串行

INT32

N/A

长整型数字BIGSERIALOID

INT64

N/A

真正的

FLOAT32

N/A

双精度

FLOAT64

N/A

CHAR ((M))

字符串

N/A

VARCHAR ((M))

字符串

N/A

字符((M))

字符串

N/A

性格不同((M))

字符串

N/A

TIMESTAMPTZ带时区的时间戳

字符串

io.开云体育官方注册网址debezium.time.ZonedTimestamp

带有时区信息的时间戳的字符串表示形式,其中时区为GMT。

TIMETZ时区时间

字符串

io.开云体育官方注册网址debezium.time.ZonedTime

带时区信息的时间值的字符串表示形式,其中时区为GMT。

区间[P]

INT64

io.开云体育官方注册网址debezium.time.MicroDuration
(默认)

类型的时间间隔的大约微秒数365.25 / 12.0每月平均天数公式。

区间[P]

字符串

io.开云体育官方注册网址debezium.time.Interval
(当interval.handling.mode设置为字符串

遵循模式的间隔值的字符串表示形式P <年> Y <月> M > <天DT <时间> H <分钟> M <秒>,例如,P1Y2M3DT4H5M6.78S

BYTEA

字节字符串

N/A

要么是原始字节(默认值),要么是base64编码的字符串,要么是base64-url-safe-encoded string,要么是十六进制编码的字符串,取决于连接器二进制处理模式设置。

开云体育官方注册网址Debezium只支持Postgresbytea_output值的配置十六进制.看到这个Postgres二进制数据类型的文档

JSONJSONB

字符串

io.开云体育官方注册网址debezium.data.Json

包含JSON文档、数组或标量的字符串表示形式。

XML

字符串

io.开云体育官方注册网址debezium.data.Xml

包含XML文档的字符串表示形式。

UUID

字符串

io.开云体育官方注册网址debezium.data.Uuid

包含PostgreSQL UUID值的字符串表示形式。

结构体

io.开云体育官方注册网址debezium.data.geometry.Point

包含包含两个的结构FLOAT64字段,(x, y).每个字段表示一个几何点的坐标。

LTREE

字符串

io.开云体育官方注册网址debezium.data.Ltree

包含PostgreSQL LTREE值的字符串表示形式。

CITEXT

字符串

N/A

INET

字符串

N/A

INT4RANGE

字符串

N/A

整数范围。

INT8RANGE

字符串

N/A

范围的长整型数字

NUMRANGE

字符串

N/A

范围的数字

TSRANGE

字符串

N/A

包含不带时区的时间戳范围的字符串表示形式。

TSTZRANGE

字符串

N/A

包含带有本地系统时区的时间戳范围的字符串表示形式。

DATERANGE

字符串

N/A

包含日期范围的字符串表示形式。它总是有一个排他的上界。

枚举

字符串

io.开云体育官方注册网址debezium.data.Enum

包含PostgreSQL的字符串表示形式枚举价值。控件中维护允许的值集允许模式参数。

时间类型

除了PostgreSQL的TIMESTAMPTZ而且TIMETZ属性的值决定了数据类型(包含时区信息)如何映射时态类型time.precision.mode连接器配置属性。下面几节描述这些映射:

time.precision.mode =自适应

time.precision.mode属性设置为自适应,则连接器根据列的数据类型定义确定文字类型和语义类型。这确保了事件完全表示数据库中的值。开云体育电动老虎机

表13。映射时time.precision.mode自适应
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

日期

INT32

io.开云体育官方注册网址debezium.time.Date

表示自纪元以来的天数。

时间(1)时间(2)(3)

INT32

io.开云体育官方注册网址debezium.time.Time

表示午夜过后的毫秒数,不包括时区信息。

(4)时间(5)(6)

INT64

io.开云体育官方注册网址debezium.time.MicroTime

表示午夜过后的微秒数,不包括时区信息。

时间戳(1)时间戳(2)时间戳(3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp

表示从纪元开始的毫秒数,不包括时区信息。

时间戳(4)时间戳(5)时间戳(6)时间戳

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp

表示自纪元以来的微秒数,不包括时区信息。

time.precision.mode = adaptive_time_microseconds

time.precision.mode配置属性设置为adaptive_time_microseconds,连接器根据列的数据类型定义确定时态类型的文字类型和语义类型。这确保了事件完全表示数据库中的值,除了所有值开云体育电动老虎机时间字段以微秒为单位捕获。

表14。映射时time.precision.modeadaptive_time_microseconds
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

日期

INT32

io.开云体育官方注册网址debezium.time.Date

表示自纪元以来的天数。

时间([P])

INT64

io.开云体育官方注册网址debezium.time.MicroTime

表示以微秒为单位的时间值,不包括时区信息。PostgreSQL允许精度P在0-6的范围内存储到微秒精度。

时间戳(1)时间戳(2)时间戳(3)

INT64

io.开云体育官方注册网址debezium.time.Timestamp

表示经过纪元的毫秒数,不包括时区信息。

时间戳(4)时间戳(5)时间戳(6)时间戳

INT64

io.开云体育官方注册网址debezium.time.MicroTimestamp

表示经过epoch的微秒数,不包括时区信息。

time.precision.mode =连接

time.precision.mode配置属性设置为连接,连接器使用Kafka Connect逻辑类型。当消费者只能处理内置的Kafka Connect逻辑类型而不能处理可变精度的时间值时,这可能很有用。然而,由于PostgreSQL支持微秒精度,由连接器生成的事件使用连接时间精度模式导致精度的损失当数据库列具有开云体育电动老虎机分数秒精度大于3的值。

表15。映射时time.precision.mode连接
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

日期

INT32

org.apache.kafka.connect.data.Date

表示自纪元以来的天数。

时间([P])

INT64

org.apache.kafka.connect.data.Time

表示从午夜开始的毫秒数,不包括时区信息。PostgreSQL允许P在0-6范围内存储微秒精度,尽管这种模式会导致精度损失P大于3。

时间戳([P])

INT64

org.apache.kafka.connect.data.Timestamp

表示从纪元开始的毫秒数,不包括时区信息。PostgreSQL允许P在0-6范围内存储微秒精度,尽管这种模式会导致精度损失P大于3。

时间戳类型

时间戳类型表示不包含时区信息的时间戳。这些列将转换为基于UTC的等效Kafka Connect值。例如,时间戳取值“2018-06-20 15:13:16.945104”由io.开云体育官方注册网址debezium.time.MicroTimestamp值为“1529507596945104”time.precision.mode未设置为连接

运行Kafka Connect和Debezium的JVM的时区不会影响这种转换。开云体育官方注册网址

PostgreSQL支持使用+ /无限时间戳列。这些特殊值被转换为带值的时间戳9223372036825200000如果是正无穷或者-9223372036832400000在负无穷的情况下。这种行为模仿了PostgreSQL JDBC驱动程序的标准行为org.postgresql.PGStatement接口参考。

十进制类型

PostgreSQL连接器配置属性的设置decimal.handling.mode确定连接器如何映射十进制类型。

decimal.handling.mode属性设置为精确的,连接器使用Kafka Connectorg.apache.kafka.connect.data.Decimal所有的逻辑类型小数数字而且列。这是默认模式。

表16所示。映射时decimal.handling.mode精确的
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

数字((M [D]))

字节

org.apache.kafka.connect.data.Decimal

规模模式参数包含一个整数,表示小数点移位了多少位。

小数((M [D]))

字节

org.apache.kafka.connect.data.Decimal

规模模式参数包含一个整数,表示小数点移位了多少位。

钱((M [D]))

字节

org.apache.kafka.connect.data.Decimal

规模模式参数包含一个整数,表示小数点移位了多少位。的规模模式参数由money.fraction.digits连接器配置属性。

这条规则有一个例外。当数字小数类型的使用没有比例限制,来自数据库的值对于每个值都有不同的(变量)比例。开云体育电动老虎机在本例中,连接器使用io.开云体育官方注册网址debezium.data.VariableScaleDecimal,其中包含转移值的值和比例。

表17所示。的映射小数而且数字类型,当没有规模限制时
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

数字

结构体

io.开云体育官方注册网址debezium.data.VariableScaleDecimal

包含两个字段的结构:规模类型的INT32其中包含传输值的刻度和价值类型的字节以未缩放的形式包含原始值。

小数

结构体

io.开云体育官方注册网址debezium.data.VariableScaleDecimal

包含两个字段的结构:规模类型的INT32其中包含传输值的刻度和价值类型的字节以未缩放的形式包含原始值。

decimal.handling.mode属性设置为,连接器表示所有小数数字而且将值作为Java的双精度值,并对它们进行编码,如下表所示。

表18。映射时decimal.handling.mode
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)

数字((M [D]))

FLOAT64

小数((M [D]))

FLOAT64

钱((M [D]))

FLOAT64

的最后一种可能设置decimal.handling.mode配置属性为字符串.在本例中,连接器表示小数数字而且值作为它们的格式化字符串表示,并按照下表所示对它们进行编码。

表19。映射时decimal.handling.mode字符串
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)

数字((M [D]))

字符串

小数((M [D]))

字符串

钱((M [D]))

字符串

PostgreSQL支持(不是数字)作为存储的特殊值小数/数字的设置时的值decimal.handling.mode字符串.在本例中,连接器进行编码作为翻倍。南或者字符串常数

HSTORE类型

PostgreSQL连接器配置属性的设置hstore.handling.mode确定连接器如何映射HSTORE值。

dhstore.handling.mode属性设置为json(默认值),连接器表示HSTORE值作为JSON值的字符串表示,并按照下表所示对它们进行编码。当hstore.handling.mode属性设置为地图时,连接器使用地图的模式类型HSTORE值。

表20。映射HSTORE数据类型
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

HSTORE

字符串

io.开云体育官方注册网址debezium.data.Json

示例:使用JSON转换器的输出表示是{"key": "val"}

HSTORE

地图

N/A

示例:使用JSON转换器的输出表示是{"key": "val"}

域类型

PostgreSQL支持基于其他底层类型的用户定义类型。当使用此类列类型时,Debezium将基于完整的类型层次结构公开列开云体育官方注册网址的表示形式。

捕获使用PostgreSQL域类型的列中的更改需要特别考虑。当将列定义为包含扩展默认数据库类型之一的域类型,并且域类型定义了自定义长度或比例时,生成的模式将继承已定义的长度或比例。开云体育电动老虎机

当将列定义为包含扩展另一个定义自定义长度或规模的域类型的域类型时,生成的模式就会这样做继承已定义的长度或规模,因为这些信息在PostgreSQL驱动的列元数据中不可用。

网络地址类型

PostgreSQL拥有可以存储IPv4、IPv6和MAC地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址。网络地址类型提供输入错误检查和专门的操作符和函数。

表21。网络地址类型的映射
PostgreSQL数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

INET

字符串

N/A

IPv4和IPv6组网

CIDR

字符串

N/A

IPv4和IPv6主机和网络

MACADDR

字符串

N/A

MAC地址

MACADDR8

字符串

N/A

MAC地址为EUI-64格式

PostGIS类型

PostgreSQL连接器支持所有PostGIS数据类型

表22。PostGIS数据类型的映射
PostGIS数据类型 文字类型(模式类型) 语义类型(模式名)和Notes

几何
(平面)

结构体

io.开云体育官方注册网址debezium.data.geometry.Geometry

包含两个字段的结构:

  • srid (INT32)-空间引用系统标识符,定义存储在结构中的几何对象类型。

  • wkb(字节)-以众所周知的二进制格式编码的几何对象的二进制表示。

地理位置
(球)

结构体

io.开云体育官方注册网址debezium.data.geometry.Geography

包含两个字段的结构:

  • srid (INT32)-空间引用系统标识符,定义存储在结构中的地理对象类型。

  • wkb(字节)-以众所周知的二进制格式编码的几何对象的二进制表示。

烤的价值观

PostgreSQL对页面大小有硬性限制。这意味着需要存储大于8 kb左右的值吐司存储.这会影响来自数据库的复制消息。开云体育电动老虎机使用TOAST机制存储且未被更改的值不包含在消息中,除非它们是表的复制标识的一部分。对于Debezium来说,没有安全的方法可以直接从开云体育官方注册网址数据库中读取带外缺失的值,因为这可能会导致竞争条件。开云体育电动老虎机因此,Debezium遵循以开云体育官方注册网址下规则来处理烘烤值:

  • 表与副本标识已满- TOAST列值是之前而且更改事件中的字段与任何其他列一样。

  • 表与副本标识默认-当收到更新事件中,不属于副本标识的任何未开云体育电动老虎机更改的TOAST列值都不包含在事件中。类似地,当收到一个删除事件中没有TOAST列(如果有的话)之前字段。由于D开云体育官方注册网址ebezium在这种情况下不能安全地提供列值,连接器返回一个由连接器配置属性定义的占位符值,unavailable.value.placeholder

默认值

如果为数据库模式中的某列指定了默认值,PostgreSQL连接器将尽可能地尝试将该值传播到Kafka模式开云体育电动老虎机。支持大多数常见数据类型,包括:

  • 布尔

  • 数字类型(INT浮动数字等)。

  • 文本类型(字符VARCHAR文本等)。

  • 时间类型(日期时间时间间隔时间戳TIMESTAMPTZ

  • JSONJSONBXML

  • UUID

注意,对于时态类型,默认值的解析由PostgreSQL库提供;因此,任何通常由PostgreSQL支持的字符串表示也应该被连接器支持。

如果默认值是由函数生成的,而不是直接内联指定的,则连接器将导出等效的0对于给定的数据类型。这些值包括:

  • 布尔

  • 0具有适当的精度,用于数值类型

  • 文本/XML类型的空字符串

  • {}JSON类型

  • 1970-01-01日期时间戳TIMESTAMPTZ类型

  • 00:00时间

  • 时代时间间隔

  • 00000000-0000-0000-0000-000000000000UUID

这种支持目前只扩展到函数的显式使用。例如,CURRENT_TIMESTAMP (6)括号支持,但是CURRENT_TIMESTAMP不是。

支持默认值的传播主要是为了在使用PostgreSQL连接器和模式注册表时允许安全的模式演变,模式注册表强制模式版本之间的兼容性。由于这个主要问题,以及不同插件的刷新行为,Kafka模式中的默认值不能保证总是与数据库模式中的默认值同步。开云体育电动老虎机

  • 默认值可能会在Kafka模式中出现“late”,这取决于某个插件何时/如何触发内存模式的刷新。如果默认值在刷新之间多次更改,那么Kafka模式中的值可能永远不会出现/被跳过

  • 默认值可能在Kafka模式中“较早”出现,如果在连接器有记录等待处理时触发模式刷新。这是因为列元数据是在刷新时从数据库中读取的,而不是出现在复制消息中。开云体育电动老虎机如果连接器在后面并发生刷新,或者在连接器启动时,如果连接器停止一段时间而继续向源数据库写入更新,则可能发生这种情况。开云体育电动老虎机

这种行为可能出乎意料,但仍然是安全的。只有模式定义会受到影响,而消息中显示的实际值将与写入源数据库的值保持一致。开云体育电动老虎机

建立Postgres

在使用PostgreSQL连接器监视PostgreSQL服务器上提交的更改之前,请确定您打算使用哪个逻辑解码插件。如果你计划要使用本机pgoutput逻辑复制流支持,则必须将逻辑解码插件安装到PostgreSQL服务器中。然后,启用复制槽位,并为用户配置足够的权限来执行复制。

如果数据库由服务开云体育电动老虎机托管,例如Heroku Postgres您可能无法安装插件。如果是这样,如果你正在使用PostgreSQL 10+,你可以使用pgoutput解码器支持,以捕获数据库中的更改。开云体育电动老虎机如果不能这样做,则无法将Debezium与数据库一起使用。开云体育官方注册网址开云体育电动老虎机

云中的PostgreSQL

PostgreSQL在亚马逊RDS

捕获正在运行的PostgreSQL数据库中的更改是可能的开云体育电动老虎机Amazon RDS.这样做:

  • 设置实例参数rds.logical_replication1

  • 验证wal_level参数设置为逻辑通过运行查询显示wal_level作为数据库RD开云体育电动老虎机S主用户。在多区域复制设置中可能不是这样。不能手动设置此选项。它是自动改变rds.logical_replication参数设置为1.如果wal_level未设置为逻辑在进行上述更改之后,可能是因为在参数组更改之后必须重新启动实例。在维护窗口期间会重新启动,也可以手动启动重新启动。

  • 设置Debezi开云体育官方注册网址umplugin.name参数pgoutput

  • 属性的AWS帐户启动逻辑复制rds_replication的角色。角色授予管理逻辑槽和使用逻辑槽进行数据流的权限。默认情况下,AWS上只有主用户帐户具有rds_replication在Amazon RDS上的角色。若要允许主帐户以外的用户帐户发起逻辑复制,必须将权限授予该帐户rds_replication的角色。例如,授予rds_replication< my_user >.你一定有超级用户授予rds_replication用户的角色。若要允许主帐户以外的帐户创建初始快照,必须授予选择对要捕获的表上的帐户的权限。有关PostgreSQL逻辑复制安全性的更多信息,请参见PostgreSQL的文档

Azure上的PostgreSQL

Debezium可以与 一起使用开云体育官方注册网址Azure数开云体育电动老虎机据库PostgreSQL,支持pgoutput逻辑解码 插件,由Debezium支持。开云体育官方注册网址

将Azure复制支持设置为 逻辑.您可以使用Azure CLI或者是Azure门户来配置这个。例如,要使用Azure CLI,下面是Az postgres服务器需要执行的命令:

Az postgres服务器配置集——resource-group mygroup——server-name myserver——name azure。replication_support --value logical az postgres server restart --resource-group mygroup --name myserver

PostgreSQL在CrunchyBridge上

它可以使用Debezium与开云体育官方注册网址CrunchyBridge;逻辑复制已经开启。的pgoutput插件可用。您必须创建一个复制用户并提供正确的权限。

在使用pgoutput插件,建议您配置过滤后的随着publication.autocreate.mode.如果你使用all_tables的默认值publication.autocreate.mode,并且没有找到发布,连接器将尝试使用 创建一个发布为所有表创建publish ;,但由于缺乏权限而失败。

安装逻辑解码输出插件

看到PostgreSQL逻辑解码输出插件安装有关设置和测试逻辑解码插件的更详细说明。

从PostgreSQL 9.4开始,读取预写日志更改的唯一方法是安装逻辑解码输出插件。插件是用C语言编写、编译并安装在运行PostgreSQL服务器的机器上的。插件使用许多特定于PostgreSQL的api,如PostgreSQL的文档

PostgreSQL连接器与Debezium支持的逻辑解码插件之一一起工作,以接收来自开云体育官方注册网址数据库的更改事件开云体育电动老虎机Protobuf格式或者是pgoutput格式。的pgoutput插件是随PostgreSQL数据库而来的。开云体育电动老虎机有关使用Protobuf的更多详细信息,请访问decoderbufs插件,请参阅插件文档其中讨论了它的要求、限制以及如何编译它。

为了简单起见,Debezium还开云体育官方注册网址提供了基于上游PostgreSQL服务器映像的容器映像,在此映像之上编译并安装插件。你可以使用这张图片作为安装所需详细步骤的示例。

Debe开云体育官方注册网址zium逻辑解码插件只在Linux机器上安装和测试过。对于Windows和其他操作系统,可能需要不同的安装步骤。

插件的差异

插件行为在所有情况下并不完全相同。已经确定了这些差异:

  • 在流处理过程中检测到模式更改时,所有插件都会刷新数据库中的模式元数据开云体育电动老虎机pgoutput插件在某种程度上更“渴望”触发这样的刷新。例如,对列的默认值的更改将触发刷新pgoutput,而其他插件不会意识到这个更改,直到另一个更改触发刷新(例如。增加一个新列。)这是由于的行为pgoutput而不是Debezium本身。开云体育官方注册网址

在测试套件中跟踪所有最新的差异Java类

配置PostgreSQL服务器

如果你正在使用逻辑解码插件除了pgoutput,安装完成后,需要对PostgreSQL服务器进行如下配置:

  1. 要在启动时加载插件,请将以下内容添加到postgresql.conf文件:

    # MODULES shared_preload_libraries = 'decoderbufs'(1)
    1 指示服务器加载decoderbufs启动时的逻辑解码插件(插件的名称在Protobuf使文件)。
  2. 若要配置复制插槽而不考虑所使用的解码器,请在postgresql.conf文件:

    #复制wal_level = logical(1)
    1 指示服务器对预写日志使用逻辑解码。

根据您的需求,在使用Debezium时,您可能必须设置其他PostgreSQL流复制参数。开云体育官方注册网址例子包括max_wal_senders而且max_replication_slots用于增加可以并发访问发送服务器的连接器数量,以及wal_keep_size用于限制复制插槽将保留的最大WAL大小。有关配置流复制的详细信息,请参见PostgreSQL的文档

开云体育官方注册网址Debezium使用PostgreSQL的逻辑解码,它使用复制插槽。复制插槽保证保留Debezium所需的所有WAL段,即使在Debezium中断期间。开云体育官方注册网址因此,密切监视复制插槽非常重要,以避免过多的磁盘消耗和其他可能发生的情况,如复制插槽长时间未使用时的编目膨胀。有关更多信息,请参见PostgreSQL流复制文档

如果你在和synchronous_commit其他设置,建议设置为wal_writer_delay到一个值,例如10毫秒,以实现更改事件的低延迟。否则,应用它的默认值,这会增加大约200毫秒的延迟。

设置权限

设置PostgreSQL服务器以运行Debezium连接器需要一个可以执行复制的开云体育官方注册网址数据库用户。开云体育电动老虎机复制只能由具有相应权限的数据库用户执行,且只能对配置的主机数量进行复制。开云体育电动老虎机

不过,默认情况下,超级用户拥有必要的权限复制而且登录中提到的角色安全,最好不要为Debezium复制用户提供更高的权限。开云体育官方注册网址相反,应该创建一个具有所需最低权限的开云体育官方注册网址Debezium用户。

先决条件
  • PostgreSQL管理权限。

过程
  1. 要为用户提供复制权限,请定义具有复制权限的PostgreSQL角色至少复制而且登录权限,然后将该角色授予用户。例如:

    创建角色<名称>复制登录;

设置特权以使Debezium在使用时创建PostgreSQ开云体育官方注册网址L发布pgoutput

如果你使用pgoutput作为逻辑解码插件,Debezium必须以具有特定权限的用户在数据开云体育官方注册网址库中操作。开云体育电动老虎机

开云体育官方注册网址Debezium流更改PostgreSQL源表的事件出版物为表创建的。发布包含一组经过筛选的更改事件,这些事件由一个或多个表生成。每个发布中的数据都是根据发布规范筛选的。该规范可以由PostgreSQL数据库管理员创建,也可以由Debezium连接器创建。开云体育官方注册网址开云体育电动老虎机为了允许Debezium P开云体育官方注册网址ostgreSQL连接器创建发布并指定要复制到发布中的数据,连接器必须使用数据库中的特定权限进行操作。开云体育电动老虎机

有几个选项可以确定如何创建发布。通常,在设置连接器之前,最好手动为想要捕获的表创建发布。但是,您可以配置环境,允许Debezium自动创建发布,并指定添加到发布中的数据。开云体育官方注册网址

开云体育官方注册网址Debezium使用包含列表和排除列表属性来指定如何在发布中插入数据。有关启用Debezium创建发布的选项的更多信息,请参见开云体育官方注册网址publication.autocreate.mode

Debe开云体育官方注册网址zium要创建PostgreSQL发布,必须以具有以下权限的用户运行:

  • 数据库中的复制权限,以便将表添加到发布。开云体育电动老虎机

  • 创建数据库上用于添加发布的权限。开云体育电动老虎机

  • 选择复制初始表数据的权限。表所有者自动拥有选择表的权限。

若要向发布添加表,用户必须是表的所有者。但是由于源表已经存在,您需要一种机制来与原始所有者共享所有权。为了启用共享所有权,您需要创建一个PostgreSQL复制组,然后将现有的表所有者和复制用户添加到组中。

过程
  1. 创建复制组。

    创建角色< replication_group >
  2. 将表的原始所有者添加到组中。

    将replication_group授权给< original_owner >
  3. 添加Debezi开云体育官方注册网址um复制用户到组中。

    将replication_group授权给< replication_user >
  4. 将表的所有权转移到< replication_group >

    ALTER TABLE< table_name >replication_group的所有者;

为了让D开云体育官方注册网址ebezium指定捕获配置,的值publication.autocreate.mode必须设置为过滤后的

配置PostgreSQL以允许与Debezium连接器主机进行复制开云体育官方注册网址

为了使Debeziu开云体育官方注册网址m能够复制PostgreSQL数据,您必须配置数据库以允许与运行PostgreSQL连接器的主机进行复制。开云体育电动老虎机要指定允许使用数据库进行复制的客户端,请向PostgreSQL基于主机的身份验证文件中添加条目,开云体育电动老虎机pg_hba.conf.有关的更多信息pg_hba.conf文件的更多信息,请参考PostgreSQL的文档。

过程
  • pg_hba.conf文件指定可以与数据库主机进行复制的Deb开云体育官方注册网址ezium连接器主机。开云体育电动老虎机例如,

    pg_hba.conf文件的例子:
    本地复制信任(1)主机复制 127.0.0.1/32 trust(2)主机复制::1/128 trust(3)
    1 指示服务器允许复制<对于>本地,即在服务器机器上。
    2 指示服务器允许<对于>本地主机接收复制更改使用IPV4
    3. 指示服务器允许<对于>本地主机接收复制更改使用IPV6

有关网络掩码的详细信息,请参见PostgreSQL文档

支持PostgreSQL拓扑

PostgreSQL连接器可以与独立的PostgreSQL服务器一起使用,也可以与PostgreSQL服务器集群一起使用。

正如前面提到的一开始, PostgreSQL(所有版本⇐12)只支持逻辑复制插槽主要的服务器。这意味着PostgreSQL集群中的副本不能配置为逻辑复制,因此Debezium PostgreSQL连接器只能与主服务器连接和通信。开云体育官方注册网址如果此服务器失败,连接器将停止。当群集修复时,如果原始主服务器再次提升到主要的,则可以重新启动连接器。但是,如果是不同的PostgreSQL服务器使用插件和适当的配置被提升为主要的时,必须更改连接器配置以指向新的主要的服务器,然后可以重新启动连接器。

WAL磁盘空间消耗

在某些情况下,WAL文件所消耗的PostgreSQL磁盘空间可能会激增或超出通常的比例。造成这种情况的原因可能有以下几种:

  • 连接器已接收到的数据的LSN在confirmed_flush_lsn服务器的列pg_replication_slots视图。比这个LSN更老的数据不再可用,数据库负责回收磁盘空间。开云体育电动老虎机

    同样在pg_replication_slots看来,restart_lsn列包含连接器可能需要的最老WAL的LSN。如果的值为confirmed_flush_lsn是有规律地增加和价值的restart_lsn滞后,数据库需要回收空间。开云体育电动老虎机

    数据库通开云体育电动老虎机常以批处理块的形式回收磁盘空间。这是预期的行为,用户不需要做任何操作。

  • 数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正开云体育电动老虎机在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。

  • PostgreSQL实例包含多个数据库,其中一个是高流量数据库。开云体育电动老虎机开云体育官方注册网址Debezium捕获与另一个数据库相比流量较低的另一个数据库中的更改。开云体育电动老虎机开云体育官方注册网址然后Debezium不能确认LSN,因为复制插槽工作在每个数据库,并且Debezium没有被调用。开云体育电动老虎机由于WAL由所有数据库共享,因此使用的数量趋于增开云体育电动老虎机长,直到Debezium为其捕获更改的数据库触发事件。开云体育官方注册网址为了克服这一问题,有必要:

    • 命令启用周期心跳记录生成heartbeat.interval.ms连接器配置属性。

    • 定期从数据库发出更改事件,Debezium正在为其捕获更改。开云体育官方注册网址开云体育电动老虎机

    然后,单独的进程通过插入新行或重复更新同一行定期更新表。然后PostgreSQL调用Debezium, 开云体育官方注册网址Debezium确认最新的LSN并允许数据库回收WAL空间。开云体育电动老虎机的方法可以自动执行此任务heartbeat.action.query连接器配置属性。

对于使用PostgreSQL的AWS RDS的用户,在空闲环境中可能会出现类似于高流量/低流量场景的情况。AWS RDS导致对其自己的系统表的写入在频繁的基础上(5分钟)对客户端不可见。同样,定期释放事件可以解决问题。

部署

要部署Debezium 开云体育官方注册网址PostgreSQL连接器,您需要安装Debezium PostgreSQL连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。

过程
  1. 下载Debezium开云体育官方注册网址PostgreSQL连接器插件存档

  2. 将文件解压缩到Kafka Connect环境中。

  3. 将包含JAR文件的目录添加到卡夫卡连接的plugin.path

  4. 重新启动Kafka Connect进程以获取新的JAR文件。

如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka, PostgreSQL和Kafka连接已经安装和准备运行的PostgreSQL连接器。你也可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium

连接器配置示例

以下是PostgreSQL连接器的配置示例,该连接器连接到PostgreSQL服务器,端口为5432,地址为192.168.99.100,逻辑名为实现.通常,通过设置连接器可用的配置属性,可以在JSON文件中配开云体育官方注册网址置Debezium PostgreSQL连接器。

您可以选择为数据库中的模式和表的子集生成事件。开云体育电动老虎机可选地,您可以忽略、屏蔽或截断包含敏感数据的列、大于指定大小的列或不需要的列。

{"name": "fulfillment-connector",(1)"config": {"connector.class": "io. 开云体育官方注册网址debezum .connector.postgresql. postgresconnector ",(2)“开云体育电动老虎机数据库。主机名”:“192.168.99.100”,(3)“开云体育电动老虎机数据库。港”:“5432”,(4)“开云体育电动老虎机数据库。user": "postgres",(5)“开云体育电动老虎机数据库。密码”:“postgres”,(6)“开云体育电动老虎机数据库。dbname" : "postgres",(7)”的话题。前缀": "fulfillment",(8)“table.include。名单”:“public.inventory”(9)}}
1 在Kafka Connect服务中注册连接器时的名称。
2 这个PostgreSQL连接器类的名称。
3. PostgreSQL服务器地址。
4 PostgreSQL服务器的端口号。
5 属性的PostgreSQL用户的名称需要的特权
6 属性的PostgreSQL用户的密码需要的特权
7 要连接的PostgreSQL数据库的名称开云体育电动老虎机
8 PostgreSQL服务器/集群的主题前缀,它形成一个名称空间,用于连接器写入的所有Kafka主题的名称,Kafka Connect模式名称,以及使用Avro转换器时对应的Avro模式的名称空间。
9 连接器将监视的由该服务器托管的所有表的列表。这是可选的,还有其他属性用于列出要从监视中包含或排除的模式和表。

看到PostgreSQL连接器属性的完整列表可以在这些配置中指定。

您可以使用帖子命令到正在运行的Kafka Connect服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接PostgreSQL数据库。开云体育电动老虎机

  • 读取事务日志。

  • 流将事件记录更改为Kafka主题。

添加连接器配置

要运行Debezi开云体育官方注册网址um PostgreSQL连接器,请创建一个连接器配置并将该配置添加到Kafka Connect集群中。

先决条件
过程
  1. 为PostgreSQL连接器创建一个配置。

  2. 使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。

结果

连接器启动后,它将执行一致性快照配置连接器的PostgreSQL服务器数据库。开云体育电动老虎机然后连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。

连接器属性

Debe开云体育官方注册网址zium PostgreSQL连接器具有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性为要求除非有默认值可用。

表23。所需的连接器配置属性
财产 默认的 描述

没有默认的

连接器的唯一名称。尝试使用相同的名称再次注册将失败。所有Kafka Connect连接器都需要这个属性。

没有默认的

连接器的Java类的名称。始终使用值io.开云体育官方注册网址debezium.connector.postgresql.PostgresConnector用于PostgreSQL连接器。

1

应该为此连接器创建的最大任务数。PostgreSQL连接器总是使用单个任务,因此不使用这个值,所以默认值总是可以接受的。

decoderbufs

PostgreSQL的名称逻辑解码插件安装在PostgreSQL服务器上。

支持的值为decoderbufs,pgoutput

开云体育官方注册网址

PostgreSQL逻辑解码槽的名称,它是为特定数据库/模式的特定插件的流更改而创建的。开云体育电动老虎机服务器使用这个插槽将事件流传输到您正在配置的Debezium连接器。开云体育官方注册网址

槽位名称必须符合PostgreSQL复制槽命名规则,这些州:“每个复制插槽都有一个名称,可以包含小写字母、数字和下划线。”

当连接器以预期的优雅方式停止时,是否删除逻辑复制插槽。默认行为是,当连接器停止时,复制插槽仍然为连接器配置。当连接器重新启动时,具有相同的复制插槽可以使连接器从停止的地方开始处理。

设置为真正的仅在测试或开发环境中。删除插槽允许数据库丢弃WAL段。开云体育电动老虎机当连接器重新启动时,它会执行一个新的快照,或者它可以从Kafka Connect偏移量主题中的一个持久偏移量继续。

dbz_publication

使用时为流更改而创建的PostgreSQL发布的名称pgoutput

如果该发布还不存在,则在启动时创建所有的表.开云体育官方注册网址然后Debezium应用它自己的包含/排除列表过滤(如果配置了的话),以限制发布更改感兴趣的特定表的事件。连接器用户必须具有超级用户权限才能创建此发布,因此通常最好在第一次启动连接器之前创建发布。

如果发布已经存在,无论是针对所有表还是配置了表的一个子集,Debezium将使用定义好的发布。开云体育官方注册网址

没有默认的

PostgreSQL数据库服务器的IP地址或主机名。开云体育电动老虎机

5432

PostgreSQL数据库服务器的整型端口号。开云体育电动老虎机

没有默认的

连接PostgreSQL数据库服务器的用户名。开云体育电动老虎机

没有默认的

连接PostgreSQL数据库服务器时使用的密码。开云体育电动老虎机

没有默认的

用于流化更改的PostgreSQL数据库的名称。开云体育电动老虎机

没有默认的

主题前缀,为Debezium在其中捕获更改的特定PostgreSQL数据库服务器或集群提供名称空间。开云体育官方注册网址开云体育电动老虎机该前缀在所有其他连接器中应该是唯一的,因为它被用作从该连接器接收记录的所有Kafka主题的主题名前缀。数据库服务器逻辑名中只能使用字母数字字符、连字符、点和下划线。开云体育电动老虎机

不要更改此属性的值。如果更改了name值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,该列表与您要使用的模式名称匹配想要捕获变更。中未包含的任何模式名称schema.include.list被排除在捕获其更改之外。默认情况下,捕获所有非系统模式的更改。

为了匹配模式的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与模式的整个标识符匹配;它不匹配可能出现在模式名称中的子字符串。
如果在配置中包含此属性,则不要同时设置schema.exclude.list财产。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,该列表与您要使用的模式名称匹配想要捕捉变化。中未包含其名称的模式schema.exclude.list捕获其更改(系统模式除外)。

为了匹配模式的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与模式的整个标识符匹配;它不匹配可能出现在模式名称中的子字符串。
如果在配置中包含此属性,则不要设置schema.include.list财产。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它匹配您想要捕获其更改的表的完全限定表标识符。设置此属性时,连接器仅从指定的表捕获更改。每个标识符都是这样的schemaName的表.默认情况下,连接器捕获每个模式中每个非系统表中的更改,这些表的更改正在被捕获。

为了匹配表的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个标识符匹配;它不匹配表名中可能存在的子字符串。
如果在配置中包含此属性,则不要同时设置table.exclude.list财产。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它匹配不希望捕获其更改的表的完全限定表标识符。每个标识符都是这样的schemaName的表.设置此属性后,连接器将捕获未指定的每个表的更改。

为了匹配表的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个标识符匹配;它不匹配表名中可能存在的子字符串。
如果在配置中包含此属性,则不要设置table.include.list财产。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它与应该包含在变更事件记录值中的列的完全限定名称相匹配。列的完全限定名的格式为schemaName的表columnName

为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,该表达式用于匹配列的整个名称字符串;它不匹配可能出现在列名中的子字符串。
如果在配置中包含此属性,则不要同时设置column.exclude.list财产。

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名的格式为schemaName的表columnName

为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,该表达式用于匹配列的整个名称字符串;它不匹配可能出现在列名中的子字符串。
如果在配置中包含此属性,则不要设置column.include.list财产。

自适应

时间、日期和时间戳可以用不同的精度表示:

自适应根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获与数据库中完全相同的时间和时间戳值。开云体育电动老虎机

adaptive_time_microseconds根据数据库列的类型,使用毫秒、微秒或纳秒精度值捕获与数据库中相同的日期、日期时间和时间戳值。开云体育电动老虎机一个例外是时间类型字段,这些字段总是以微秒的形式捕获。

连接always通过使用Kafka Connect的内置表示来表示时间和时间戳值时间日期,时间戳,无论数据库列的精度如何,都使用毫秒精度。开云体育电动老虎机看到时间值

精确的

指定连接器应如何处理的值小数而且数字列:

精确的通过使用java.math.BigDecimal在变更事件中以二进制形式表示值。

通过使用值,这可能会导致精度的损失,但更容易使用。

字符串将值编码为格式化的字符串,这很容易使用,但关于真实类型的语义信息将丢失。看到十进制类型

地图

指定连接器应如何处理的值hstore列:

地图通过使用地图

json通过使用json字符串.此设置将值编码为格式化的字符串,例如{"key": "val"}.看到PostgreSQLHSTORE类型

数字

指定连接器应如何处理的值时间间隔列:

数字表示使用近似微秒数的间隔。

字符串使用字符串模式表示准确地表示间隔P <年> Y <月> M > <天DT <时间> H <分钟> M <秒>.例如:P1Y2M3DT4H5M6.78S.看到PostgreSQL基本类型

禁用

是否使用加密连接到PostgreSQL服务器。选项包括:

禁用使用未加密的连接。

需要使用安全(加密)连接,如果无法建立则失败。

verify-ca表现得像需要还会根据配置的证书颁发机构(CA)证书验证服务器TLS证书,如果没有找到有效的匹配CA证书,则会失败。

verify-full表现得像verify-ca但是还会验证服务器证书是否与连接器试图连接的主机相匹配。看到PostgreSQL文档获取更多信息。

没有默认的

包含客户端SSL证书的文件的路径。看到PostgreSQL文档获取更多信息。

没有默认的

包含客户端SSL私钥的文件的路径。看到PostgreSQL文档获取更多信息。

没有默认的

从指定的文件中访问客户端私钥的密码开云体育电动老虎机database.sslkey.看到PostgreSQL文档获取更多信息。

没有默认的

包含用于验证服务器的根证书的文件的路径。看到PostgreSQL文档获取更多信息。

真正的

启用TCP保持连接探测,以验证数据库连接是否仍然活跃。开云体育电动老虎机看到PostgreSQL文档获取更多信息。

真正的

控制是否删除事件之后是一个墓碑事件。

真正的-删除操作用a表示删除事件和后续的墓碑事件。

-只有一个删除事件被触发。

当一个源记录被删除后,触发一个墓碑事件(默认行为)允许Kafka完全删除与被删除行的键相关的所有事件日志压实已为主题启用。

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。类指定的字符数时,如果要截断一组列中的数据,请设置此属性长度属性名。集长度到正整数值,例如,column.truncate.to.20.chars

列的完全限定名遵循以下格式:< schemaName ><表>< columnName >.为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。

您可以在单个配置中指定具有不同长度的多个属性。

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。如果希望连接器屏蔽一组列的值,请设置此属性,例如,如果这些列包含敏感数据。集长度为正整数,以将指定列中的数据替换为星号()指定的字符长度属性名。集长度0(0)将指定列中的数据替换为空字符串。

列的完全限定名遵循以下格式:schemaName的表columnName.为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。

您可以在单个配置中指定具有不同长度的多个属性。

N/A

一个可选的、以逗号分隔的正则表达式列表,它与基于字符的列的完全限定名匹配。列的完全限定名的格式为< schemaName ><表>< columnName >
为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。在产生的更改事件记录中,指定列的值将被假名替换。

假名由应用指定的参数产生的散列值组成hashAlgorithm而且.根据所使用的散列函数,将维护引用完整性,而列值将被假名替换。中描述了支持的哈希函数MessageDigest节Java密码体系结构标准算法名称文档。

在下面的例子中,CzQMA0cB5K是随机选择的盐。

column.mask.hash.sha with.salt——256.。CzQMA0cB5K =库存。订单。自定义erName, inventory.shipment.customerName

必要时,笔名会自动缩短为列的长度。连接器配置可以包括多个属性,这些属性指定不同的哈希算法和盐。

取决于hashAlgorithm使用,选中的数据集与实际数据集相匹配时,得到的数据集可能不会被完全屏蔽。

如果值在不同的位置或系统中被散列,则应该使用散列策略版本2来确保保真度。

N/A

一个可选的、以逗号分隔的正则表达式列表,它与您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称匹配。设置此属性后,连接器将向事件记录模式添加以下字段:

  • __开云体育官方注册网址debezium.source.column.type

  • __开云体育官方注册网址debezium.source.column.length

  • __开云体育官方注册网址debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于变宽类型)。
启用连接器发出这些额外的数据可以帮助适当调整接收器数据库中特定的数字或基于字符的列的大小。开云体育电动老虎机

列的完全限定名遵循以下格式之一:开云体育电动老虎机数据库名的表columnName,或开云体育电动老虎机数据库名schemaName的表columnName
为了匹配列的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配可能出现在列名中的子字符串。

N/A

一个可选的、以逗号分隔的正则表达式列表,用于指定为数据库中的列定义的数据类型的完全限定名称。开云体育电动老虎机当设置此属性时,对于具有匹配数据类型的列,连接器将发出事件记录,其中在其模式中包含以下额外字段:

  • __开云体育官方注册网址debezium.source.column.type

  • __开云体育官方注册网址debezium.source.column.length

  • __开云体育官方注册网址debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于变宽类型)。
启用连接器发出这些额外的数据可以帮助适当调整接收器数据库中特定的数字或基于字符的列的大小。开云体育电动老虎机

列的完全限定名遵循以下格式之一:开云体育电动老虎机数据库名的表typeName,或开云体育电动老虎机数据库名schemaName的表typeName
为了匹配数据类型的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式与类型名中可能出现的子字符串不匹配。

有关特定于postgresql的数据类型名称的列表,请参见PostgreSQL数据类型映射

空字符串

一个表达式列表,用于指定连接器用来为发布到指定表的Kafka主题的更改事件记录形成自定义消息键的列。

默认情况下,Debezi开云体育官方注册网址um使用表的主键列作为它发出的记录的消息键。为了代替默认值,或者为缺少主键的表指定一个键,您可以基于一个或多个列配置自定义消息键。

若要为表建立自定义消息键,请列出表,然后列出用作消息键的列。每个列表条目采用以下格式:

< fully-qualified_tableName >< keyColumn >< keyColumn >

若要基于多个列名创建表键,请在列名之间插入逗号。

每个全限定表名都是一个正则表达式,格式如下:

< schemaName ><表>

该属性可以包括多个表的条目。使用分号分隔列表中的表项。

下面的示例为表设置消息键inventory.customers而且purchase.orders

inventory.customers: pk1 pk2; (. *) .purchaseorders: pk3 pk4

对于桌子来说inventory.customer,列pk1而且pk2指定为消息键。为任何模式中的表,列pk3而且pk4服务器作为消息键。

用于创建自定义消息键的列的数量没有限制。但是,最好使用指定唯一键所需的最小数目。

all_tables

仅当流通过使用发生变化时应用pgoutput插件.该设置决定如何创建出版应该工作。指定以下值之一:

all_tables—如果发布存在,连接器将使用它。如果发布不存在,连接器将为数据库中连接器正在捕获更改的所有表创建发布。开云体育电动老虎机连接器要创建发布,必须通过具有创建发布和执行复制权限的数据库用户帐户访问数据库。开云体育电动老虎机使用以下SQL命令授予所需的权限为所有表创建publish ;

禁用—连接器不尝试创建发布。数据开云体育电动老虎机库管理员或配置为执行复制的用户必须在运行连接器之前创建发布。如果连接器无法找到发布,则连接器抛出异常并停止。

过滤后的—如果发布存在,连接器将使用它。类所指定的匹配当前筛选器配置的表,如果不存在发布,则连接器将为这些表创建一个新的发布schema.include.listschema.exclude.list,table.include.list,table.exclude.list连接器配置属性。例如:CREATE PUBLICATION FOR TABLE .如果发布存在,连接器将更新与当前筛选器配置匹配的表的发布。例如:ALTER PUBLICATION SET TABLE

字节

指定二进制(bytea)列应该在更改事件中表示:

字节表示二进制数据为字节数组。

base64将二进制数据表示为base64编码的字符串。

base64-url-safe将二进制数据表示为base64-url-safe-encoded字符串。

十六进制表示二进制数据为十六进制编码(base16)字符串。

没有一个

指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:

  • 没有一个不应用任何调整。

  • avro将不能在Avro类型名称中使用的字符替换为下划线。

2

指定转换Postgres时应该使用多少个十进制数字类型java.math.BigDecimal,表示更改事件中的值。仅适用于以下情况decimal.handling.mode设置为精确的

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它与希望连接器捕获的逻辑解码消息前缀的名称相匹配。默认情况下,连接器捕获所有逻辑解码消息。设置此属性时,连接器仅捕获带有该属性指定的前缀的逻辑解码消息。排除所有其他逻辑解码消息。

为了匹配消息前缀的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式不匹配可能出现在前缀中的子字符串。

如果在配置中包含此属性,则不要同时设置message.prefix.exclude.list财产。

的结构信息消息事件及其排序语义,请参见消息事件

没有默认的

一个可选的、以逗号分隔的正则表达式列表,它与您不希望连接器捕获的逻辑解码消息前缀的名称相匹配。设置此属性时,连接器不会捕获使用指定前缀的逻辑解码消息。捕获所有其他消息。
若要排除所有逻辑解码消息,请将此属性的值设置为.*

为了匹配消息前缀的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与整个消息前缀字符串匹配;表达式不匹配可能出现在前缀中的子字符串。

如果在配置中包含此属性,则不要同时设置message.prefix.include.list财产。

的结构信息消息事件及其排序语义,请参见消息事件

以下先进的配置属性具有在大多数情况下都适用的默认值,因此很少需要在连接器的配置中指定。

表24。高级连接器配置属性
财产 默认的 描述

没有默认的

属性的符号名称的逗号分隔列表自定义转换器连接器可以使用的实例。例如,

国际标准图书编号

您必须设置转换器属性使连接器能够使用自定义转换器。

对于为连接器配置的每个转换器,还必须添加.type属性,该属性指定实现转换器接口的类的全限定名称。的.type属性使用以下格式:

< converterSymbolicName >.type

例如,

isbn。类型:io.debezium.test.IsbnConverter

如果希望进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。若要将任何其他配置参数与转换器关联起来,请在参数名称前加上转换器的符号名称。
例如,

isbn.schema.name: io.开云体育官方注册网址debezium.postgresql.type.Isbn

最初的

指定连接器启动时执行快照的条件:

最初的—只有在逻辑服务器名没有记录偏移时,连接器才会执行快照。

总是—连接器每次启动时执行一次快照。

从来没有—连接器不执行快照。以这种方式配置连接器时,其启动时的行为如下所示。如果Kafka偏移主题中有先前存储的LSN,连接器将从该位置继续流化更改。如果没有存储LSN,连接器将从在服务器上创建PostgreSQL逻辑复制插槽的时间点开始流化更改。的从来没有只有当您知道所有感兴趣的数据仍然反映在WAL中时,快照模式才有用。

initial_only—连接器执行初始快照,然后停止,不处理任何后续更改。

出口——弃用

自定义—连接器将根据snapshot.custom.class属性的自定义实现io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。

有关更多信息,请参见snapshot.mode选项

没有默认的

类的实现的完整Java类名io.开云体育官方注册网址debezium.connector.postgresql.spi.Snapshotter接口。当snapshot.mode属性设置为自定义.看到自定义快照SPI

中指定的所有表table.include.list

匹配完全限定名称的可选、逗号分隔的正则表达式列表(< schemaName >。<表>)中包含在快照中的表。指定的项必须在连接器的table.include.list财产。此属性仅在连接器为snapshot.mode属性的值设置为从来没有
此属性不影响增量快照的行为。

为了匹配表的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与表的整个名称字符串匹配;它不匹配表名中可能存在的子字符串。

10000

正整数值,指定在执行快照时等待获得表锁的最大时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照失败。连接器如何执行快照提供细节。

没有默认的

指定要包含在快照中的表行。如果希望快照仅包含表中行的子集,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

属性包含表单中以逗号分隔的全限定表名列表< schemaName >。<表>.例如,

“snapshot.select.statement。覆盖”:“inventory.products customers.orders”

对于列表中的每个表,再添加一个配置属性,用于指定选择语句,以便连接器在获取快照时在表上运行。指定的选择语句确定要包含在快照中的表行子集。使用以下格式指定此文件的名称选择声明属性:

snapshot.select.statement.overrides。< schemaName ><表>.例如,snapshot.select.statement.overrides.customers.orders

例子:

从一个customers.orders包含软删除列的表,delete_flag,如果您希望快照只包含那些未被软删除的记录,则添加以下属性:

“snapshot.select.statement。覆盖”:“客户。订单", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器仅包括其记录Delete_flag = 0

失败

指定连接器在事件处理过程中应该如何对异常做出反应:

失败传播异常,指示有问题事件的偏移量,并导致连接器停止。

警告记录有问题事件的偏移量,跳过该事件并继续处理。

跳过跳过有问题的事件并继续处理。

2048

正整数值,指定连接器处理的每批事件的最大大小。

8192

正整数值,指定阻塞队列可以容纳的最大记录数。当Debe开云体育官方注册网址zium从数据库读取事件流时,它会在将事件写入Kafka之前开云体育电动老虎机将其放在阻塞队列中。如果连接器接收消息的速度比写入Kafka的速度快,或者Kafka变得不可用,阻塞队列可以为从数据库读取更改事件提供反压力。开云体育电动老虎机当连接器定期记录偏移量时,队列中保存的事件将被忽略。总是设置的值max.queue.size大于…的值max.batch.size

0

一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。
如果max.queue.size时,当队列大小达到任一属性指定的限制时,将阻塞对队列的写入。例如,如果你设置max.queue.size = 1000,max.queue.size.in.bytes = 5000,当队列中有1000条记录时,或者队列中记录的容量达到5000字节时,写入队列被阻塞。

500

正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。缺省值为500毫秒。

指定连接器遇到数据类型未知的字段时的连接器行为。默认行为是连接器从更改事件中省略该字段并记录警告。

将此属性设置为真正的如果您希望更改事件包含字段的不透明二进制表示。这让消费者可以解码字段。属性可以控制准确的表示形式二进制处理模式财产。

消费者面临向后兼容性问题的风险include.unknown.datatypes设置为真正的.不仅特定于数据库的二进制表示在不同开云体育电动老虎机版本之间可能会发生变化,而且如果数据类型最终被Debezium支持,则数据类型将以逻辑类型的形式发送到下游,这将需要消费者进行调整。开云体育官方注册网址通常,当遇到不受支持的数据类型时,创建一个特性请求,以便添加支持。

没有默认的

一个以分号分隔的SQL语句列表,连接器在建立到数据库的JDBC连接时执行这些语句。开云体育电动老虎机要使用分号作为字符而不是分隔符,请指定两个连续的分号,;;

连接器可以自行决定建立JDBC连接。因此,此属性仅用于配置会话参数,而不适用于执行DML语句。

连接器在创建用于读取事务日志的连接时不执行这些语句。

10000

向服务器发送复制连接状态更新的频率,以毫秒为单位。
该属性还控制在数据库关闭时检查数据库状态以检测死连接的频率。开云体育电动老虎机

0

控制连接器向Kafka主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。

心跳消息对于监视连接器是否正在接收来自数据库的更改事件非常有用。开云体育电动老虎机心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。

当正在跟踪的数据库中有许多更新,但只有极少数更新与连接器正在为其捕获更改的表和模式相关时,就需要心跳消息。开云体育电动老虎机在这种情况下,连接器像往常一样从数据库事务日志中读取,但很少向Kafka发出更改记录。开云体育电动老虎机这意味着没有偏移量更新提交给Kafka,连接器没有机会将最新检索到的LSN发送到数据库。开云体育电动老虎机数据库保开云体育电动老虎机留WAL文件,其中包含连接器已经处理过的事件。发送心跳消息使连接器能够将最新检索到的LSN发送到数据库,从而允许数据库回收不再需要的WAL文件所使用的磁盘空间。开云体育电动老虎机

没有默认的

指定当连接器发送心跳消息时连接器在源数据库上执行的查询。开云体育电动老虎机

这对于解决中描述的情况非常有用WAL磁盘空间消耗,其中从低流量数据库和高流量数据库在同一主机上捕获更改可以防止Debezium处理WAL记开云体育电动老虎机录,从而确认数据库中的WAL位置。开云体育官方注册网址要解决这种情况,在低流量数据库中创建一个心跳表,并将此属性设置为一条将记录插入该表的语句,例如:开云体育电动老虎机

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

这允许连接器接收来自低流量数据库的更改并确认它们的LSNs,从而防止数据库主机上无限制的WAL增长。开云体育电动老虎机

columns_diff

指定触发表的内存模式刷新的条件。

columns_diff是最安全的模式。它确保内存模式始终与数据库表的模式保持同步。开云体育电动老虎机

columns_diff_exclude_unchanged_toast如果与从传入消息派生的模式存在差异,则指示连接器刷新内存中的模式缓存,除非未更改的TOASTable数据完全解释了这种差异。

如果存在频繁更新的表,其中的TOASTed数据很少是更新的一部分,则此设置可以显著提高连接器性能。但是,如果从表中删除TOASTable列,则内存模式可能会过时。

没有默认的

在连接器启动时执行快照之前,连接器应等待的毫秒间隔。如果在集群中启动多个连接器,此属性有助于避免快照中断,因为快照中断可能导致连接器的重新平衡。

10240

在快照期间,连接器以行为单位读取表内容。此属性指定批处理中的最大行数。

没有默认的

以分号分隔的参数列表,以传递给配置的逻辑解码插件。例如,添加表= public.table public.table2; include-lsn = true

真正的如果连接器配置设置key.convertervalue.converter属性到Avro转换器。

如果不是。

指示是否对字段名进行清除以遵守Avro命名要求

6

如果连接到复制槽位失败,这是连续尝试连接的最大次数。

10000(10秒)

当连接器连接到复制插槽失败时,重试尝试之间等待的毫秒数。

__开云体育官方注册网址debezium_unavailable_value

指定连接器提供的常量,以指示原始值是数据库不提供的烘烤值。开云体育电动老虎机如果设置为unavailable.value.placeholder十六进制:前缀期望字符串的其余部分表示十六进制编码的字节。看到烤的价值观更多细节。

确定连接器是否生成具有事务边界的事件,并使用事务元数据丰富更改事件信封。指定真正的如果您希望连接器执行此操作。看到事务的元数据获取详细信息。

真正的

确定连接器是否应该在源postgres数据库中提交已处理记录的LSN,以便删除WAL日志。开云体育电动老虎机指定如果您不希望连接器执行此操作。请注意,如果设置为LSN将不被Debezium承认,因此WAL日志将不会被清除,这开云体育官方注册网址可能会导致磁盘空间问题。用户需要在Debezium外部处理LSN的确认。开云体育官方注册网址

10000(10秒)

在发生可检索错误后重新启动连接器之前等待的毫秒数。

t

以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括:c插入/创建、u的更新,d删除,t对于截断,和没有一个不跳过任何操作。默认情况下,将跳过截断操作。

无默认值

用于向连接器发送信号的数据集合的全限定名称。
使用以下格式指定集合名称:
< schemaName ><表>

1024

在增量快照块期间连接器获取并读入内存的最大行数。增加块大小可以提供更高的效率,因为快照运行更少的大大小快照查询。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。

0

从复制插槽读取XMIN的频率(以毫秒为单位)。XMIN值提供了一个新的复制插槽可以从哪里开始的下界。的默认值0禁用跟踪XMIN跟踪。

io.开云体育官方注册网址debezium.schema.SchemaTopicNamingStrategy

TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称SchemaTopicNamingStrategy

指定主题名称的分隔符,默认为

10000

用于在有界并发散列映射中保存主题名称的大小。此缓存将帮助确定与给定数据集合对应的主题名称。

__开云体育官方注册网址debezium-heartbeat

控制连接器向其发送心跳消息的主题的名称。主题名称有这样的模式:

topic.heartbeat.prefixtopic.prefix

例如,如果主题前缀为实现,默认主题名为__开云体育官方注册网址debezium-heartbeat.fulfillment

事务

控制连接器向其发送事务元数据消息的主题的名称。主题名称有这样的模式:

topic.prefixtopic.transaction

例如,如果主题前缀为实现,默认主题名为fulfillment.transaction

直通连接器配置属性

连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。

一定要咨询卡夫卡的文档Kafka生产者和消费者的所有配置属性。PostgreSQL连接器使用新的使用者配置属性

监控

Debe开云体育官方注册网址zium PostgreSQL连接器提供了两种类型的指标,除了Zookeeper、Kafka和Kafka Connect提供的内置JMX指标支持之外。

  • 快照指标在执行快照时提供有关连接器操作的信息。

  • 流指标当连接器捕获变更和流化变更事件记录时,提供有关连接器操作的信息。

开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。

快照指标

MBean开云体育官方注册网址debezium.postgres: type = connector-metrics上下文=快照,server =< postgresql.server.name >

快照度量不会公开,除非快照操作是活动的,或者自上次连接器启动以来发生了快照。

下表列出了可用的快照指标。

属性 类型 描述

字符串

连接器读取的最后一个快照事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或重置以来,此连接器已看到的事件总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器捕获的表列表。

int

用于在快照和主Kafka Connect循环之间传递事件的队列长度。

int

用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。

int

快照中包含的表的总数。

int

快照尚未复制的表数。

布尔

快照是否启动。

布尔

快照是否暂停。

布尔

快照是否中止。

布尔

快照是否完成。

快照到目前为止所花费的总秒数,即使没有完成。还包括快照暂停的时间。

快照暂停的总秒数。如果快照被暂停了多次,那么暂停的时间就会累积起来。

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

队列的最大缓冲区,以字节为单位。该指标是可用的,如果max.queue.size.in.bytes设置为正的长值。

队列中记录的当前卷,以字节为单位。

在执行增量快照时,连接器还提供了以下额外的快照度量:

属性 类型 描述

字符串

当前快照块的标识符。

字符串

定义当前块的主键集的下界。

字符串

定义当前块的主键集的上界。

字符串

当前快照表的主键集的下界。

字符串

当前快照表的主键集的上限。

流指标

MBean开云体育官方注册网址debezium.postgres: type = connector-metrics、上下文=流媒体服务器=< postgresql.server.name >

下表列出了可用的流媒体指标。

属性 类型 描述

字符串

连接器读取的最后一个流事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或度量重置以来,此连接器所看到的事件总数。

自上次启动或指标重置以来,此连接器所看到的创建事件的总数。

自上次启动或指标重置以来,此连接器所看到的更新事件总数。

自上次启动或指标重置以来,此连接器所看到的删除事件的总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器捕获的表列表。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列长度。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。

布尔

标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机

从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机

提交的已处理事务的数量。

Map < String, String >

上次接收事件的坐标。

字符串

最后处理的事务的事务标识符。

队列的最大缓冲区,以字节为单位。该指标是可用的,如果max.queue.size.in.bytes设置为正的长值。

队列中记录的当前卷,以字节为单位。

出现问题时的行为

开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。

如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。

本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址

配置和启动错误

在以下情况下,连接器在尝试启动时失败,在日志中报告错误/异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法通过指定的连接参数成功连接到PostgreSQL。

  • 连接器正在从PostgreSQL WAL中先前记录的位置重新启动(通过使用LSN),并且PostgreSQL不再有可用的历史。

在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。在纠正配置或解决PostgreSQL问题后,重新启动连接器。

PostgreSQL不可用

当连接器正在运行时,它所连接的PostgreSQL服务器可能由于各种原因变得不可用。如果发生这种情况,连接器将失败并报错并停止。当服务器再次可用时,重新启动连接器。

PostgreSQL连接器在外部以PostgreSQL LSN的形式存储最后处理的偏移量。在连接器重新启动并连接到服务器实例之后,连接器与服务器通信以继续从该特定偏移量进行流处理。只要Debezium复制插槽保持不变,这个偏移量就可用。开云体育官方注册网址永远不要删除主服务器上的复制插槽,否则将丢失数据。有关插槽被移除的故障案例,请参见下一节。

集群的失败

从发行版12开始,PostgreSQL允许逻辑复制插槽仅在主服务器上.这意味着您可以将Debezium PostgreSQL连接器仅指开云体育官方注册网址向数据库集群的活动主服务器。开云体育电动老虎机而且,复制槽本身不会传播到副本。如果主服务器宕机,则必须提升一个新的主服务器。

一些托管PostgresSQL服务(例如AWS RDS和GCP CloudSQL)通过磁盘复制实现对备用服务器的复制。这意味着复制插槽将被复制,并且在故障转移后将保持可用。

新的主要必须有逻辑解码插件已安装和一个复制槽,该复制槽已配置为供插件和您希望捕获更改的数据库使用。开云体育电动老虎机只有这样,您才能将连接器指向新服务器并重新启动连接器。

当发生故障转移时,有一些重要的注意事项,您应该暂停Debezium,直到您可以验证您有一个完整的复制插槽,没有丢失数据。开云体育官方注册网址故障转移后:

  • 在允许应用程序写入数据之前,必须有一个进程重新创建Debezium复制插槽开云体育官方注册网址主要的这一点至关重要。如果没有这个过程,您的应用程序可能会错过更改事件。

  • 您可能需要验证Debezium是否能够读取插槽中的所有更改开云体育官方注册网址在旧的初选失败之前

恢复和验证是否丢失任何更改的一种可靠方法是将失败的主服务器的备份恢复到它发生故障之前的位置。虽然这在管理上很困难,但它允许您检查复制插槽是否有任何未使用的更改。

在PostgreSQL社区中有关于一个叫做故障转移槽这将有助于缓解这个问题,但截至PostgreSQL 12,它们还没有实现。然而,PostgreSQL 13正在积极开发支持备用逻辑解码,这是实现故障转移的主要需求。你可以在这个链接中找到更多相关信息:0 w50sragcs + jrktBXuJAWGZQdSTMa57CCY + Dh-xbg@mail.gmail.com”“>社区线程.

关于故障转移插槽概念的更多信息已经出现这篇博文

Kafka Connect进程优雅地停止

假设Kafka Connect正在以分布式模式运行,并且Kafka Connect进程优雅地停止。在关闭该进程之前,Kafka Connect会将该进程的连接器任务迁移到该组中的另一个Kafka Connect进程。新的连接器任务开始处理之前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。

Kafka连接进程崩溃

如果Kafka连接器进程意外停止,任何正在运行的连接器任务都将终止,而不记录最近处理的偏移量。当Kafka Connect以分布式模式运行时,Kafka Connect会重新启动其他进程上的连接器任务。然而,PostgreSQL连接器从上次偏移量开始恢复记录通过早期的过程。这意味着新的替换任务可能会生成一些在崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。

由于在故障恢复过程中可能会出现重复事件,因此使用者应该始终预测到一些重复事件。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。

在每个更改事件记录中,Debezium连接器插入有关事件起开云体育官方注册网址源的源特定信息,包括PostgreSQL服务器的事件时间、服务器事务的ID以及写入事务更改的预写日志中的位置。使用者可以跟踪此信息,特别是LSN,以确定某个事件是否重复。

Kafka不可用

当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。定期地,在Kafka Connect配置中指定的频率,Kafka Connect会记录在这些更改事件中出现的最新偏移量。如果Kafka代理变得不可用,运行连接器的Kafka Connect进程会反复尝试重新连接到Kafka代理。换句话说,连接器任务将暂停,直到重新建立连接,此时连接器将从停止的位置恢复。

连接器停止一段时间

如果连接器正常停止,则可以继续使用数据库。开云体育电动老虎机任何更改都记录在PostgreSQL WAL中。当连接器重新启动时,它将从中断的地方恢复流更改。也就是说,它为连接器停止时所做的所有数据库更改生成更改事件记录。开云体育电动老虎机

一个正确配置的Kafka集群能够处理大量的吞吐量。Kafka Connect是根据Kafka最佳实践编写的,如果有足够的资源,Kafka Connect连接器也可以处理大量的数据库更改事件。开云体育电动老虎机因此,在停止一段时间后,当Debezium连接器重新启动时,它很可能会赶上在停止时所做的数据库更改。开云体育官方注册网址开云体育电动老虎机这个过程有多快取决于Kafka的能力和性能,以及对PostgreSQL中的数据所做的修改量。