开云体育官方注册网址Debezium连接器云扳手

该连接器目前处于孵化状态,即准确的语义,配置选项等可能会根据我们收到的反馈在未来的修订中改变。如果您遇到任何问题,请告诉我们。

开云体育官方注册网址Debezium的Cloud Spanner连接器消费并输出Spanner变更流数据到Kafka主题中。

一个扳手改变流观察和流Spanner数据库的数据更改-插入,更新,删除-在接近实时开云体育电动老虎机。Spanner连接器抽象了查询Spanner变更流的细节。使用此连接器,您不必管理变更流分区生命周期,这在您直接使用扳手API

连接器目前不支持快照特性。Kafka连接器第一次连接到Spanner数据库时,它将从提供的时间戳中传输更改,如果没有提供时间戳,则传输当前时开云体育电动老虎机间戳。

概述

为了读取和处理数据库更改,连接器从更改流开云体育电动老虎机进行查询。

连接器为每个被捕获的行级插入、更新和删除操作生成一个更改事件,并为单独Kafka主题中的每个表发送更改事件记录。客户端应用程序读取与感兴趣的数据库表对应的Kafka主题,并可以对从这些主题接收到的每一个行级事件做出反应。开云体育电动老虎机

这个连接器容错能力强。当连接器读取更改并产生事件时,它会记录为每个事件处理的最后一次提交时间戳更改流分区.如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,重新启动时连接器将继续流记录它上次停止的地方。

连接器如何工作

为了优化配置和运行Debezium扳手连接器,了解连接器流如何更改开云体育官方注册网址事件、确定Kafka主题名称和使用元数据是很有帮助的。

流的变化

Debe开云体育官方注册网址zium扳手连接器将所有时间用于从它所订阅的变更流传输变更。当表上发生更改时,Spanner在数据库中写入相应的更改流记录,与数据更改同步。开云体育电动老虎机为了扩展更改流的写入和读取,Spanner将内部更改流存储与数据库数据分开并合并。开云体育电动老虎机为了支持在数据库写入规模时近乎实时地读取变更流记录,Spanner API设计用于使用变更流分区并发查询变更流。开云体育电动老虎机查看扳手改变流分区模型

连接器在Spanner API上提供了查询变更流的抽象。使用这个连接器,您不必管理变更流分区生命周期。连接器为您提供了一个数据更改记录流,这样您就可以更自由地关注应用程序逻辑,而不是特定的API细节和动态更改流分区。

当订阅变更流时,连接器需要提供项目ID、Spanner实例ID、Spanner数据库ID以及变更流名称。开云体育电动老虎机用户还可以选择提供开始时间戳和结束时间戳。看到本节有关连接器配置属性的更详细列表。

当连接器接收到更改时,它将事件转换为Debezium开云体育官方注册网址创建更新,或删除事件。连接器将这些更改事件以记录的形式转发给运行在同一进程中的Kafka Connect框架。Kafka Connect进程异步地将更改事件记录按照它们生成的相同顺序写入相应的Kafka主题。

Kafka Connect会定期记录最新的数据抵消在另一个卡夫卡的话题。偏移量表示Debezium包含在每个事件中的特定于源的位置信息。开云体育官方注册网址对于扳手连接器,更改流分区最后处理的提交时间戳是偏移量。

当Kafka Connect优雅地关闭时,它会停止连接器,将所有事件记录刷新到Kafka,并记录从每个连接器接收到的最后偏移量。当Kafka Connect重新启动时,它会读取每个连接器的最后记录偏移量,并在其最后记录偏移量处启动每个连接器。

在流处理期间,扳手连接器还在以下元数据主题中记录元数据信息。不建议修改以下主题的内容或配置:

  • _consumer_offsets: Kafka自动创建的主题。存储Kafka连接器中创建的消费者偏移量

  • _kafka-connect-offsets: Kafka Connect自动创建的主题。存储连接器偏移量。

  • _sync_topic_spanner_connector_connectorname:由连接器自动创建的主题。存储关于变更流分区的元数据。

  • _rebalancing_topic_spanner_connector_connectorname:由连接器自动创建的主题。用于确定连接器任务的活动状态。

  • _开云体育官方注册网址debezium-heartbeat.connectorname:用于处理扳手更改流心跳的主题。

主题名称

扳手连接器将单个表上的所有插入、更新和删除操作的事件写入单个Kafka主题。默认情况下,Kafka主题名为topicPrefixconnectorName的表地点:

  • topicPrefix主题前缀是否与topic.prefix连接器配置属性。

  • connectorName用户指定的连接器名称。

  • 的表发生操作的数据库表的名称。开云体育电动老虎机

例如,假设扳手是连接器配置中的逻辑名称,该连接器正在从Spanner更改流中捕获更改,该更改流跟踪包含四个表的数据库中的更改:开云体育电动老虎机表1表二table3,table4.连接器将流记录到以下四个Kafka主题:

  • spanner.table1

  • spanner.table2

  • spanner.table3

  • spanner.table4

数据变更事件

Debe开云体育官方注册网址zium扳手连接器为每个行级生成一个数据更改事件插入更新,删除操作。每个事件包含一个键和一个值。键和值是分开的文档。键和值的结构取决于所更改的表。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。键的模式永远不会改变。请注意,该值的模式是变更流自连接器开始时间以来跟踪的表中所有列的合并。

下面的JSON骨架文档展示了键和值文档的基本结构。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器决定了键和值文档的表示。一个模式字段仅在配置转换器以产生更改事件键或更改事件值时才在更改事件键或更改事件值中。同样,事件键和事件有效负载只有在配置转换器以产生它们时才会出现。如果你使用JSON转换器,并将其配置为生成模式,则更改事件具有以下结构:

// Key {"schema": {(1)...}, "有效载荷":{(2)...}} //值{"schema": {(3)...}, "有效载荷":{(4)...}}
表1。变更事件基本内容概述
字段名 描述

1

模式

第一个模式字段是事件键的一部分。它指定了一个Kafka Connect模式,用来描述事件键的内容有效载荷部分。换句话说,是第一个模式字段描述主键的结构。

2

有效载荷

第一个有效载荷字段是事件键的一部分。它具有前面所描述的结构模式字段,它包含已更改行的键。

3.

模式

第二个模式字段是事件值的一部分。它指定Kafka Connect模式,描述事件值中的内容有效载荷部分。换句话说,是第二种模式描述已更改行的结构。通常,这个模式包含嵌套的模式。

4

有效载荷

第二个有效载荷字段是事件值的一部分。它具有前面所描述的结构模式字段,它包含已更改行的实际数据。

默认行为是连接器流将事件记录更改为主题,其名称与事件的原始表相同

从Kafka 0.10开始,Kafka可以选择记录事件键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。

更改事件键

对于给定的表,更改事件的键具有一个结构,该结构在创建事件时包含表主键中每一列的字段。所有键列将被标记为非可选。

考虑一个用户表中定义的业务开云体育电动老虎机数据库和该表的更改事件键的示例。

例表
创建表用户(id INT64不空,用户名字符串(最大)不空,密码字符串(最大)不空,电子邮件字符串(最大)不空)主键(id);
更改事件键的示例

的每一个变化事件用户表,虽然它有这个定义有相同的键结构,在JSON中看起来像这样:

{"schema": {(1)"type": "struct", "name": "Users. "关键”,(2)“可选”:假的,(3)“字段”:[(4){“类型”:“int64”、“可选”:“false”,“场”:“假的 " } ] }, " 有效载荷":{(5)id: "1"},}
表2。更改事件键的描述
字段名 描述

1

模式

密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容有效载荷部分。

2

用户。关键

定义键的有效负载结构的模式的名称。此模式描述已更改表的主键的结构。

3.

可选

指示事件键是否必须在其中包含值有效载荷字段。主键列总是必需的。

4

字段

属性中期望的每个字段有效载荷,包括每个字段的名称、类型以及是否是可选的。

5

有效载荷

包含为其生成此更改事件的行的键。在本例中,键包含一个单键id字段,其值为1

更改事件值

考虑用于显示更改事件键示例的同一个示例表:

创建表用户(id INT64不空,用户名字符串(最大)不空,密码字符串(最大)不空,电子邮件字符串(最大)不空)主键(id);

创建事件

对象中创建数据的操作所生成的更改事件的值部分,示例如下用户表格如果Spanner列被标记为非可选,则它的值对于插入一行的所有突变都是必需的。扳手中的所有主键列将被标记为非可选。注意,即使一个非键列在Spanner中被标记为非可选列,它也将在模式中显示为可选列。模式中只有主键列被标记为非可选。

{"schema": {(1)“类型”:“结构”、“字段”:[{“类型”:“结构”、“字段”:[{“类型”:“int32”、“可选”:假的,“场”:“id”},{“类型”:“弦”、“可选”:真的,“场”:“first_name”},{“类型”:“弦”、“可选”:真的,“场”:“last_name”},{“类型”:“弦”、“可选”:真的,“场”:“电子邮件”}],“可选”:真的,“名字”:“用户。值”,(2)“字段”:“之前”},{“类型”:“结构”、“字段”:[{“类型”:“int32”、“可选”:假的,“场”:“id”},{“类型”:“弦”、“可选”:假的,“场”:“first_name”},{“类型”:“弦”、“可选”:假的,“场”:“last_name”},{“类型”:“弦”、“可选”:假的,“场”:“电子邮件”}],“可选”:真的,“名字”:“用户。值”,"field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source",(3)"field": "source"}, {"type": "string", "optional": false, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "connector_name.Users.Envelope"(4)}, "有效载荷":{(5)“之前”:空,(6)"后":{(7)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“源”:{(8)“版本”:“2.1.2。Final", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "开云体育电动老虎机database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "Users", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "" system_transaction": false, "value_capture_type":"OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1}, "op": "c",(9)“ts_ms”:1559033904863(10)}}
表3。的描述创建事件值字段
字段名 描述

1

模式

值的模式,它描述值的有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值模式都是相同的。

2

名字

模式节中,每名字Field指定值的有效负载中字段的模式。

3.

名字

io.开云体育官方注册网址debezium.connector.spanner.Source有效负载的模式是字段。此模式特定于扳手连接器。连接器将其用于生成的所有事件。

4

名字

connector_name.Users.Envelope有效负载的总体结构的模式在哪里connector_name连接器名称,和客户是桌子。

5

有效载荷

该值为实际数据。这是变更事件提供的信息。

6

之前

可选字段,指定事件发生之前行的状态。当人事处字段是c对于create,就像在本例中一样之前字段是因为这个更改事件是针对新内容的。

7

可选字段,指定事件发生后行的状态。在本例中,字段包含新行的值idfirst_namelast_name,电子邮件列。

8

描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机包含新行的数据库和表

  • 如果事件是快照的一部分

  • 事务中数据更改事件的记录序列号

  • 项目ID

  • 实例ID

  • 数据库I开云体育电动老虎机D

  • 变更流名称

  • 事务ID

  • 低水位,它表示提交时间戳小于低水位时间戳的所有记录已经被连接器流输出的时间戳

  • 在数据库中进行更改时的提交时间戳开云体育电动老虎机

  • 连接器处理更改时的时间戳

  • 初始事务中的记录数

  • 事务标记

  • 该事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的初始分区令牌

  • 从扳手接收到的原始数据更改事件中的mod号

  • 数据更改事件是否是分区中事务的最后一条记录

  • 事务中变更流分区的总数

9

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,c表示该操作创建了一行。有效值为:

  • c=创建

  • u=更新

  • d=删除

10

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新事件

样例中更新的更改事件的值用户表的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。中的更新中连接器生成的事件中的更改事件值的示例用户表:

{"schema":{…}, "payload": {"before": {(1)“id”:1、“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org”},“后”:{(2)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "source": {(3)“版本”:“2.1.2。Final", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "开云体育电动老虎机database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "Users", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "" system_transaction": false, "value_capture_type":"OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1}, "op": "u",(4)“ts_ms”:1465584025523(5)}}
表4。的描述更新事件值字段
字段名 描述

1

之前

一个可选字段,包含数据库提交前该行中所有列的所有值。开云体育电动老虎机

2

可选字段,指定事件发生后行的状态。在本例中,first_name价值就是现在安妮玛丽

3.

描述事件的源元数据的必填字段。的字段结构中的字段与创建事件,但有些值不同。源元数据包括:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机数据库(也就是键空间)和包含新行的表

  • 如果事件是快照的一部分

  • 事务中数据更改事件的记录序列号

  • 项目ID

  • 实例ID

  • 数据库I开云体育电动老虎机D

  • 变更流名称

  • 事务ID

  • 低水位,它表示提交时间戳小于低水位时间戳的所有记录已经被连接器流输出的时间戳

  • 在数据库中进行更改时的提交时间戳开云体育电动老虎机

  • 连接器处理更改时的时间戳

  • 初始事务中的记录数

  • 事务标记

  • 该事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的初始分区令牌

  • 从扳手接收到的原始数据更改事件中的mod号

  • 数据更改事件是否是分区中事务的最后一条记录

  • 事务中变更流分区的总数

4

人事处

返回string,描述操作类型。在一个更新事件值,则人事处字段值为u,表示该行因更新而更改。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

删除事件

的值删除改变事件有相同之处模式部分为创建而且更新同一表的事件。的有效载荷的一部分删除事件。用户表是这样的:

{"schema":{…}, "payload": {"before": {(1)"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}, "after": null,(2)“源”:{(3)“版本”:“2.1.2。Final", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "开云体育电动老虎机database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "Users", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "" system_transaction": false, "value_capture_type":"OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1}, "op": "d",(4)“ts_ms”:1465581902461(5)}}
表5所示。的描述删除事件值字段
字段名 描述

1

之前

可选字段,指定事件发生之前行的状态。在一个删除事件值,则之前字段包含在数据库提交时删除该行之前该行中的值。开云体育电动老虎机

2

可选字段,指定事件发生后行的状态。在一个删除事件值,则字段是,表示该行已不存在。

3.

描述事件的源元数据的必填字段。在一个删除事件值,则字段结构与for相同创建而且更新同一表的事件。许多字段值也相同。在一个删除事件值,则ts_ms而且lsn字段值以及其他值可能已经更改。但是,字段在删除事件值提供相同的元数据:

  • 开云体育官方注册网址Debezium版本

  • 连接器类型和名称

  • 开云体育电动老虎机数据库(也就是键空间)和包含新行的表

  • 如果事件是快照的一部分

  • 事务中数据更改事件的记录序列号

  • 项目ID

  • 实例ID

  • 数据库I开云体育电动老虎机D

  • 变更流名称

  • 事务ID

  • 低水位,它表示提交时间戳小于低水位时间戳的所有记录已经被连接器流输出的时间戳

  • 在数据库中进行更改时的提交时间戳开云体育电动老虎机

  • 连接器处理更改时的时间戳

  • 初始事务中的记录数

  • 事务标记

  • 该事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的初始分区令牌

  • 从扳手接收到的原始数据更改事件中的mod号

  • 数据更改事件是否是分区中事务的最后一条记录

  • 事务中变更流分区的总数

4

人事处

返回string,描述操作类型。的人事处字段值为d,表示该行已被删除。

5

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

一个删除更改事件记录为使用者提供了处理删除该行所需的信息。

扳手连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。注意,如果启用了低水位,则不应启用压缩。

墓碑上的事件

删除一行时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有相同键的消息,消息值必须为.为了实现这一点,连接器遵循删除特别活动墓碑上事件,具有相同的键,但价值。

数据类型映射

扳手连接器用事件表示对行的更改,事件的结构类似于行所在的表。该事件为每个列值包含一个字段。该值在事件中如何表示取决于列的Spanner数据类型。本节描述这些映射。

扳手类型

表6所示。扳手数据类型的映射
扳手数据类型 文字类型(模式类型)

布尔

布尔

INT64

INT64

数组

数组

字节

字节

字符串

字符串

浮动

数字

字符串

时间戳

字符串

数字

字符串

安装扳手

检查表
  • 确保提供一个项目ID、扳手实例ID、扳手数据库ID和更改流名称。开云体育电动老虎机看到文档关于如何创建变更流。

  • 确保使用适当的凭证创建和配置一个GCP服务帐户。需要在连接器配置中提供服务帐户密钥。查看更多信息关于服务账户。

请参阅如何配置Debezium扳手连接器的下面部分。开云体育官方注册网址

部署

动物园管理员卡夫卡,卡夫卡连接安装完成后,部署Debezium扳手连接器的其余任务是下载开云体育官方注册网址连接器的插件存档,将JAR文件解压缩到Kafka Connect环境中,并将JAR文件的目录添加到卡夫卡连接的plugin.path.然后需要重新启动Kafka Connect进程来获取新的JAR文件。

如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Zookeeper, Kafka和Kafka Connect,扳手连接器已经安装并准备运行。你也可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium

连接器配置示例

下面是连接到变更流的扳手连接器的配置示例changeStreamAll在数据库中开云体育电动老虎机开云体育电动老虎机在实例实例和项目项目

您可以选择为模式和表的一个子集生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。

{"name": "扳手-连接器",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector. spannerconnector ",(2)“gcp.spanner.change。流”:“changeStreamAll”,(3)“gcp.spanner.project。id”:“项目”,(4)“gcp.spanner.instance。id”:“实例”,(5)“gcp.spanner.开云体育电动老虎机database。id”:“数开云体育电动老虎机据库”,(6)“gcp.spanner.credentials。json: < key.json >,(7)”任务。马克斯:1(8)}}
1 在Kafka Connect服务中注册连接器时的名称。
2 扳手连接器类的名称。
3. 变更流名称。
4 GCP项目ID。
5 扳手实例ID。
6 扳手数据库ID。开云体育电动老虎机
7 GCP服务帐户密钥JSON。
8 最大任务数。

看到扳手连接器属性的完整列表可以在这些配置中指定。

添加连接器配置

要开始运行扳手连接器,创建一个连接器配置,并将该配置添加到Kafka Connect集群。

先决条件
  • 扳手变更流已创建并可用。

  • 扳手接头已安装完成。

过程
  1. 为扳手连接器创建配置。

  2. 使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。

结果

当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到Kafka主题。

监控

除了Zo开云体育官方注册网址okeeper、Kafka和Kafka Connect提供的内置JMX指标支持外,Debezium Spanner连接器只提供了一种类型的指标。

  • 流指标当连接器捕获变更和流化变更事件记录时,提供有关连接器操作的信息。

开云体育官方注册网址Debezium监控文档提供了如何使用JMX公开这些指标的详细信息。

流指标

MBean开云体育官方注册网址debezium。云扳手:type = connector-metrics、上下文=流媒体服务器=<云Spanner.server.name >

属性 类型 描述

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或重置以来,此连接器已看到的事件总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列长度。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。

布尔

标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机

从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机

提交的已处理事务的数量。

队列的最大缓冲区(以字节为单位),用于在streamer和主Kafka Connect循环之间传递事件。

队列的当前缓冲区(以字节为单位),用于在streamer和主Kafka Connect循环之间传递事件。

连接器任务的当前低水位。低水位描述了时间T,在这个时间T,连接器保证已经输出了时间戳< T的所有事件。

连接器任务的当前低水位,以毫秒为单位。低水位描述了时间T,在这个时间T,连接器保证已经输出了时间戳< T的所有事件。

低水位滞后于当前时间(以毫秒为单位)。低水位描述了时间T,在这个时间T,连接器保证已经输出了时间戳< T的所有事件。

低水位滞后于当前时间的延迟分布,单位为毫秒。该变体将包括P50, P95, P99,平均,最小,最大计算。

Spanner提交时间戳到连接器读延迟的分布。该变体将包括P50, P95, P99,平均,最小,最大计算。

扳手读取时间戳到连接器的分布会产生延迟。该变体将包括P50, P95, P99,平均,最小,最大计算。

Spanner向连接器提交时间戳的分布会触发延迟。该变体将包括P50, P95, P99,平均,最小,最大计算。

Spanner向Kafka分发提交时间戳的时间戳延迟。该变体将包括P50, P95, P99,平均,最小,最大计算。

分发的连接器发出时间戳,Kafka发布时间戳延迟。该变体将包括P50, P95, P99,平均,最小,最大计算。

扳手事件队列的总容量。该队列表示StreamEventQueue的总容量,StreamEventQueue是一个特定于扳手的队列,用于存储从变更流查询接收到的元素。

剩余的扳手事件队列容量。

任务状态更改事件队列的总容量。该队列表示TaskStateChangeEventQueue的总容量,TaskStateChangeEventQueue是一个特定于扳手的队列,用于存储连接器中发生的事件。

剩余的任务状态更改事件队列容量。

当前任务检测到的分区总数。

当前任务发出的变更流查询的总数。

当前任务检测到的更改流查询的活动数量。

连接器配置属性

Debe开云体育官方注册网址zium扳手连接器具有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性为要求除非有默认值可用。

表7所示。所需的连接器配置属性
财产 默认的 描述

没有默认的

连接器的唯一名称。尝试使用相同的名称再次注册将失败。所有Kafka Connect连接器都需要这个属性。

没有默认的

连接器的Java类的名称。始终使用值io.开云体育官方注册网址debezium.connector.spanner.SpannerConnector用于扳手连接器。

1

应该为此连接器创建的最大任务数。如果启用offset.storage.per.task模式,扳手连接器可以使用多个任务。

没有默认的

GCP项目ID

没有默认的

扳手实例ID

没有默认的

扳手数据库ID开云体育电动老虎机

没有默认的

扳手改变流向

没有默认的

GCP服务帐户密钥JSON的文件路径。

没有默认的

GCP服务帐户密钥JSON。如果没有提供gcp.spanner.credentials.path,则必须。

以下先进的配置属性具有在大多数情况下都适用的默认值,因此很少需要在连接器的配置中指定。

表8所示。高级连接器配置属性
财产 默认的 描述

连接器是否启用低水位。

1000毫秒

低水位更新的时间间隔。

300000

扳手的心跳间隔。

当前时间

连接器启动时间。

无限期结束时间

连接器结束时间。

10000

扳手事件队列容量。如果在连接器运行时期间剩余的流事件队列容量接近零,则增加此容量。

1000

任务状态改变事件队列容量。如果在连接器运行时期间剩余的任务状态更改事件队列容量接近零,则增加此容量。

5

在抛出异常之前,变更流查询遗漏的最大心跳数

是否启用任务自动伸缩

sync_topic_spanner_connector< connectorname >

同步主题的名称。Sync主题是一个内部连接器主题,用于存储任务之间的通信。

500毫秒

同步主题的轮询持续时间。

5000毫秒

请求同步主题的超时时间。

15000毫秒

发布到同步主题的超时时间。

5000毫秒

为同步主题提交偏移量的超时。

60000毫秒

为同步主题提交偏移量的间隔。

5女士

消息发布到同步主题的时间间隔。

rebalancing_topic_spanner_connector< connectorname >

重新平衡主题的名称。重新平衡主题是一个内部连接器主题,用于确定任务活动状态。

5000

重新平衡主题的投票持续时间。

5000

为rebalance主题提交偏移量的超时。

60000毫秒

为同步主题提交偏移量的间隔。

1000毫秒

任务在处理重新平衡事件之前等待的持续时间。

有关高级配置的更完整列表,请参见Github代码

直通连接器配置属性

连接器还支持直通创建Kafka生产者和消费者时使用的配置属性。

一定要咨询卡夫卡的文档Kafka生产者和消费者的所有配置属性。扳手连接器使用新的使用者配置属性

出现问题时的行为

开云体育官方注册网址Debezium是一个分布式系统,可以捕获多个上游数据库中的所有更改;开云体育电动老虎机它从不错过或丢失任何事件。当系统正常运行或被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件记录。

如果确实发生了故障,则系统不会丢失任何事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这些不正常的情况下,Debezium就像Kafka一样提供开云体育官方注册网址了帮助至少一次变更事件的交付。

本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址

配置和启动错误

在以下情况下,连接器在尝试启动时失败,在日志中报告错误/异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法通过指定的连接参数成功连接到扳手。

在这些情况下,错误消息有关于问题的详细信息,可能还有建议的解决方案。在纠正配置或解决扳手问题后,重新启动连接器。

扳手不可用

当连接器正在运行时,Spanner可能由于各种原因变得不可用。连接器将继续运行,并在扳手再次可用时能够流化事件。

Kafka Connect进程优雅地停止

假设Kafka Connect正在以分布式模式运行,并且Kafka Connect进程优雅地停止。在关闭该进程之前,Kafka Connect会将该进程的连接器任务迁移到该组中的另一个Kafka Connect进程。新的连接器任务开始处理之前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。

Kafka连接进程崩溃

如果Kafka连接器进程意外停止,任何正在运行的连接器任务都将终止,而不记录最近处理的偏移量。当Kafka Connect以分布式模式运行时,Kafka Connect会重新启动其他进程上的连接器任务。然而,扳手连接器从上次偏移恢复记录通过早期的过程。这意味着新的替换任务可能会生成一些在崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。

由于在故障恢复过程中可能会出现重复事件,因此使用者应该始终预测到一些重复事件。

在每个更改事件记录中,Debezium连接器插入关于事件起开云体育官方注册网址源的源特定信息,例如最初的分区令牌、提交时间戳、事务ID、记录序列和mod号。消费者可以使用这些标识符进行重复数据删除。

Kafka不可用

当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。定期地,在Kafka Connect配置中指定的频率,Kafka Connect会记录在这些更改事件中出现的最新偏移量。如果Kafka代理变得不可用,运行连接器的Kafka Connect进程会反复尝试重新连接到Kafka代理。换句话说,连接器任务将暂停,直到重新建立连接,此时连接器将从停止的位置恢复。

连接器停止一段时间

如果连接器正常停止,则可以继续使用数据库。开云体育电动老虎机当连接器重新启动时,它将从中断的地方恢复流更改。也就是说,它为连接器停止时所做的所有数据库更改生成更改事件记录。开云体育电动老虎机

一个正确配置的Kafka集群能够处理大量的吞吐量。Kafka Connect是根据Kafka最佳实践编写的,如果有足够的资源,Kafka Connect连接器也可以处理大量的数据库更改事件。开云体育电动老虎机因此,在停止一段时间后,当Debezium连接器重新启动时,它很可能会赶上在停止时所做的数据库更改。开云体育官方注册网址开云体育电动老虎机这种情况发生的速度取决于Kafka的能力和性能,以及Spanner中对数据的修改量。

注意,当前连接器只能按~ 1小时的时间流回。Kafka连接器在Kafka连接器的开始时间戳处读取信息模式以检索模式信息。属性之前的读取时间戳时,Spanner默认不能读取信息模式版本保护期,默认为1小时。如果您希望从一个小时之前启动连接器,则需要增加数据库的版本保留周期。开云体育电动老虎机

限制