开云体育官方注册网址MongoDB的Debezium连接器
开云体育官方注册网址Debezium的MongoDB连接器跟踪MongoDB副本集或MongoDB分片集群,以记录数据库和集合中的文档更改,并将这些更改记录为Kafka主题中的事件。开云体育电动老虎机连接器自动处理分片集群中分片的添加或删除、每个副本集成员的更改、每个副本集中的选举以及等待通信问题的解决。
概述
MongoDB的复制机制提供了冗余和高可用性,是在生产环境中运行MongoDB的首选方式。MongoDB连接器捕获复制集或分片集群中的更改。
一个MongoDB副本集由一组服务器组成,这些服务器都有相同数据的副本,复制确保客户端对副本集中的文档所做的所有更改主要的正确地应用于其他复制集的服务器,称为次要的人.MongoDB复制的工作原理是让主服务器记录其数据库中的更改oplog(或操作日志),然后每个辅助服务器读取主服务器的oplog,并按顺序将所有操作应用于它们自己的文档。当一个新的服务器被添加到一个副本集时,该服务器首先执行一个快照然后读取主数据库的oplog开云体育电动老虎机,以应用自快照开始以来可能发生的所有更改。当这个新服务器赶上主服务器的oplog的尾部时,它将成为辅助服务器(并能够处理查询)。
MongoDB连接器使用相同的复制机制,尽管它实际上并没有成为复制集的成员。然而,就像MongoDB从服务器一样,连接器总是读取复制集主服务器的oplog。并且,当连接器第一次看到一个副本集时,它会查看oplog以获得最后记录的事务,然后执行主数据库和集合的快照。开云体育电动老虎机复制完所有数据后,连接器就开始从之前从oplog中读取的位置开始流化更改。MongoDB oplog中的操作为幂等,因此无论应用多少次操作,结果都是相同的结束状态。
当MongoDB连接器进程发生变化时,它会定期记录事件起源于oplog中的位置。当MongoDB连接器停止时,它会记录它处理过的最后一个oplog位置,因此在重新启动时,它只是从该位置开始流。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从停止的地方恢复,而不会丢失任何事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太长时间,否则在连接器有机会读取它们之前,oplog中的一些操作可能会被清除。在这种情况下,在重新启动时,连接器将检测缺失的oplog操作,执行快照,然后继续对更改进行流式处理。
MongoDB连接器还可以容忍副本集的成员和领导、分片集群中分片的添加或删除以及可能导致通信故障的网络问题的更改。连接器总是使用副本集的主节点来流化更改,因此当副本集经历一次选举并且不同的节点成为主节点时,连接器将立即停止流化更改,连接到新的主节点,并使用新的主节点开始流化更改。同样地,如果连接器遇到任何与复制集主通信的问题,它将尝试重新连接(使用指数回退,以避免淹没网络或复制集),并继续从它上次离开的地方传输更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
支持的MongoDB拓扑
MongoDB连接器可以与各种MongoDB拓扑一起使用。
MongoDB副本集
MongoDB连接器可以从单个连接器中捕获更改MongoDB副本集.生产副本集需要最少的至少三名成员.
要使用带有副本集的MongoDB连接器,请提供一个或多个副本集服务器的地址为种子地址通过连接器mongodb.hosts
财产。连接器将使用这些种子连接到复制集,然后一旦连接,将从复制集获得完整的成员集以及哪个成员是主成员。连接器将启动连接到主服务器的任务,并从主服务器的oplog捕获更改。当副本集选择一个新的主副本时,任务将自动切换到新的主副本。
当MongoDB由代理连接时(例如在OS X或Windows上使用Docker),当客户端连接到副本集并发现成员时,MongoDB客户端将排除代理作为有效成员,并尝试直接连接到成员,而不是通过代理连接,但失败。 在这种情况下,将连接器设置为可选 |
MongoDB分片集群
一个MongoDB分片集群包括:
一个或多个碎片,每一个都作为复制集部署;
一个单独的副本集,充当集群的副本集配置服务器
一个或多个路由器(也称为
蒙戈
),客户端连接并将请求路由到适当的分片
要使用带分片集群的MongoDB连接器,请使用配置服务器副本集。当连接器连接到这个复制集时,它会发现自己正在充当一个分片集群的配置服务器,发现关于集群中用作分片的每个复制集的信息,然后启动一个单独的任务来捕获每个复制集的更改。如果向集群中添加了新的分片或删除了现有的分片,连接器将自动相应地调整其任务。
MongoDB独立服务器
MongoDB连接器不能监视独立MongoDB服务器的更改,因为独立服务器没有oplog。如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。
MongoDB不推荐在生产环境中运行独立服务器。 |
MongoDB连接器如何工作
在配置和部署MongoDB连接器时,它首先连接到种子地址上的MongoDB服务器,并确定关于每个可用副本集的详细信息。由于每个复制集都有自己独立的oplog,连接器将尝试为每个复制集使用单独的任务。连接器可以限制它将使用的任务的最大数量,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管任务仍将为每个副本集使用单独的线程。
在对分片集群运行连接器时,使用值 |
逻辑连接器名称
连接器配置属性mongodb.name
作为逻辑名用于MongoDB副本集或分片集群。连接器以多种方式使用逻辑名:作为所有主题名的前缀,以及在记录每个副本集的oplog位置时作为唯一标识符。
您应该为每个MongoDB连接器提供一个唯一的逻辑名称,该名称可以有意义地描述源MongoDB系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
执行快照
当一个任务使用复制集启动时,它使用连接器的逻辑名和复制集名来查找抵消它描述连接器先前停止读取更改的位置。如果可以找到一个偏移量,并且它仍然存在于oplog中,那么任务立即继续执行流的变化,从记录的偏移位置开始。
但是,如果没有发现偏移量,或者如果oplog不再包含该位置,任务必须首先通过执行命令获取复制集内容的当前状态快照.这个过程首先记录oplog的当前位置,并将其记录为偏移量(以及一个表示快照已经启动的标志)。然后,该任务将继续复制每个集合,生成尽可能多的线程(直到snapshot.max.threads
配置属性)以并行地执行此工作。连接器将记录一个单独的读取事件对于它看到的每个文档,读取事件将包含对象的标识符、对象的完整状态和源关于该对象所在的MongoDB副本集的信息。源信息还将包括一个标志,表示快照期间产生的事件。
此快照将继续,直到复制了与连接器筛选器匹配的所有集合为止。如果连接器在任务的快照完成之前停止,重新启动时连接器将再次开始快照。
在连接器执行任何副本集的快照时,尽量避免重新分配和重新配置任务。连接器使用快照的进度记录消息。为了最大限度地控制,为每个连接器运行一个单独的Kafka Connect集群。 |
流的变化
一旦复制集的连接器任务有了偏移量,它就使用偏移量来确定在oplog中的位置,它应该从哪里开始流化更改。然后,该任务将连接到副本集的主节点,并从该位置开始流化更改,处理所有的创建、插入和删除操作,并将它们转换为Debezium开云体育官方注册网址更改事件.每个更改事件都包括在oplog中发现操作的位置,连接器定期将其记录为最近的偏移量。记录偏移量的间隔由offset.flush.interval.ms
,这是一个Kafka连接工人配置属性。
当连接器优雅地停止时,将记录处理的最后一个偏移量,以便重新启动时,连接器将准确地继续它停止的位置。然而,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量之后,但在最后一次记录偏移量之前处理并生成了事件;重新启动时,连接器从最后一个开始记录偏移,可能会生成一些与之前在崩溃之前生成的事件相同的事件。
当一切正常运行时,Kafka消费者将真正看到每条消息只有一天.然而,当出现问题时,Kafka只能保证消费者能够看到每一条信息至少一次.因此,您的客户需要预期看到不止一次的消息。 |
如上所述,连接器任务总是使用复制集的主节点来传输来自oplog的更改,从而确保连接器尽可能看到最新的操作,并且能够以比使用辅助节点更低的延迟捕获更改。当副本集选择一个新的主节点时,连接器立即停止流化更改,连接到新的主节点,并从相同位置的新主节点开始流化更改。同样地,如果连接器遇到任何与复制集成员通信的问题,它会尝试重新连接,使用指数回退以避免淹没复制集,并且一旦连接,它将继续从它上次离开的地方进行流更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
总之,MongoDB连接器在大多数情况下继续运行。通信问题可能导致连接器等待问题解决。
主题名称
MongoDB连接器将对每个集合中的文档的所有插入、更新和删除操作的事件写入到单个Kafka主题。卡夫卡主题的名称总是采用这种形式logicalName.开云体育电动老虎机数据库名.collectionName,在那里logicalName是逻辑名属性指定的连接器的mongodb.name
配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机collectionName是受影响文档所在的MongoDB集合的名称。
例如,考虑一个MongoDB副本集库存
开云体育电动老虎机数据库包含四个集合:产品
,products_on_hand
,客户
,订单
.如果监视此数据库的连接器被赋予的逻辑名称为开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
注意,主题名称不包含复制集名称或碎片名称。因此,对切分集合(其中每个切分包含集合文档的子集)的所有更改都转到相同的Kafka主题。
你可以设置卡夫卡自动创建根据需要选择题目。如果不是,那么你必须在启动连接器之前使用Kafka管理工具来创建主题。
分区
MongoDB连接器不显式地确定事件的主题分区。相反,它允许Kafka根据键来确定分区。你可以改变Kafka的分区逻辑,方法是在Kafka Connect worker配置中定义瓜分者
实现。
Kafka只维护写入单个主题分区的事件的总顺序。按键划分事件意味着具有相同键的所有事件总是进入相同的分区。这确保了特定文档的所有事件总是完全有序的。
数据变更事件
Debe开云体育官方注册网址zium MongoDB连接器为插入、更新或删除数据的每个文档级操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于所更改的集合。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。
下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式
字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:
{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
项 | 字段名 | 描述 |
---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3. |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始集合相同。看到主题名称.
MongoDB连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。 如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机 |
更改事件键
更改事件的键包含已更改文档的键的模式和已更改文档的实际键。对于给定的集合,模式及其相应的有效负载都包含一个单一的id
字段。此字段的值是文档的标识符,表示为派生的字符串MongoDB扩展JSON序列化的严格模式.
考虑逻辑名为的连接器实现
对象的复制集库存
开云体育电动老虎机数据库,以及客户
集合,其中包含以下文档。
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
对象的更改的每个更改事件客户
集合具有相同的事件键模式。只要客户
集合具有前面的定义,则捕获对的更改的每个更改事件客户
集合具有以下关键结构。在JSON中,它看起来是这样的:
{"schema": {(1)"type": "struct", "name": "fulfillment.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“弦”、“可选”:假}},“有效载荷”:{(5)"id": "1004"}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容 |
2 |
|
定义键的有效负载结构的模式的名称。此模式描述已更改文档的键的结构。关键模式名具有该格式connector-name.开云体育电动老虎机数据库名称.集合名称.
|
3. |
|
指示事件键是否必须在其中包含值 |
4 |
|
属性中期望的每个字段 |
5 |
|
包含为其生成此更改事件的文档的键。在本例中,键包含一个单键 |
本例使用一个具有整数标识符的文档,但任何有效的MongoDB文档标识符都以相同的方式工作,包括文档标识符。对于文档标识符,事件键的值payload.id
Value是一个字符串,表示更新后的文档的原始内容_id
字段作为使用严格模式的MongoDB扩展JSON序列化。下表提供了不同类型的_id
字段被表示出来。
类型 | MongoDB_id 价值 |
关键的负载 |
---|---|---|
整数 |
1234 |
|
浮动 |
12.34 |
|
字符串 |
“1234” |
|
文档 |
|
|
ObjectId |
|
|
二进制 |
|
|
更改事件值
change事件中的值比键稍微复杂一些。和键一样,值也有模式
Section和a有效载荷
部分。的模式
类的模式信封
的结构有效载荷
节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。
考虑用于显示更改事件键示例的相同示例文档:
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
本文档更改的更改事件的值部分针对每种事件类型进行了描述:
创建事件
对象中创建数据的操作所生成的更改事件的值部分,示例如下客户
集合:
{"schema": {(1)“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”,开云体育官方注册网址(2)“版本”:1、“字段”:“后”},{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“补丁”},{“开云体育官方注册网址类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“过滤器”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:false, "field": "ts_ms"}, {"type": "boolean", "optional": true, "default": false, "field": "snapshot"}, {"type": "string", "optional": false, "field": "rs"}, {"type": "string", "optional": false, "field": "collection"}, {"type": "int32", "optional": false, "field": "ord"}, {"type": "int64", "optional": true, "field": "h"}], "optional": " false ", "optional": "h"}], "optional": false, "io. debezum .connector.mon . source "开云体育官方注册网址,(3)"field": "source"}, {"type": "string", "optional": true, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "dbserver1.inventory.customers.Envelope"(4)}, "有效载荷":{(5)“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(6)"patch": null, "source": {(7)“版本”:“1.4.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "c",(8)“ts_ms”:1558965515240(9)}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
值的模式,它描述值的有效负载的结构。连接器为特定集合生成的每个更改事件中,更改事件的值模式都是相同的。 |
2 |
|
在 |
3. |
|
|
4 |
|
|
5 |
|
该值为实际数据。这是变更事件提供的信息。 |
6 |
|
可选字段,指定事件发生后文档的状态。在本例中, |
7 |
|
描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:
|
8 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中,
|
9 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
更新事件
样例中更新的更改事件的值客户
集合的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。一个更新事件没有后
价值。相反,它有以下两个字段:
补丁
是一个包含幂等更新操作的JSON表示的字符串字段过滤器
是一个字符串字段,其中包含更新选择标准的JSON表示形式。的过滤器
字符串可以包含多个分片集合的分片键字段。
中的更新中连接器生成的事件中的更改事件值的示例客户
集合:
{"schema":{…}, "payload": {"op": "u",(1)“ts_ms”:1465491461815,(2)“补丁”:“{\“\”美元:{\“first_name \”,\“安妮玛丽\}}”,(3)“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,(4)“源”:{(5)“版本”:“1.4.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
返回string,描述导致连接器产生事件的操作类型。在这个例子中, |
2 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
3. |
|
包含对文档的实际MongoDB幂等变化的JSON字符串表示。在本例中,更新更改了 |
4 |
|
包含MongoDB选择标准的JSON字符串表示形式,用于标识要更新的文档。 |
5 |
|
描述事件的源元数据的必填字段。该字段包含与a相同的信息创建事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:
|
在Debe开云体育官方注册网址zium更改事件中,MongoDB提供 |
在MongoDB的博客中,更新事件不包含之前或后已更改文档的状态。因此,Debezium连接器不可能提供此信息。开云体育官方注册网址但是,Debezium开云体育官方注册网址连接器提供文档的开始状态创建而且读事件。流的下游消费者可以通过保留每个文档的最新状态并将新事件中的状态与保存的状态进行比较来重建文档状态。开云体育官方注册网址Debezium连接器不能保持这种状态。 |
删除事件
的值删除改变事件有相同之处模式
部分为创建而且更新事件。的有效载荷
的一部分删除事件包含的值与创建而且更新事件。特别地,删除事件既不包含后
价值也不是补丁
价值。这里有一个例子删除属性中的文档的客户
集合:
{"schema":{…}, "payload": {"op": "d",(1)“ts_ms”:1465495462115,(2)“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,(3)“源”:{(4)“版本”:“1.4.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
项 | 字段名 | 描述 |
---|---|---|
1 |
|
返回string,描述操作类型。的 |
2 |
|
可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。 |
3. |
|
包含用于识别要删除的文档的MongoDB选择标准的JSON字符串表示形式。 |
4 |
|
描述事件的源元数据的必填字段。该字段包含与a相同的信息创建或更新事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:
|
MongoDB连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。
唯一标识文档的所有MongoDB连接器事件都具有完全相同的键。删除文档时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有该键的消息,消息值必须为零
.为了实现这一点,在Debezium的MongoDB连接器发开云体育官方注册网址出一个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有零
价值。墓碑事件通知Kafka所有具有相同键的消息都可以被删除。
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务元数据边界和丰富数据消息的事件。
事务边界
开云体育官方注册网址Debezium为每个事务生成事件开始
而且结束
.每个事件包含
状态
-开始
或结束
id
唯一事务标识符的字符串表示event_count
(结束
事件)——事务发出的事件总数data_collections
(结束
事件)-对的数组data_collection
而且event_count
它提供了由来自给定数据收集的更改所发出的事件数
下面是一条消息的示例:
{"status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null} {"status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [{"data_collection": "rs0.testDB. "tablea", "event_count": 1}, {"data_collection": "rs0.testDB. tablea", "event_count": 1},Tableb ", "event_count": 1}]}
事务事件被写入指定的主题<开云体育电动老虎机 database.server.name > .transaction
.
数据事件丰富
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一条消息的示例:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…}, "op": "c", "ts_ms": "1580390884335", "transaction": {"id": "1462833718356672513", "total_order": "1", "data_collection_order": "1"}}
部署
要部署Debezium 开云体育官方注册网址MongoDB连接器,您需要安装Debezium MongoDB连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。
MongoDB已经安装完成与Debezium连接器一起工作开云体育官方注册网址.
下载连接器的插件存档,
将JAR文件解压缩到Kafka Connect环境中。
将包含JAR文件的目录添加到卡夫卡连接的
plugin.path
.重新启动Kafka Connect进程以获取新的JAR文件。
如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Apache Zookeeper, Apache Kafka和Kafka Connect, MongoDB连接器已经安装并准备运行。
的Deb开云体育官方注册网址ezium教程引导您使用这些图像,这是了解Debezium的好方法。开云体育官方注册网址
MongoDB连接器配置示例
下面是一个从MongoDB副本集中捕获数据的连接器实例的配置示例rs0
在192.168.99.100的27017端口,我们在逻辑上命名它fullfillment
.通常,通过设置连接器可用的配置属性,可以在JSON文件中配开云体育官方注册网址置Debezium MongoDB连接器。
您可以选择为特定的MongoDB复制集或分片集群生成事件。您还可以选择过滤掉不需要的集合。
{"name": "inventory-connector",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector.mongodb.MongoDbConnector",(2)“mongodb。hosts": "rs0/192.168.99.100:27017",(3):“mongodb.name fullfillment”,(4)“collection.include。库存清单”:“。*”,(5)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | MongoDB连接器类的名称。 |
3. | 用于连接到MongoDB复制集的主机地址。 |
4 | 的逻辑名它为生成的事件形成了一个命名空间,并用于连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及使用Avro转换器时对应的Avro模式的名称空间。 |
5 | 与要监控的所有集合的命名空间(例如 |
有关可以为Debezium MongoDB连接器设置的配置属性的完整列表,请参见开云体育官方注册网址MongoDB连接器配置属性.
您可以使用帖子
命令到正在运行的Kafka Connect服务。该服务记录配置并启动一个执行以下操作的连接器任务:
连接MongoDB复制集或分片集群。
为每个副本集分配任务。
如果需要,执行快照。
阅读博客。
流将事件记录更改为Kafka主题。
添加连接器配置
要开始运行Debezium Mongo开云体育官方注册网址DB连接器,请创建一个连接器配置,并将该配置添加到Kafka Connect集群。
已安装D开云体育官方注册网址ebezium MongoDB连接器。
为MongoDB连接器创建配置。
使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。
当连接器启动时,它完成以下操作:
执行一致性快照在你的MongoDB复制集集合。
读取复制集的oplogs。
为每个插入、更新和删除的文档生成更改事件。
流将事件记录更改为Kafka主题。
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
||
连接器的Java类的名称。始终使用值 |
||
复制集中MongoDB服务器的主机名和端口对列表(以'host'或'host:port'的形式),以逗号分隔。该列表可以包含单个主机名和端口对。如果 |
||
一个唯一的名称,用于标识连接器和/或该连接器监视的MongoDB副本集或分片集群。每个服务器应该由最多一个Debezium连接器监控,因为这个服务器名前缀了所有来自MongoD开云体育官方注册网址B副本集或集群的持久化Kafka主题。只能使用字母数字字符和下划线。 |
||
连接MongoDB时使用开云体育电动老虎机的数据库用户名。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
||
连接MongoDB时使用的密码。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
||
|
开云体育电动老虎机包含MongoDB凭证的数据库(身份验证源)。只有当MongoDB配置为使用与另一个身份验证数据库的身份验证时,才需要这样做开云体育电动老虎机 |
|
|
连接器将使用SSL连接到MongoDB实例。 |
|
|
启用SSL时,此设置控制在连接阶段是否禁用严格的主机名检查。如果 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,匹配要监控的数据库名称;开云体育电动老虎机中未包含开云体育电动老虎机的任何数据库名称 |
|
空字符串 |
可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的数据库名称;开云体育电动老虎机中未包含开云体育电动老虎机的任何数据库名称 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,它与要监控的MongoDB集合的完全限定名称空间匹配;未包含的集合 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,该列表与MongoDB集合的完全限定名称空间匹配,将被排除在监控之外;未包含的集合 |
|
|
指定在连接器启动时运行快照的条件。默认为最初的,并指定连接器在没有发现偏移量或oplog不再包含以前的偏移量时读取快照。的从来没有选项指定连接器永远不应该使用快照,相反,连接器应该继续跟踪日志。 |
|
中指定的所有集合 |
中指定的模式名称匹配的可选的、以逗号分隔的正则表达式列表 |
|
空字符串 |
应从更改事件消息值中排除的字段的全限定名称的可选列表,以逗号分隔。字段的完全限定名的格式为开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*)。 |
|
空字符串 |
一个可选的、以逗号分隔的字段完全限定替换列表,用于重命名更改事件消息值中的字段。字段的完全合格替换是这样的开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName:newNestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*),冒号(:)用于确定字段的重命名映射。下一个字段替换应用于列表中前一个字段替换的结果,因此在重命名位于同一路径的多个字段时请记住这一点。 |
|
|
应该为此连接器创建的最大任务数。MongoDB连接器将尝试为每个副本集使用单独的任务,因此在使用单个MongoDB副本集的连接器时,默认值是可接受的。当使用MongoDB分片集群的连接器时,我们建议指定一个等于或大于集群中分片数量的值,这样每个副本集的工作就可以通过Kafka Connect分配。 |
|
|
正整数值,指定用于对复制集中的集合执行初始同步的最大线程数。默认值为1。 |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
连接器启动后在快照之前应该等待的毫秒间隔; |
||
|
指定在拍摄快照时应从每个集合中一次性读取的最大文档数。连接器将以这个大小的多个批次读取集合内容。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 |
---|---|---|
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为oplog读取器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
阻塞队列最大大小(以字节为单位)的长值。默认情况下,该功能是禁用的,如果设置为正长值,则该功能将激活。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
正整数值,指定在第一次连接尝试失败或没有主节点可用时试图重新连接到主节点时的初始延迟。缺省值为1秒(1000毫秒)。 |
|
|
正整数值,指定在多次连接尝试失败或没有主服务器可用时试图重新连接到主服务器时的最大延迟。默认值为120秒(120,000毫秒)。 |
|
|
正整数值,指定在发生异常和任务中止之前尝试连接主副本集失败的最大次数。默认值为16 |
|
|
布尔值,用于指定mongodb中的地址。hosts' are seeds that should be used to discover all members of the cluster or replica set ( |
|
v2 |
的架构版本 |
|
|
控制心跳消息的发送频率。 将此参数设置为 |
|
|
控制要向其发送心跳消息的主题的命名。 |
|
|
字段名是否被净化以符合Avro命名要求。看到Avro命名欲知详情。 |
|
以逗号分隔的oplog操作列表,将在流处理期间跳过。操作包括: |
||
控制快照中包含哪些集合项。此属性仅影响快照。在表单中指定以逗号分隔的集合名称列表开云体育电动老虎机databaseName.collectionName. 对于指定的每个集合,还要指定另一个配置属性: |
||
|
当设置为 看到事务的元数据更多细节。 |
|
10000(10秒) |
在发生可检索错误后重新启动连接器之前等待的毫秒数。 |
|
|
连接器轮询新的、删除的或更改的副本集的时间间隔。 |
|
10000(10秒) |
驱动程序在放弃新的连接尝试之前等待的毫秒数。 |
|
0 |
套接字上的发送/接收在超时发生之前所需要的毫秒数。值为 |
|
30000(30秒) |
驱动程序在超时并抛出错误之前等待选择服务器的毫秒数。 |
监控
Debe开云体育官方注册网址zium MongoDB连接器除了内置支持Zookeeper、Kafka和Kafka Connect的JMX指标外,还有两种指标类型。
详情请参阅监控文档了解如何通过JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.mongodb: type = connector-metrics上下文=快照,server =< mongodb.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已由连接器上配置的包含/排除列表筛选规则筛选的事件数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的表的总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。 |
|
|
队列的最大缓冲区,以字节为单位。如果 |
|
|
队列中记录的当前数据,以字节为单位。 |
Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义快照指标:
属性 | 类型 | 描述 |
---|---|---|
|
|
数据库断开数。开云体育电动老虎机 |
流指标
的MBean是开云体育官方注册网址debezium.sql_server: type = connector-metrics、上下文=流媒体服务器=< mongodb.name >
.
属性 | 类型 | 描述 |
---|---|---|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已由连接器上配置的包含/排除列表筛选规则筛选的事件数。 |
|
|
连接器监视的表的列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
|
|
队列的最大缓冲区,以字节为单位。 |
|
|
队列中记录的当前数据,以字节为单位。 |
Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义流指标:
属性 | 类型 | 描述 |
---|---|---|
|
|
数据库断开数。开云体育电动老虎机 |
|
|
主节点选举的个数。 |
MongoDB连接器常见问题
开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium(像Kafka)提供了开云体育官方注册网址至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,或当连接器使用指定的连接参数多次连接到MongoDB失败时,连接器将停止运行。重新连接使用指数回退完成,并且最大尝试次数是可配置的。
在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经纠正或MongoDB问题已经解决时,可以重新启动连接器。
MongoDB不可用
一旦连接器开始运行,如果任何MongoDB副本集的主节点变得不可用或不可达,连接器将重复尝试重新连接到主节点,使用指数回退来防止网络或服务器饱和。如果在可配置的连接尝试次数之后,主服务器仍然不可用,则连接器将失败。
重新连接的尝试由三个属性控制:
connect.backoff.initial.delay.ms
-第一次尝试重新连接之前的延迟,默认为1秒(1000毫秒)。connect.backoff.max.delay.ms
-尝试重新连接之前的最大延迟,默认为120秒(120,000毫秒)。connect.max.attempts
—产生错误前的最大尝试次数,默认为16次。
每次延迟都是之前延迟的两倍,直到最大延迟。如果使用默认值,下表显示了每次失败连接尝试的延迟和失败前的累计总时间。
重新连接次数 | 尝试前延迟,以秒为单位 | 尝试前的总延迟,以分钟和秒为单位 |
---|---|---|
1 |
1 |
00:01 |
2 |
2 |
00:03 |
3. |
4 |
00:07 |
4 |
8 |
00:15 |
5 |
16 |
00:31 |
6 |
32 |
01:03 |
7 |
64 |
02:07 |
8 |
120 |
04:07 |
9 |
120 |
06:07 |
10 |
120 |
08:07 |
11 |
120 |
10:07 |
12 |
120 |
12:07 |
13 |
120 |
14:07 |
14 |
120 |
16:07 |
15 |
120 |
18:07 |
16 |
120 |
20:07 |
Kafka Connect进程优雅地停止
如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
如果组只包含一个进程,并且该进程被优雅地停止,那么Kafka Connect将停止连接器并记录每个副本集的最后偏移量。在重新启动时,副本集任务将继续在他们离开的地方。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,MongoDB连接器将从最后一个偏移量恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。
由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。 开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括MongoDB事件的唯一事务标识符( |
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率由你在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。
连接器停止一段时间
如果连接器被优雅地停止,副本集可以继续使用,任何新的更改都会记录在MongoDB的oplog中。当连接器重新启动时,它将恢复它上次停止的每个副本集的流更改,记录连接器停止时所做的所有更改的更改事件。如果连接器停止的时间足够长,以至于MongoDB从它的oplog中清除一些连接器没有读取的操作,那么在启动连接器时,连接器将执行一个快照。
一个正确配置的Kafka集群能够提供巨大的吞吐量。Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器在一段时间后重新启动时,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及MongoDB中对数据的更改量。开云体育电动老虎机
如果连接器停止的时间足够长,MongoDB可能会清除旧的oplog文件,连接器的最后一个位置可能会丢失。在这种情况下,当连接器配置为最初的快照模式(默认)最终重新启动,MongoDB服务器将不再有起点,连接器将失败并报错。 |