开云体育官方注册网址卡桑德拉Debezium连接器
Cassanadra连接器可以监视Cassandra集群并记录所有行级更改。连接器必须本地部署在Cassandra集群中的每个节点上。连接器第一次连接到Cassandra节点时,它对所有关键空间中所有启用cdc的表执行快照。连接器还将读取写入Cassandra提交日志的更改,并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的kafka主题中,应用程序和服务可以很容易地使用它们。
有关与此连接器兼容的Cassandra版本的信息,请参见开云体育官方注册网址Debezium发布概述.
概述
Cassandra是一个开源的NoSQL数据库。开云体育电动老虎机与大多数数据库类似,Cassan开云体育电动老虎机dra的写路径从立即将更改记录到其提交日志开始。提交日志本地驻留在每个节点上,记录对该节点的每一次写操作。
自Cassandra 3.0以来,a变更数据捕获(CDC)特性介绍了。CDC特性可以通过设置table属性在表级别上启用美国疾病控制与预防中心= true
,在此之后,任何包含CDC启用表的数据的提交日志将被移动到中指定的CDC目录cassandra.yaml
在丢弃。
Cassandra连接器驻留在每个Cassandra节点上,并监视cdc_raw
更改目录。它处理所有检测到的本地提交日志段,为提交日志中的每个行级插入、更新和删除操作生成一个更改事件,在单独的Kafka主题中发布每个表的所有更改事件,最后从Kafka主题中删除提交日志cdc_raw
目录中。最后一步很重要,因为一旦CDC被启用,Cassandra本身就不能清除提交日志。如果cdc_free_space_in_mb
写入启用cdc的表时将被拒绝。
这个连接器容错能力强。当连接器读取提交日志并产生事件时,它记录每个提交日志段的文件名和位置以及每个事件。如果连接器由于任何原因(包括通信故障、网络问题或崩溃)而停止,那么在重新启动时,它只是在上次停止的地方继续读取提交日志。这包括快照:如果在连接器停止时快照没有完成,那么在重新启动时它将开始一个新的快照。稍后我们将讨论连接器的行为当事情出错时.
Cassandra与其他Debezium连接器不同,因为它不是在Kafka 开云体育官方注册网址Connect框架之上实现的。相反,它是一个单独的JVM进程,旨在驻留在每个Cassandra节点上,并通过Kafka生成器将事件发布到Kafka。 |
Cassandra连接器目前不支持以下特性。由这些特性引起的更改将被忽略:
|
设置卡桑德拉
在使用Debezium开云体育官方注册网址 Cassandra连接器监视Cassandra集群中的更改之前,必须在节点级和表级启用CDC。
在节点上启用CDC
要启用CDC,请在中更新以下CDC配置cassandra.yaml
:
cdc_enabled:真
额外的CDC配置有以下默认值:
cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw: 4096 cdc_free_space_in_mb: 250
cdc_enabled
在节点范围内启用或禁用CDC操作cdc_raw_directory
在所有对应的memtable被刷新后,确定要移动的提交日志段的目的地。cdc_free_space_in_mb
是分配用于存储提交日志段的最大容量,默认为4096mb的最小值和1/8的卷空间。cdc_free_space_check_interval_ms
我们重新计算占用空间的频率是否为cdc_raw_directory
以防止不必要地消耗CPU周期时,容量。
Cassandra连接器如何工作
本节将详细介绍Cassandra连接器如何执行快照、将提交日志事件转换为Debezium更改事件、处理提交日志生命周期、将事件记录到Kafka、管理模式演变以及在出现错误时的行为。开云体育官方注册网址
快照
当Cassandra连接器第一次在Cassandra节点上启动时,默认情况下它将执行集群的初始快照。这是默认模式,因为大多数时候CDC是在非空表上启用的,并且提交日志不包含完整的历史记录。
快照读取器发出SELECT语句来查询表中的所有列。Cassandra允许在全局或语句级别上设置一致性级别。对于快照,一致性级别在语句级别设置为所有
默认提供最高一致性。这意味着如果一个节点在快照期间宕机,快照将无法继续,并且一旦该节点恢复在线,就需要后续重新快照。您可以将快照的一致性级别调整到较低的一致性级别,以提高可用性,前提是您理解一致性之间的权衡。
在卡桑德拉3。X,不可能严格地从本地Cassandra节点读取。从Cassandra 4.0开始,a |
与关系数据库不同,快照期间没有应用读开云体育电动老虎机锁,因此对Cassandra的写入在快照期间不会阻塞。如果查询的数据在快照期间被另一个客户端修改了,这些更改可能会反映在快照结果集中。
如果连接器失败或在快照完成之前停止,连接器将在重新启动时开始一个新的快照。在默认快照模式下(最初的
),一旦连接器完成它的初始快照,它将不再执行任何额外的快照。唯一的例外是在连接器重新启动期间:如果在一个表上启用了CDC,然后重新启动连接器,那么该表将被快照。
第二种快照模式(总是
)允许连接器在必要时执行快照。它定期检查新启用cdc的表,并在检测到这些表时立即对它们进行快照。
第三种快照模式('never')确保连接器永远不会执行快照。当以这种方式配置一个新连接器时,它将只读取CDC目录中的提交日志。这不是默认行为,因为以这种模式(不带快照)启动新连接器需要提交日志包含所有启用cdc的表的整个历史,而通常情况并非如此。此模式的另一个用例是,如果有一个连接器已经在执行快照,则可以禁用其他连接器的快照,以避免重复工作。
读取提交日志
Cassandra连接器通常将绝大多数时间用于读取Cassandra节点上的本地提交日志。在Cassandra 4.0中,在每个分段fsync上,索引文件将被更新以反映最新的偏移量。这消除了Cassandra 3.X中CDC特性中的处理延迟。并且可以通过设置配置在Cassandra 4 Debezium连接器开云体育官方注册网址中启用:commit.log.real.time.processing.enabled
来真正的
.索引文件轮询的频率由commit.log.marked.complete.poll.interval.ms
.
使用Cassandra的CommitLogReader和CommitLogReadHandler反序列化提交日志的二进制数据。每个反序列化对象称为a突变
卡桑德拉。一个突变
包含一个或多个更改事件。
当Cassandra连接器读取提交日志时,它将日志事件转换为Debezium开云体育官方注册网址创建,更新,或删除事件,包括在提交日志中发现事件的位置。Cassandra连接器使用Kafka Connect转换器对这些更改事件进行编码,并将它们发布到适当的Kafka主题中。
提交日志的限制
Cassandra的提交日志带有一组限制,这对于正确解释CDC事件至关重要:
提交日志只到达
cdc_raw
目录,在这种情况下,它将被刷新/丢弃。这意味着在记录事件和捕获事件之间存在延迟。单个Cassandra节点上的提交日志不反映对集群的所有写操作,它们只反映存储在该节点上的写操作。这就是为什么必须监视Cassandra集群中所有节点上的更改。但是,由于复制因素,这也意味着这些事件的下游消费者有必要处理重复数据删除。
对单个Cassandra节点的写入在到达时被记录。但是,这些事件可能会按照它们发出的顺序出现。这些事件的下游消费者必须理解并实现类似于Cassandra的读取路径的逻辑,以获得正确的输出。
提交日志中不记录表的模式更改,只记录数据更改。因此,在尽可能的基础上检测模式的更改。为了避免数据丢失,建议在模式更改期间暂停对表的写操作。
Cassandra不执行先读后写,因此提交日志不记录已更改行的每一列的值,它只记录已修改的列的值(分区键列除外,它们总是根据Cassandra DML命令的要求进行记录)。
由于CQL的性质,插入dml可以导致行插入或更新;更新dml可以导致行插入、更新或删除;删除dml可以导致行更新或删除。由于查询没有记录在提交日志中,CDC事件类型在关系数据库意义上是根据对行的影响进行分类的。开云体育电动老虎机
待办事项:是否有一种方法来确定事件类型对应于实际的Cassandra DML语句?如果是的话,这比这些事件的语义更受欢迎吗?
管理提交日志生命周期
默认情况下,Cassandra连接器将删除已处理的提交日志。不建议在禁用删除提交日志的同时启动连接器,因为这可能会使磁盘存储空间膨胀,并阻止进一步向Cassandra集群写入数据。要以自定义方式管理提交日志(即上传到云提供商),可以实现CommitLogTransfer接口。
主题名称
Cassandra连接器将单个表上所有插入、更新和删除操作的事件写入单个Kafka主题。Kafka主题的名称通常采用以下形式:
clusterName.keyspaceName.的表
在哪里clusterName连接器的逻辑名称是否与topic.prefix
配置属性,keyspaceName发生操作的键空间的名称和的表发生操作的表的名称。
例如,考虑一个Cassandra安装库存
包含四个表的Keyspace:产品
,products_on_hand
,客户
,订单
.如果给监视此数据库的连接器一个逻辑服务器名开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
待办事项:对于主题名称,为clusterName.keyspaceName.的表好吧?或者它应该是connectorName.keyspaceName.的表或connectorName.clusterName.keyspaceName.的表?
模式演化
ddl没有记录在提交日志中。当一个表的模式发生变化时,这个变化会从一个Cassandra节点发出,并通过八卦协议传播到其他节点。
Cassandra中的模式更改将被一个实现的SchemaChangeListener以小于1秒的延迟检测到,然后它将更新从Cassandra加载的模式实例以及为每个表缓存的Kafka键值模式。
请注意,使用当前的模式演进方法,Cassandra连接器在以下情况下将无法在一小段时间内提供准确的数据更改信息:
如果一个表的CDC被禁用,在CDC被禁用之前发生的数据更改将被跳过。
如果从表中删除列,则在删除该列之前涉及该列的数据更改不能正确反序列化,将被跳过。
事件
Cassandra连接器产生的所有数据更改事件都有一个键和一个值,不过键和值的结构取决于产生更改事件的表(参见主题名称)。
更改事件的键
对于给定的表,更改事件的键将具有一个结构,该结构在创建事件时包含表主键中每一列的字段。考虑一个库存
开云体育电动老虎机数据库的客户
表定义为:
CREATE TABLE customers (id bigint, registration_date timestamp, first_name text, last_name text, email text, PRIMARY KEY (id, registration_date));
的每一个变化事件客户
表,当它有这个定义时,将具有相同的键模式,在JSON表示中是这样的:
{"type": "record", "name": "cassandra-cluster-1.inventory.customers. "Key", "namespace": "io.开云体育官方注册网址 debezum .connector.cassandra", "fields": [{"name": "id", "type": "long"}, {"name": "registration_date", "type": "long", "logicalType": "timestamp-millis"}]}
对于id = 1001和registration_date = 1562202942545, JSON表示中的键有效负载如下所示:
{"id": 1001, "registration_date": 1562202942545}
虽然 |
更改事件的值
更改事件消息的值稍微复杂一些。Cassandra连接器产生的每个更改事件值都有一个包含以下字段的信封结构:
-
人事处
-
一个包含描述操作类型的字符串值的必填字段。Cassandra连接器的值为
我
插入,u
更新,和d
删除。 -
后
-
一个可选字段,如果存在则包含行状态后事件发生了。该结构将由
卡桑德拉inventory.customers.value——集群- 1.
Kafka Connect模式,它代表了事件所引用的集群、密钥空间和表。 -
源
-
一个必须的字段,包含描述事件源元数据的结构,在Cassandra的情况下包含以下字段:
开云体育官方注册网址Debezium版本。
连接器的名称。
Cassandra集群名称。
记录事件的提交日志文件的名称、事件在该提交日志文件中出现的位置、该事件是否是快照的一部分、受影响的键空间和表的名称,以及分区更新的最大时间戳(以微秒为单位)。
-
ts_ms
-
(可选)如果存在,则包含连接器根据运行Cassandra连接器的JVM的系统时钟处理事件的时间。
因为Cassandra不执行写前读,所以Cassandra提交日志不会在应用更改之前记录行值。因此,卡桑德拉更改事件记录中不包括一个 |
的值模式的JSON表示形式创建我们的活动客户
表:
{"type": "record", "name": "cassandra-cluster-1.inventory.customers. "信封”、“名称”:“io.debezium.connec开云体育官方注册网址tor.cassandra”、“字段”:[{" name ":“人事处”、“类型”:“字符串”},{“名称”:“ts_ms”,“类型”:“长”、“logicalType”:“timestamp-millis”},{“名称”:“后”,“类型”:“记录”、“字段”:[{" name ": " id ",“类型”:[“零”,{“名称”:“id”、“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“registration_date”、“类型”(“空”,{“名称”:“registration_date”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“长”、“logical_type”:“timestamp-millis”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“first_name”、“类型”:[“零”,{“名称”:“first_name”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“last_name”、“类型”(“空”,:{“名称”:“last_name”,“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] }, { " 名称”:“last_name”、“类型”(“空”,:{“名称”:“电子邮件”、“类型”:“记录”、“字段”:[{“名称”:“价值”、“类型”:“字符串”},{“名称”:“deletion_ts”,“类型”:“空”、“长”,“默认”:“零”},{“名称”:“设置”,“类型”:“布尔 " } ] } ] } ] }, { " 名称”:“源”、“类型”:“记录”、“字段”:[{“名称”:“版本”、“类型”:“字符串”},{“名称”:“连接器”、“类型”:“字符串”},{“名称”:“集群”、“类型”:“字符串”},{“名称”:“快照”,“类型”:“布尔”},{“名称”:“用于”、“类型”:“字符串”},{“名称”:“表”、“类型”:“字符串”},{“名称”:“文件”、“类型”:“字符串”},{“名称”:“位置”,“类型”:“int”},{“名称”:“ts_ms”、“类型”:“长”、“logicalType”:"timestamp-micros"}]}]}
待办事项:验证最大时间戳!=删除时间戳,如果删除ddl
给定以下条件插入
DML:
INSERT INTO customers (id, registration_date, first_name, last_name, email) VALUES (1001, now(), "Anne", "Kretchmar", "annek@noanswer.org");
JSON表示中的值负载看起来像这样:
{"op": "c", "ts_ms": 1562202942832, "after": {"id": {"value": 1001, "deletion_ts": null, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": null, "set": true}, "first_name": {"value": "Anne", "deletion_ts": null, "set": true}, "last_name": {"value": "Kretchmar", "deletion_ts": null, "set": true}, "email": {"value": "annek@noanswer.org", "deletion_ts": null, "set": true}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 54, "ts_ms": 1562202942666382}}
给定以下条件更新
DML:
更新客户SET email = "annek_new@noanswer.org" WHERE id = 1001 AND registration_date = 1562202942545
JSON表示中的值负载看起来像这样:
{"op": "u", "ts_ms": 1562202942912, "after": {"id": {"value": 1001, "deletion_ts": null, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": null, "set": true}, "first_name": null, "last_name": null, "email": {"value": "annek_new@noanswer.org", "deletion_ts": null, "set": true}}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 102, "ts_ms": 1562202942666490}}
当我们把这个和插入事件,我们看到了一些不同之处:
的
人事处
字段值现在是u
,表示该行因更新而更改。的
后
字段现在具有该行的更新状态,在这里我们可以看到电子邮件值现在是annek_new@noanswer.org
.请注意,first_name
而且last_name
为空,这是因为这些字段在更新期间没有更改。然而,id
而且registration_date
仍然包含,因为这些是该表的主键。的
源
字段结构具有与以前相同的字段,但值不同,因为此事件来自提交日志中的不同位置。的
ts_ms
显示连接器处理此事件的时间戳毫秒。
最后,给出如下条件删除
DML:
DELETE FROM customers WHERE id = 1001 AND registration_date = 1562202942545
JSON表示中的值负载看起来像这样:
{"op": "d", "ts_ms": 1562202942912, "after": {"id": {"value": 1001, "deletion_ts": 1562202972545, "set": true}, "registration_date": {"value": 1562202942545, "deletion_ts": 1562202972545, "set": true}, "first_name": null, "last_name": null, "email": null}, "source": {"version": "2.1.2. "Final", "connector": "cassandra", "cluster": "cassandra-cluster-1", "snapshot": false, "keyspace": "inventory", "table": "customers", "file": "commitlog-6-123456.log", "pos": 102, "ts_ms": 1562202942666490}}
当我们把这个和插入而且更新事件,我们看到了一些不同之处:
的
人事处
字段值现在是d
,表示该行因删除而更改。的
后
字段仅包含值id
而且registration_date
因为这是按主键删除。的
源
字段结构具有与以前相同的字段,但值不同,因为此事件来自提交日志中的不同位置。的
ts_ms
显示连接器处理此事件的时间戳毫秒。
待办事项:给定TTL目前不支持,是否删除delete_ts会更好?是否也可以通过查看每一列来派生一个字段是否为空?
待办事项:讨论Cassandra连接器中的墓碑事件
数据类型
如上所述,Cassandra连接器用事件表示行更改,事件的结构类似于行所在的表。该事件包含每个列值的字段,该值在事件中的表示方式取决于该列的Cassandra数据类型。本节描述此映射。
下表描述了连接器如何将每个Cassandra数据类型映射到Kafka Connect数据类型。
Cassandra数据类型 |
文字类型(模式类型) |
语义类型(模式名) |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
|
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
N/A |
|
|
|
|
|
|
|
|
N/A |
|
|
N/A |
|
|
|
|
|
N/A |
|
|
N/A |
|
|
|
待办事项:添加逻辑类型
任意精度整数类型
Cassandra连接器句柄varint
属性的设置varint.handling.mode
连接器配置属性.
- varint.handling.mode =长
-
表1。映射时 varint.handling.mode =长
卡桑德拉式 文字类型 语义类型 varint
INT64
N/A
- varint.handling.mode =精确
-
表2。映射时 decimal.handling.mode =精确
卡桑德拉式 文字类型 语义类型 varint
字节
org.apache.kafka.connect.data.Decimal
的规模
模式参数设置为0。 - varint.handling.mode =字符串
-
表3。映射时 varint.handling.mode =字符串
卡桑德拉式 文字类型 语义类型 varint
字符串
N/A
十进制类型
Cassandra连接器句柄小数
属性的设置decimal.handling.mode
连接器配置属性.
- decimal.handling.mode =双
-
表4。映射时 decimal.handling.mode =双
卡桑德拉式 文字类型 语义类型 小数
FLOAT64
N/A
- decimal.handling.mode =精确
-
表5所示。映射时 decimal.handling.mode =精确
卡桑德拉式 文字类型 语义类型 小数
结构体
io.开云体育官方注册网址debezium.data.VariableScaleDecimal
包含两个字段的结构:规模
类型的INT32
其中包含传输值的刻度和价值
类型的字节
以未缩放的形式包含原始值。 - decimal.handling.mode =字符串
-
表6所示。映射时 decimal.handling.mode =字符串
卡桑德拉式 文字类型 语义类型 小数
字符串
N/A
如果默认的数据类型转换不能满足您的需求,您可以这样做创建自定义转换器对于连接器。
当事情出错时
配置和启动错误
Cassandra连接器将在启动时失败,在日志中报告错误或异常,如果配置无效或如果连接器无法使用指定的连接参数成功连接到Cassandra,则停止运行。在这种情况下,错误将提供有关问题的更多细节,并可能建议解决方法。当配置被纠正后,连接器可以重新启动。
Cassandra不可用
一旦连接器开始运行,如果Cassandra节点由于任何原因变得不可用,连接器将失败并停止。在这种情况下,一旦服务器可用,就重新启动连接器。如果这发生在快照期间,它将从表的开头重新引导整个表。
卡桑德拉连接器优雅地停止
如果Cassandra连接器被优雅地关闭,在停止进程之前,它将确保将ChangeEventQueue中的所有事件刷新到Kafka。Cassandra连接器会在每次流记录被发送到Kafka时跟踪文件名和偏移量。因此,当重新启动连接器时,它将从停止的地方重新开始。它通过搜索目录中最古老的提交日志,开始处理该提交日志,跳过已经读取的记录,直到找到尚未处理的最新记录。如果Cassandra连接器在快照期间停止,它将从该表中拾取数据,但将重新引导整个表。
Cassandra连接器崩溃
如果Cassandra连接器意外崩溃,那么Cassandra连接器可能已经终止,而没有记录最近处理的偏移量。在这种情况下,当连接器重新启动时,它将从最近记录的偏移量开始恢复。这意味着可能会有重复(这是微不足道的,因为我们已经从RF中得到重复)。请注意,由于偏移量仅在记录成功发送到Kafka时才会更新,因此在崩溃期间丢失ChangeEventQueue中未发出的数据是可以的,因为这些事件将被重新创建。
Kafka不可用
当连接器生成更改事件时,它将使用Kafka生成器API将这些事件发布到Kafka。如果Kafka代理不可用(生产者遇到TimeoutException), Cassandra连接器将每秒重复尝试重新连接代理一次,直到重试成功。
Cassandra连接器停止一段时间
根据表的写负载,当Cassandra连接器长时间停止时,它可能会达到cdc_total_space_in_mb容量。一旦达到这个上限,Cassandra将停止接受对该表的写操作;这意味着在运行Cassandra连接器时监视这个空间非常重要。在最坏的情况下,如果发生这种情况,请完成以下步骤:
关闭Cassandra连接器。
Dusable CDC为表,因此它停止产生额外的写入。因为提交日志没有被过滤,所以对同一节点上启用cdc的其他表的写入仍然会影响提交日志文件的生成。
从偏移量文件中删除已记录的偏移量
在容量增加或目录使用的空间得到控制之后,重新启动连接器,使其重新引导表。
部署连接器
Cassandra连接器应该部署在Cassandra集群中的每个Cassandra节点上。Cassandra连接器Jar文件包含一个CDC配置(.properties)文件。看到参见配置示例供参考。
示例配置
下面是一个用于在本地运行和测试Cassandra Connector的.properties配置文件示例:
connector.name = test_connector commit.log.relocation。d我r=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/ http.port=8000 cassandra.config=/usr/local/etc/cassandra/cassandra.yaml cassandra.hosts=127.0.0.1 cassandra.port=9042 kafka.producer.bootstrap.servers=127.0.0.1:9092 kafka.producer.retries=3 kafka.producer.retry.backoff.ms=1000 topic.prefix=test_prefix key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url: http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url: http://localhost:8081 offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/ snapshot.consistency=ONE snapshot.mode=ALWAYS latest.commit.log.only=true
监控
Cassandra连接器内置了对JMX指标的支持。Cassandra驱动程序还发布了许多关于驱动程序活动的指标,这些指标可以通过JMX进行监控。连接器有两种类型的指标。快照度量可以帮助您监视快照活动,并且在连接器执行快照时可用。当连接器读取Cassandra提交日志时,Binlog指标帮助您监视进度和活动。
快照指标
属性名称 |
类型 |
描述 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
连接器属性
财产 |
默认的 |
描述 |
||
|
指定运行快照的条件(例如。在启动Cassandra连接器代理时进行初始同步。必须是'INITIAL', 'ALWAYS',或'NEVER'中的一个。默认快照模式为“INITIAL”。 |
|||
|
指定用于快照查询的{@link ConsistencyLevel}。 |
|||
|
HTTP服务器用于ping、运行状况检查和生成信息的端口 |
|||
没有默认的 |
Cassandra节点使用的YAML配置文件的绝对路径。 |
|||
|
驱动程序用于发现拓扑的Cassandra节点的一个或多个地址,用","分隔 |
|||
|
用于连接Cassandra主机的端口。 |
|||
没有默认的 |
连接Cassandra主机时使用的用户名。 |
|||
没有默认的 |
连接Cassandra主机时使用的密码。 |
|||
|
如果设置为true, Cassandra连接器代理将使用SSL连接到Cassandra节点。 |
|||
没有默认的 |
存储节点SSL配置文件路径。配置文件的示例可以在页面底部找到。 |
|||
|
仅适用于Cassandra 4,如果设置为true, Cassandra连接器代理将通过观察提交日志索引文件中的更新来增量读取提交日志,并实时读取流数据,频率由 |
|||
10000 |
仅适用于Cassandra 4和当实时流 |
|||
没有默认的 |
一旦处理完成,提交日志将从cdc_raw dir重新定位到的本地目录。 |
|||
|
确定是否应该运行CommitLogPostProcessor来从重定位目录移动已处理的提交日志。如果禁用,提交日志将不会被移出重定位目录。 |
|||
10000 |
CommitLogPostProcessor在重定位目录中重新获取所有处理过的提交日志时应该等待的时间。 |
|||
|
CommitLogPostProcessor用于从重定位目录移动已处理提交日志的类。内置的传输类是 |
|||
假 |
确定CommitLogProcessor是否应该重新处理错误提交日志。 |
|||
没有默认的 |
属性的符号名称的逗号分隔列表自定义转换器连接器可以使用的实例。例如,
您必须设置 对于为连接器配置的每个转换器,还必须添加
例如, isbn。type: io.debezium.test.IsbnConverter 如果希望进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。若要将任何其他配置参数与转换器关联起来,请在参数名称前加上转换器的符号名称。例如, isbn.schema.name: io.开云体育官方注册网址debezium.cassandra.type.Isbn |
|||
没有默认的 |
存储偏移跟踪文件的目录。 |
|||
|
提交偏移量之前等待的最短时间。默认值为0表示每次都会刷新偏移量。 |
|||
|
在需要将偏移量刷新到磁盘之前允许处理的最大记录。该配置仅在offset_flush_interval_ms != 0时有效。 |
|||
|
正整数值,指定从提交日志中读取的更改事件写入Kafka之前放入的阻塞队列的最大大小。例如,当写入Kafka较慢或Kafka不可用时,该队列可以为提交日志读取器提供回压。出现在队列中的事件不包括在此连接器定期记录的偏移量中。默认值为8192,并且应该总是大于max.batch.size属性中指定的最大批处理大小。在反序列化记录被转换为Kafka Connect结构体并发送到Kafka之前,队列中保存反序列化记录的容量。 |
|||
|
每次要退出队列的最大更改事件数。 |
|||
|
一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。 |
|||
|
正整数值,指定提交日志处理器在每次迭代期间等待新更改事件出现在队列中的毫秒数。缺省值为1000毫秒,即1秒。 |
|||
|
正整数值,指定模式处理器在刷新缓存的Cassandra表模式之前应该等待的毫秒数。 |
|||
|
每次轮询在重新尝试之前等待的最大时间。 |
|||
|
正整数值,指定快照处理器在重新扫描表以查找新的启用cdc的表之前应该等待的毫秒数。缺省值为10000毫秒,即10秒。 |
|||
|
删除事件是否应该有后续的墓碑事件(true)或(false)。需要注意的是,在Cassandra中,具有相同键的两个事件可能正在更新给定表的不同列。因此,这可能会导致在压缩过程中记录丢失,如果它们还没有被消费者使用。换句话说,如果你开启了Kafka压缩,不要将此设置为true。 |
|||
没有默认的 |
应从更改事件消息值中排除的字段的全限定名称的逗号分隔列表。字段的完全限定名格式为keyspace_name>. |
|||
|
更改事件队列和队列处理器的数量。默认值为1。 |
|||
|
是否对字段名进行消毒以符合Avro命名要求。看到Avro命名欲知详情。 |
|||
|
以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括: |
|||
|
TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称 |
|||
|
指定主题名称的分隔符,默认为 |
|||
没有默认的 |
用于所有主题的前缀的名称。
|
|||
|
用于在有界并发散列映射中保存主题名称的大小。此缓存有助于确定与给定数据集合对应的主题名称。 |
|||
|
控制连接器向其发送心跳消息的主题的名称。主题名称有以下模式: |
|||
|
指定如何 |
|||
|
指定如何 |
如果Cassandra代理使用SSL连接Cassandra节点,则需要SSL配置文件。以编写SSL配置文件为例:
keyStore.location = / var /私人/ ssl / cassandra.keystore。jks密钥存储库。密码=卡桑德拉密钥存储库。type=JKS trustStore.location=/var/private/ssl/cassandra.truststore.jks trustStore.password=cassandra trustStore.type=JKS keyManager.algorithm=SunX509 trustManager.algorithm=SunX509 cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
cipherSuites字段不是强制性的,它只是允许您添加一个(或多个)不存在的密码。trustStore的默认值。类型和keyStore。类型为JKS。keyManager的默认值。算法和trustManager。算法为SunX509。 |
连接器还支持在创建Kafka生成器时使用的直通配置属性。属性开头的所有连接器配置属性kafka.producer。
在创建将事件写入Kafka的Kafka生产者时使用前缀(不带前缀)。
例如,可以使用以下连接器配置属性安全连接到Kafka代理:
kafka.producer.security。协议SSL kafka.producer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks kafka.producer.ssl.keystore。密码= test1234 kafka.producer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks kafka.producer.ssl.truststore。密码= test1234 kafka.producer.ssl.key。密码= test1234 kafka.consumer.security。协议SSL kafka.consumer.ssl.keystore.location = = / var /私人/ SSL / kafka.server.keystore。jks kafka.consumer.ssl.keystore。密码= test1234 kafka.consumer.ssl.truststore.location = / var /私人/ ssl / kafka.server.truststore。jks kafka.consumer.ssl.truststore。密码= test1234 kafka.consumer.ssl.key.password = test1234
一定要咨询卡夫卡的文档为Kafka生产者的所有配置属性。
该连接器支持以下Kafka连接转换器的键/值序列化:
io.confluent.connect.avro.AvroConverter org.apache.kafka.connect.store.stringconverter org.apache.kafka.connect.json.JsonConverter com.blueapron.connect.protobuf.ProtobufConverter