开云体育官方注册网址卡桑德拉Debezium连接器

Cassanadra连接器可以监视Cassandra集群并记录所有行级更改。连接器必须本地部署在Cassandra集群中的每个节点上。连接器第一次连接到Cassandra节点时,它对所有关键空间中所有启用cdc的表执行快照。连接器还将读取写入Cassandra提交日志的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的kafka主题中,应用程序和服务可以很容易地使用它们。

有关与此连接器兼容的Cassandra版本的信息,请参见开云体育官方注册网址Debezium发布概述

概述

Cassandra是一个开源的NoSQL数据库。开云体育电动老虎机与大多数数据库类似,Cassan开云体育电动老虎机dra的写路径从立即将更改记录到其提交日志开始。提交日志本地驻留在每个节点上,记录对该节点的每一次写操作。

自Cassandra 3.0以来,a变更数据捕获(CDC)特性介绍了。CDC特性可以通过设置table属性在表级别上启用美国疾病控制与预防中心= true,在此之后,任何包含CDC启用表的数据的提交日志将被移动到中指定的CDC目录cassandra.yaml在丢弃。

Cassandra连接器驻留在每个Cassandra节点上,并监视cdc_raw更改目录。它处理所有检测到的本地提交日志段,为提交日志中的每个行级插入、更新和删除操作生成一个更改事件,在单独的Kafka主题中发布每个表的所有更改事件,最后从Kafka主题中删除提交日志cdc_raw目录中。最后一步很重要,因为一旦CDC被启用,Cassandra本身就不能清除提交日志。如果cdc_free_space_in_mb写入启用cdc的表时将被拒绝。

这个连接器容错能力强。当连接器读取提交日志并产生事件时,它记录每个提交日志段的文件名和位置以及每个事件。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,那么在重新启动时,它只是在上次停止的地方继续读取提交日志。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。稍后我们将讨论连接器的行为当事情出错时

Cassandra与其他Debezium连接器不同,因为它不是在Kafka 开云体育官方注册网址Connect框架之上实现的。相反,它是一个单独的JVM进程,旨在驻留在每个Cassandra节点上,并通过Kafka生成器将事件发布到Kafka。

Cassandra连接器目前不支持以下特性。由这些特性引起的更改将被忽略:

  • 集合类型列上的TTL

  • 范围内删除

  • 静态列

  • 触发器

  • 物化视图

  • 二级指标

  • 轻量级事务

设置卡桑德拉

在使用Debezium开云体育官方注册网址 Cassandra连接器监视Cassandra集群中的更改之前,必须在节点级和表级启用CDC。

在节点上启用CDC

要启用CDC,请在中更新以下CDC配置cassandra.yaml

cdc_enabled:真

额外的CDC配置有以下默认值:

cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw: 4096 cdc_free_space_in_mb: 250
  • cdc_enabled在节点范围内启用或禁用CDC操作

  • cdc_raw_directory在所有对应的memtable被刷新后,确定要移动的提交日志段的目的地。

  • cdc_free_space_in_mb是分配用于存储提交日志段的最大容量,默认为4096mb的最小值和1/8的卷空间。

  • cdc_free_space_check_interval_ms我们重新计算占用空间的频率是否为cdc_raw_directory以防止不必要地消耗CPU周期时,容量。

在表中启用CDC

一旦在Cassandra节点上启用CDC,每个表也必须通过CREATE table或ALTER table命令显式地启用CDC。例如:

CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;ALTER TABLE foo WITH cdc=true;

Cassandra连接器如何工作

本节将详细介绍Cassandra连接器如何执行快照、将提交日志事件转换为Debezium更改事件、处理提交日志生命周期、将事件记录到Kafka、管理模式演变以及在出现错误时的行为。开云体育官方注册网址

快照

当Cassandra连接器第一次在Cassandra节点上启动时,默认情况下它将执行集群的初始快照。这是默认模式,因为大多数时候CDC是在非空表上启用的,并且提交日志不包含完整的历史记录。

快照读取器发出SELECT语句来查询表中的所有列。Cassandra允许在全局或语句级别上设置一致性级别。对于快照,一致性级别在语句级别设置为所有默认提供最高一致性。这意味着如果一个节点在快照期间宕机,快照将无法继续,并且一旦该节点恢复在线,就需要后续重新快照。您可以将快照的一致性级别调整到较低的一致性级别,以提高可用性,前提是您理解一致性之间的权衡。

在卡桑德拉3。X,不可能严格地从本地Cassandra节点读取。从Cassandra 4.0开始,aNODE_LOCAL将增加一致性级别。这将允许Cassandra连接器只从它所在的节点读取数据(这将与提交日志的处理方式一致)。

与关系数据库不同,快照期间没有应用读开云体育电动老虎机锁,因此对Cassandra的写入在快照期间不会阻塞。如果查询的数据在快照期间被另一个客户端修改了,这些更改可能会反映在快照结果集中。

如果连接器失败或在快照完成之前停止,连接器将在重新启动时开始一个新的快照。在默认快照模式下(最初的),一旦连接器完成它的初始快照,它将不再执行任何额外的快照。唯一的例外是在连接器重新启动期间:如果在一个表上启用了CDC,然后重新启动连接器,那么该表将被快照。

第二种快照模式(总是)允许连接器在必要时执行快照。它定期检查新启用cdc的表,并在检测到这些表时立即对它们进行快照。

第三种快照模式('never')确保连接器永远不会执行快照。当以这种方式配置一个新连接器时,它将只读取CDC目录中的提交日志。这不是默认行为,因为以这种模式(不带快照)启动新连接器需要提交日志包含所有启用cdc的表的整个历史,而通常情况并非如此。此模式的另一个用例是,如果有一个连接器已经在执行快照,则可以禁用其他连接器的快照,以避免重复工作。

读取提交日志

Cassandra连接器通常将绝大多数时间用于读取Cassandra节点上的本地提交日志。在Cassandra 4.0中,在每个分段fsync上,索引文件将被更新以反映最新的偏移量。这消除了Cassandra 3.X中CDC特性中的处理延迟。并且可以通过设置配置在Cassandra 4 Debezium连接器开云体育官方注册网址中启用:commit.log.real.time.processing.enabled真正的.索引文件轮询的频率由commit.log.marked.complete.poll.interval.ms

使用Cassandra的CommitLogReader和CommitLogReadHandler反序列化提交日志的二进制数据。每个反序列化对象称为a突变卡桑德拉。一个突变包含一个或多个更改事件。

当Cassandra连接器读取提交日志时,它将日志事件转换为Debezium开云体育官方注册网址创建更新,或删除事件,包括在提交日志中发现事件的位置。Cassandra连接器使用Kafka Connect转换器对这些更改事件进行编码,并将它们发布到适当的Kafka主题中。

提交日志的限制

Cassandra的提交日志带有一组限制,这对于正确解释CDC事件至关重要:

  • 提交日志只到达cdc_raw目录,在这种情况下,它将被刷新/丢弃。这意味着在记录事件和捕获事件之间存在延迟。

  • 单个Cassandra节点上的提交日志不反映对集群的所有写操作,它们只反映存储在该节点上的写操作。这就是为什么必须监视Cassandra集群中所有节点上的更改。但是,由于复制因素,这也意味着这些事件的下游消费者有必要处理重复数据删除。

  • 对单个Cassandra节点的写入在到达时被记录。但是,这些事件可能会按照它们发出的顺序出现。这些事件的下游消费者必须理解并实现类似于Cassandra的读取路径的逻辑,以获得正确的输出。

  • 提交日志中不记录表的模式更改,只记录数据更改。因此,在尽可能的基础上检测模式的更改。为了避免数据丢失,建议在模式更改期间暂停对表的写操作。

  • Cassandra不执行先读后写,因此提交日志不记录已更改行的每一列的值,它只记录已修改的列的值(分区键列除外,它们总是根据Cassandra DML命令的要求进行记录)。

  • 由于CQL的性质,插入dml可以导致行插入或更新;更新dml可以导致行插入、更新或删除;删除dml可以导致行更新或删除。由于查询没有记录在提交日志中,CDC事件类型在关系数据库意义上是根据对行的影响进行分类的。开云体育电动老虎机

待办事项:是否有一种方法来确定事件类型对应于实际的Cassandra DML语句?如果是的话,这比这些事件的语义更受欢迎吗?

管理提交日志生命周期

默认情况下,Cassandra连接器将删除已处理的提交日志。不建议在禁用删除提交日志的同时启动连接器,因为这可能会使磁盘存储空间膨胀,并阻止进一步向Cassandra集群写入数据。要以自定义方式管理提交日志(即上传到云提供商),可以实现CommitLogTransfer接口。

主题名称

Cassandra连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。Kafka主题的名称通常采用以下形式:

clusterNamekeyspaceName的表

在哪里clusterName连接器的逻辑名称是否与topic.prefix配置属性,keyspaceName发生操作的键空间的名称和的表发生操作的表的名称。

例如,考虑一个Cassandra安装库存包含四个表的Keyspace:产品products_on_hand客户,订单.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现,那么连接器将在这四个Kafka主题上产生事件:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

待办事项:对于主题名称,为clusterNamekeyspaceName的表好吧?或者它应该是connectorNamekeyspaceName的表connectorNameclusterNamekeyspaceName的表?

模式演化

ddl没有记录在提交日志中。当一个表的模式发生变化时,这个变化会从一个Cassandra节点发出,并通过八卦协议传播到其他节点。

Cassandra中的模式更改将被一个实现的SchemaChangeListener以小于1秒的延迟检测到,然后它将更新从Cassandra加载的模式实例以及为每个表缓存的Kafka键值模式。

请注意,使用当前的模式演进方法,Cassandra连接器在以下情况下将无法在一小段时间内提供准确的数据更改信息:

  1. 如果一个表的CDC被禁用,在CDC被禁用之前发生的数据更改将被跳过。

  2. 如果从表中删除列,则在删除该列之前涉及该列的数据更改不能正确反序列化,将被跳过。

事件

Cassandra连接器产生的所有数据更改事件都有一个键和一个值,不过键和值的结构取决于产生更改事件的表(参见主题名称).

更改事件的键

对于给定的表,更改事件的键将具有一个结构,该结构在创建事件时包含表主键中每一列的字段。考虑一个库存开云体育电动老虎机数据库的客户表定义为:

CREATE TABLE customers (id bigint, registration_date timestamp, first_name text, last_name text, email text, PRIMARY KEY (id, registration_date));

的每一个变化事件客户表,当它有这个定义时,将具有相同的键模式,在JSON表示中是这样的:

{"type": "record", "name": "cassandra-cluster-1.inventory.customers. "Key", "namespace": "io.开云体育官方注册网址 debezum .connector.cassandra", "fields": [{"name": "id", "type": "long"}, {"name": "registration_date", "type": "long", "logicalType": "timestamp-millis"}]}

对于id = 1001和registration_date = 1562202942545, JSON表示中的键有效负载如下所示:

{"id": 1001, "registration_date": 1562202942545}

虽然field.exclude.list属性允许您从事件值中删除列,主键中的所有列始终包含在事件的键中。

更改事件的值

更改事件消息的值稍微复杂一些。Cassandra连接器产生的每个更改事件值都有一个包含以下字段的信封结构:

人事处

一个包含描述操作类型的字符串值的必填字段。Cassandra连接器的值为插入,u更新,和d删除。

一个可选字段,如果存在则包含行状态事件发生了。该结构将由卡桑德拉inventory.customers.value——集群- 1.Kafka Connect模式,它代表了事件所引用的集群、密钥空间和表。

一个必须的字段,包含描述事件源元数据的结构,在Cassandra的情况下包含以下字段:

  • 开云体育官方注册网址Debezium版本。

  • 连接器的名称。

  • Cassandra集群名称。

  • 记录事件的提交日志文件的名称、事件在该提交日志文件中出现的位置、该事件是否是快照的一部分、受影响的键空间和表的名称,以及分区更新的最大时间戳(以微秒为单位)。

ts_ms

(可选)如果存在,则包含连接器根据运行Cassandra连接器的JVM的系统时钟处理事件的时间。

因为Cassandra不执行写前读,所以Cassandra提交日志不会在应用更改之前记录行值。因此,卡桑德拉更改事件记录中不包括一个之前字段。

的值模式的JSON表示形式创建我们的活动客户表:

{"type": "record", "name": "cassandra-cluster-1.inventory.customers. "信封”、“名称”:“io.debezium.connec开云体育官方注册网址tor.cassandra”、“字段”:[{" name ":“人事处”、“类型”:“字符串”},{“名称”:“ts_ms”,“类型”:“长”、“logicalType”:“timestamp-millis”},{“名称”:“后”,“类型”:“记录”、“字段”:[{" name ": " id ",“类型”:[“零”,{“名称”:“id”、“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“registration_date”、“类型”(“空”,{“名称”:“registration_date”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“长”、“logical_type”:“timestamp-millis”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“first_name”、“类型”:[“零”,{“名称”:“first_name”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“last_name”、“类型”(“空”,:{“名称”:“last_name”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“last_name”、“类型”(“空”,:{“名称”:“电子邮件”、“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] } ] }, { " 名称”:“源”、“类型”:“记录”、“字段”:[{“名称”:“版本”、“类型”:“字符串”},{“名称”:“连接器”、“类型”:“字符串”},{“名称”:“集群”、“类型”:“字符串”},{“名称”:“快照”,“类型”:“布尔”},{“名称”:“用于”、“类型”:“字符串”},{“名称”:“表”、“类型”:“字符串”},{“名称”:“文件”、“类型”:“字符串”},{“名称”:“位置”,“类型”:“int”},{“名称”:“ts_ms”、“类型”:“长”、“logicalType”:"timestamp-micros"}]}]}

待办事项:验证最大时间戳!=删除时间戳,如果删除ddl

给定以下条件插入DML:

INSERT INTO customers (id, registration_date, first_name, last_name, email) VALUES (1001, now(), "Anne", "Kretchmar", "annek@noanswer.org");

JSON表示中的值负载看起来像这样:

{"op": "c", "ts_ms": 1562202942832, "after": {"id": {"value": 1001, "deletion_ts": null, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": null, "set": true}, "first_name": {"value": "Anne", "deletion_ts": null, "set": true}, "last_name": {"value": "Kretchmar", "deletion_ts": null, "set": true}, "email": {"value": "annek@noanswer.org", "deletion_ts": null, "set": true}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 54, "ts_ms": 1562202942666382}}

给定以下条件更新DML:

更新客户SET email = "annek_new@noanswer.org" WHERE id = 1001 AND registration_date = 1562202942545

JSON表示中的值负载看起来像这样:

{"op": "u", "ts_ms": 1562202942912, "after": {"id": {"value": 1001, "deletion_ts": null, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": null, "set": true}, "first_name": null, "last_name": null, "email": {"value": "annek_new@noanswer.org", "deletion_ts": null, "set": true}}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 102, "ts_ms": 1562202942666490}}

当我们把这个和插入事件,我们看到了一些不同之处:

  • 人事处字段值现在是u,表示该行因更新而更改。

  • 字段现在具有该行的更新状态,在这里我们可以看到电子邮件值现在是annek_new@noanswer.org.请注意,first_name而且last_name为空,这是因为这些字段在更新期间没有更改。然而,id而且registration_date仍然包含,因为这些是该表的主键。

  • 字段结构具有与以前相同的字段,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms显示连接器处理此事件的时间戳毫秒。

最后,给出如下条件删除DML:

DELETE FROM customers WHERE id = 1001 AND registration_date = 1562202942545

JSON表示中的值负载看起来像这样:

{"op": "d", "ts_ms": 1562202942912, "after": {"id": {"value": 1001, "deletion_ts": 1562202972545, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": 1562202972545, "set": true}, "first_name": null, "last_name": null, "email": null}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 102, "ts_ms": 1562202942666490}}

当我们把这个和插入而且更新事件,我们看到了一些不同之处:

  • 人事处字段值现在是d,表示该行因删除而更改。

  • 字段仅包含值id而且registration_date因为这是按主键删除。

  • 字段结构具有与以前相同的字段,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms显示连接器处理此事件的时间戳毫秒。

待办事项:给定TTL目前不支持,是否删除delete_ts会更好?是否也可以通过查看每一列来派生一个字段是否为空?

待办事项:讨论Cassandra连接器中的墓碑事件

数据类型

如上所述,Cassandra连接器用事件表示行更改,事件的结构类似于行所在的表。该事件包含每个列值的字段,该值在事件中的表示方式取决于该列的Cassandra数据类型。本节描述此映射。

下表描述了连接器如何将每个Cassandra数据类型映射到Kafka Connect数据类型。

Cassandra数据类型

文字类型(模式类型)

语义类型(模式名)

美国信息交换标准代码

字符串

N/A

长整型数字

int64

N/A

字节

N/A

布尔

布尔

N/A

计数器

int64

N/A

日期

int32

io.开云体育官方注册网址debezium.time.Date

小数

float64

N/A

float64

N/A

浮动

float32

N/A

字节

N/A

inet

字符串

N/A

int

int32

N/A

列表

数组

N/A

地图

地图

N/A

数组

N/A

短整型

int16

N/A

文本

字符串

N/A

时间

int64

N/A

时间戳

int64

io.开云体育官方注册网址debezium.time.Timestamp

timeuuid

字符串

io.开云体育官方注册网址debezium.data.Uuid

非常小的整数

int8

N/A

元组

地图

N/A

uuid

字符串

io.开云体育官方注册网址debezium.data.Uuid

varchar

字符串

N/A

varint

int64

N/A

持续时间

int64

io.开云体育官方注册网址debezium.time.NanoDuration(以纳秒为单位的持续时间值的近似表示)

待办事项:添加逻辑类型

任意精度整数类型

Cassandra连接器句柄varint属性的设置varint.handling.mode连接器配置属性

varint.handling.mode =长
表1。映射时varint.handling.mode =长
卡桑德拉式 文字类型 语义类型

varint

INT64

N/A

varint.handling.mode =精确
表2。映射时decimal.handling.mode =精确
卡桑德拉式 文字类型 语义类型

varint

字节

org.apache.kafka.connect.data.Decimal
规模模式参数设置为0。

varint.handling.mode =字符串
表3。映射时varint.handling.mode =字符串
卡桑德拉式 文字类型 语义类型

varint

字符串

N/A

十进制类型

Cassandra连接器句柄小数属性的设置decimal.handling.mode连接器配置属性

decimal.handling.mode =双
表4。映射时decimal.handling.mode =双
卡桑德拉式 文字类型 语义类型

小数

FLOAT64

N/A

decimal.handling.mode =精确
表5所示。映射时decimal.handling.mode =精确
卡桑德拉式 文字类型 语义类型

小数

结构体

io.开云体育官方注册网址debezium.data.VariableScaleDecimal
包含两个字段的结构:规模类型的INT32其中包含传输值的刻度和价值类型的字节以未缩放的形式包含原始值。

decimal.handling.mode =字符串
表6所示。映射时decimal.handling.mode =字符串
卡桑德拉式 文字类型 语义类型

小数

字符串

N/A

如果默认的数据类型转换不能满足您的需求,您可以这样做创建自定义转换器对于连接器。

当事情出错时

配置和启动错误

Cassandra连接器将在启动时失败,在日志中报告错误或异常,如果配置无效或如果连接器无法使用指定的连接参数成功连接到Cassandra,则停止运行。在这种情况下,错误将提供有关问题的更多细节,并可能建议解决方法。当配置被纠正后,连接器可以重新启动。

Cassandra不可用

一旦连接器开始运行,如果Cassandra节点由于任何原因变得不可用,连接器将失败并停止。在这种情况下,一旦服务器可用,就重新启动连接器。如果这发生在快照期间,它将从表的开头重新引导整个表。

卡桑德拉连接器优雅地停止

如果Cassandra连接器被优雅地关闭,在停止进程之前,它将确保将ChangeEventQueue中的所有事件刷新到Kafka。Cassandra连接器会在每次流记录被发送到Kafka时跟踪文件名和偏移量。因此,当重新启动连接器时,它将从停止的地方重新开始。它通过搜索目录中最古老的提交日志,开始处理该提交日志,跳过已经读取的记录,直到找到尚未处理的最新记录。如果Cassandra连接器在快照期间停止,它将从该表中拾取数据,但将重新引导整个表。

Cassandra连接器崩溃

如果Cassandra连接器意外崩溃,那么Cassandra连接器可能已经终止,而没有记录最近处理的偏移量。在这种情况下,当连接器重新启动时,它将从最近记录的偏移量开始恢复。这意味着可能会有重复(这是微不足道的,因为我们已经从RF中得到重复)。请注意,由于偏移量仅在记录成功发送到Kafka时才会更新,因此在崩溃期间丢失ChangeEventQueue中未发出的数据是可以的,因为这些事件将被重新创建。

Kafka不可用

当连接器生成更改事件时,它将使用Kafka生成器API将这些事件发布到Kafka。如果Kafka代理不可用(生产者遇到TimeoutException), Cassandra连接器将每秒重复尝试重新连接代理一次,直到重试成功。

Cassandra连接器停止一段时间

根据表的写负载,当Cassandra连接器长时间停止时,它可能会达到cdc_total_space_in_mb容量。一旦达到这个上限,Cassandra将停止接受对该表的写操作;这意味着在运行Cassandra连接器时监视这个空间非常重要。在最坏的情况下,如果发生这种情况,请完成以下步骤:

  1. 关闭Cassandra连接器。

  2. Dusable CDC为表,因此它停止产生额外的写入。因为提交日志没有被过滤,所以对同一节点上启用cdc的其他表的写入仍然会影响提交日志文件的生成。

  3. 从偏移量文件中删除已记录的偏移量

  4. 在容量增加或目录使用的空间得到控制之后,重新启动连接器,使其重新引导表。

Cassandra表CDC已启用,然后暂时禁用,然后再次启用

如果Cassandra表暂时禁用CDC,然后在一段时间后重新启用它,则必须重新引导它。要重新引导单个表,可以手动从snapshot_offset中删除与该表对应的已记录偏移行。属性文件。

部署连接器

Cassandra连接器应该部署在Cassandra集群中的每个Cassandra节点上。Cassandra连接器Jar文件包含一个CDC配置(.properties)文件。看到参见配置示例供参考。

示例配置

下面是一个用于在本地运行和测试Cassandra Connector的.properties配置文件示例:

connector.name = test_connector commit.log.relocation。d我r=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/ http.port=8000 cassandra.config=/usr/local/etc/cassandra/cassandra.yaml cassandra.hosts=127.0.0.1 cassandra.port=9042 kafka.producer.bootstrap.servers=127.0.0.1:9092 kafka.producer.retries=3 kafka.producer.retry.backoff.ms=1000 topic.prefix=test_prefix key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url: http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url: http://localhost:8081 offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/ snapshot.consistency=ONE snapshot.mode=ALWAYS latest.commit.log.only=true

监控

Cassandra连接器内置了对JMX指标的支持。Cassandra驱动程序还发布了许多关于驱动程序活动的指标,这些指标可以通过JMX进行监控。连接器有两种类型的指标。快照度量可以帮助您监视快照活动,并且在连接器执行快照时可用。当连接器读取Cassandra提交日志时,Binlog指标帮助您监视进度和活动。

快照指标

属性名称

类型

描述

total-table-count

int

快照中包含的表的总数。

remaining-table-count

int

快照尚未复制的表数。

snapshot-running

布尔

快照是否启动。

snapshot-aborted

布尔

快照是否中止。

snapshot-completed

布尔

快照是否完成。

snapshot-during-in-seconds

快照到目前为止所花费的总秒数,即使没有完成。

rows-scanned

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

Commitlog指标

属性名称

类型

描述

commitlog-filename

字符串

连接器最近读取的提交日志文件名的名称。

commitlog-position

连接器已读取的提交日志中的最近位置(以字节为单位)。

number-of-processed-mutations

已经处理的突变的数量。

number-of-unrecoverable-errors

处理提交日志时不可恢复的错误数。

连接器属性

财产

默认的

描述

最初的

指定运行快照的条件(例如。在启动Cassandra连接器代理时进行初始同步。必须是'INITIAL', 'ALWAYS',或'NEVER'中的一个。默认快照模式为“INITIAL”。

所有

指定用于快照查询的{@link ConsistencyLevel}。

8000

HTTP服务器用于ping、运行状况检查和生成信息的端口

没有默认的

Cassandra节点使用的YAML配置文件的绝对路径。

本地主机

驱动程序用于发现拓扑的Cassandra节点的一个或多个地址,用","分隔

9042

用于连接Cassandra主机的端口。

没有默认的

连接Cassandra主机时使用的用户名。

没有默认的

连接Cassandra主机时使用的密码。

如果设置为true, Cassandra连接器代理将使用SSL连接到Cassandra节点。

没有默认的

存储节点SSL配置文件路径。配置文件的示例可以在页面底部找到。

仅适用于Cassandra 4,如果设置为true, Cassandra连接器代理将通过观察提交日志索引文件中的更新来增量读取提交日志,并实时读取流数据,频率由commit.log.marked.complete.poll.interval.ms.如果设置为false, Cassandra 4连接器将等待提交日志文件标记为已完成,然后再处理它们。

10000

仅适用于Cassandra 4和当实时流commit.log.real.time.processing.enabled.此配置确定提交日志索引文件轮询偏移值更新的频率。

没有默认的

一旦处理完成,提交日志将从cdc_raw dir重新定位到的本地目录。

真正的

确定是否应该运行CommitLogPostProcessor来从重定位目录移动已处理的提交日志。如果禁用,提交日志将不会被移出重定位目录。

10000

CommitLogPostProcessor在重定位目录中重新获取所有处理过的提交日志时应该等待的时间。

io.开云体育官方注册网址debezium.connector.cassandra.BlackHoleCommitLogTransfer

CommitLogPostProcessor用于从重定位目录移动已处理提交日志的类。内置的传输类是BlackHoleCommitLogTransfer,它只是从重定位目录中删除所有处理过的提交日志。如果需要,用户应该实现他们自己的自定义提交日志传输类。

确定CommitLogProcessor是否应该重新处理错误提交日志。

没有默认的

属性的符号名称的逗号分隔列表自定义转换器连接器可以使用的实例。例如,

国际标准图书编号

您必须设置转换器属性使连接器能够使用自定义转换器。

对于为连接器配置的每个转换器,还必须添加.type属性,该属性指定实现转换器接口的类的全限定名称。的.type属性使用以下格式:

< converterSymbolicName >.type

例如,

isbn。type: io.debezium.test.IsbnConverter

如果希望进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。若要将任何其他配置参数与转换器关联起来,请在参数名称前加上转换器的符号名称。例如,

isbn.schema.name: io.开云体育官方注册网址debezium.cassandra.type.Isbn

没有默认的

存储偏移跟踪文件的目录。

0

提交偏移量之前等待的最短时间。默认值为0表示每次都会刷新偏移量。

One hundred.

在需要将偏移量刷新到磁盘之前允许处理的最大记录。该配置仅在offset_flush_interval_ms != 0时有效。

8192

正整数值,指定从提交日志中读取的更改事件写入Kafka之前放入的阻塞队列的最大大小。例如,当写入Kafka较慢或Kafka不可用时,该队列可以为提交日志读取器提供回压。出现在队列中的事件不包括在此连接器定期记录的偏移量中。默认值为8192,并且应该总是大于max.batch.size属性中指定的最大批处理大小。在反序列化记录被转换为Kafka Connect结构体并发送到Kafka之前,队列中保存反序列化记录的容量。

2048

每次要退出队列的最大更改事件数。

0

一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。
如果max.queue.size时,当队列大小达到任一属性指定的限制时,将阻塞对队列的写入。例如,如果你设置max.queue.size = 1000,max.queue.size.in.bytes = 5000,当队列中有1000条记录时,或者队列中记录的容量达到5000字节时,写入队列被阻塞。

1000

正整数值,指定提交日志处理器在每次迭代期间等待新更改事件出现在队列中的毫秒数。缺省值为1000毫秒,即1秒。

10000

正整数值,指定模式处理器在刷新缓存的Cassandra表模式之前应该等待的毫秒数。

10000

每次轮询在重新尝试之前等待的最大时间。

10000

正整数值,指定快照处理器在重新扫描表以查找新的启用cdc的表之前应该等待的毫秒数。缺省值为10000毫秒,即10秒。

删除事件是否应该有后续的墓碑事件(true)或(false)。需要注意的是,在Cassandra中,具有相同键的两个事件可能正在更新给定表的不同列。因此,这可能会导致在压缩过程中记录丢失,如果它们还没有被消费者使用。换句话说,如果你开启了Kafka压缩,不要将此设置为true。

没有默认的

应从更改事件消息值中排除的字段的全限定名称的逗号分隔列表。字段的完全限定名格式为keyspace_name>... < keyspace_name>.< nested_field_name>. >

1

更改事件队列和队列处理器的数量。默认值为1。

真正的连接器配置显式指定key.convertervalue.converter参数使用Avro,否则默认为

是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。

t

以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括:c插入/创建、u的更新,d删除,t对于截断,和没有一个不跳过任何操作。默认情况下,截断操作将被跳过(此连接器不会触发)。

io.开云体育官方注册网址debezium.schema.SchemaTopicNamingStrategy

TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称SchemaTopicNamingStrategy

指定主题名称的分隔符,默认为

没有默认的

用于所有主题的前缀的名称。

不要更改此属性的值。如果更改了name值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。连接器也无法恢复其数据库模式历史记录主题。开云体育电动老虎机

10000

用于在有界并发散列映射中保存主题名称的大小。此缓存有助于确定与给定数据集合对应的主题名称。

__开云体育官方注册网址debezium-heartbeat

控制连接器向其发送心跳消息的主题的名称。主题名称有以下模式:

topic.heartbeat.prefixtopic.prefix

例如,如果数据库服务器名称或主题前缀为开云体育电动老虎机实现,默认主题名为__开云体育官方注册网址debezium-heartbeat.fulfillment

指定如何varint列应该在更改事件中表示。可能的设置有:

(默认值)使用Java的表示值这种方法可能无法提供精准度,但在消费者中很容易使用。

精确的使用java.math.BigDecimal来表示值,这些值通过使用二进制表示和Kafka Connect的方式编码在更改事件中org.apache.kafka.connect.data.Decimal类型。

字符串将值编码为格式化字符串,这很容易使用。

指定如何小数列应该在更改事件中表示。可能的设置有:

(默认值)使用Java的表示值这种方法可能无法提供精准度,但在消费者中很容易使用。

精确的使用java.math.BigDecimal来表示值,这些值通过使用二进制表示和Kafka Connect的方式编码在更改事件中org.apache.kafka.connect.data.VariableScaleDecimal类型。

字符串将值编码为格式化字符串,这很容易使用。

如果Cassandra代理使用SSL连接Cassandra节点,则需要SSL配置文件。以编写SSL配置文件为例:

keyStore.location = / var /私人/ ssl / cassandra.keystore。jks密钥存储库。密码=卡桑德拉密钥存储库。type=JKS trustStore.location=/var/private/ssl/cassandra.truststore.jks trustStore.password=cassandra trustStore.type=JKS keyManager.algorithm=SunX509 trustManager.algorithm=SunX509 cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

cipherSuites字段不是强制性的,它只是允许您添加一个(或多个)不存在的密码。trustStore的默认值。类型和keyStore。类型为JKS。keyManager的默认值。算法和trustManager。算法为SunX509。

连接器还支持在创建Kafka生成器时使用的直通配置属性。属性开头的所有连接器配置属性kafka.producer。在创建将事件写入Kafka的Kafka生产者时使用前缀(不带前缀)。

例如,可以使用以下连接器配置属性安全连接到Kafka代理

kafka.producer.security。协议SSL kafka.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks kafka.producer.ssl.keystore。密码= test1234 kafka.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks kafka.producer.ssl.truststore。密码= test1234 kafka.producer.ssl.key。密码= test1234 kafka.consumer.security。协议SSL kafka.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks kafka.consumer.ssl.keystore。密码= test1234 kafka.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks kafka.consumer.ssl.truststore。密码= test1234 kafka.consumer.ssl.key.password = test1234

一定要咨询卡夫卡的文档为Kafka生产者的所有配置属性。

该连接器支持以下Kafka连接转换器的键/值序列化:

io.confluent.connect.avro.AvroConverter org.apache.kafka.connect.store.stringconverter org.apache.kafka.connect.json.JsonConverter com.blueapron.connect.protobuf.ProtobufConverter