开云体育官方注册网址Debezium连接器用于MongoDB
开云体育官方注册网址Debezium的MongoDB连接器跟踪MongoDB副本集或MongoDB分片集群,以记录数据库和集合中的文档更改,并将这些更改记录为Kafka主题中的事件。开云体育电动老虎机连接器自动处理分片集群中分片的添加或删除、每个副本集成员的更改、每个副本集中的选举以及等待通信问题的解决。
概述
MongoDB的复制机制提供了冗余和高可用性,是在生产环境中运行MongoDB的首选方式。MongoDB连接器捕获复制集或分片集群中的更改。
一个MongoDB副本集由一组服务器组成,这些服务器都有相同数据的副本,复制确保客户端对副本集中的文档所做的所有更改主要的正确地应用于其他复制集的服务器,称为次要的人.MongoDB复制的工作原理是让主服务器记录其数据库中的更改oplog(或操作日志),然后每个辅助服务器读取主服务器的oplog,并按顺序将所有操作应用于它们自己的文档。当一个新的服务器被添加到一个副本集时,该服务器首先执行一个快照然后读取主数据库的oplog开云体育电动老虎机,以应用自快照开始以来可能发生的所有更改。当这个新服务器赶上主服务器的oplog的尾部时,它将成为辅助服务器(并能够处理查询)。
MongoDB连接器使用相同的复制机制,尽管它实际上并没有成为复制集的成员。然而,就像MongoDB从服务器一样,连接器总是读取复制集主服务器的oplog。并且,当连接器第一次看到一个副本集时,它会查看oplog以获得最后记录的事务,然后执行主数据库和集合的快照。开云体育电动老虎机复制完所有数据后,连接器就开始从之前从oplog中读取的位置开始流化更改。MongoDB oplog中的操作为幂等,因此无论应用多少次操作,结果都是相同的结束状态。
当MongoDB连接器进程发生变化时,它会定期记录事件起源于oplog中的位置。当MongoDB连接器停止时,它会记录它处理过的最后一个oplog位置,因此在重新启动时,它只是从该位置开始流。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从停止的地方恢复,而不会丢失任何事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太长时间,否则在连接器有机会读取它们之前,oplog中的一些操作可能会被清除。在这种情况下,在重新启动时,连接器将检测缺失的oplog操作,执行快照,然后继续对更改进行流式处理。
MongoDB连接器还可以容忍副本集的成员和领导、分片集群中分片的添加或删除以及可能导致通信故障的网络问题的更改。连接器总是使用副本集的主节点来流化更改,因此当副本集经历一次选举并且不同的节点成为主节点时,连接器将立即停止流化更改,连接到新的主节点,并使用新的主节点开始流化更改。同样地,如果连接器遇到任何与复制集主通信的问题,它将尝试重新连接(使用指数回退,以避免淹没网络或复制集),并继续从它上次离开的地方传输更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
支持的MongoDB拓扑
MongoDB连接器可以与各种MongoDB拓扑一起使用。
MongoDB副本集
MongoDB连接器可以从单个连接器中捕获更改MongoDB副本集.生产副本集需要最少的至少三名成员.
要使用带有副本集的MongoDB连接器,请提供一个或多个副本集服务器的地址为种子地址通过连接器mongodb.hosts
财产。连接器将使用这些种子连接到复制集,然后一旦连接,将从复制集获得完整的成员集以及哪个成员是主成员。连接器将启动连接到主服务器的任务,并从主服务器的oplog捕获更改。当副本集选择一个新的主副本时,任务将自动切换到新的主副本。
当MongoDB由代理连接时(例如在OS X或Windows上使用Docker),当客户端连接到副本集并发现成员时,MongoDB客户端将排除代理作为有效成员,并尝试直接连接到成员,而不是通过代理连接,但失败。 在这种情况下,将连接器设置为可选 |
MongoDB分片集群
一个MongoDB分片集群包括:
一个或多个碎片,每一个都作为复制集部署;
一个单独的副本集,充当集群的副本集配置服务器
一个或多个路由器(也称为
蒙戈
),客户端连接并将请求路由到适当的分片
要使用带分片集群的MongoDB连接器,请使用配置服务器副本集。当连接器连接到这个复制集时,它会发现自己正在充当一个分片集群的配置服务器,发现关于集群中用作分片的每个复制集的信息,然后启动一个单独的任务来捕获每个复制集的更改。如果向集群中添加了新的分片或删除了现有的分片,连接器将自动相应地调整其任务。
MongoDB独立服务器
MongoDB连接器不能监视独立MongoDB服务器的更改,因为独立服务器没有oplog。如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。
MongoDB不推荐在生产环境中运行独立服务器。 |
MongoDB连接器如何工作
在配置和部署MongoDB连接器时,它首先连接到种子地址上的MongoDB服务器,并确定关于每个可用副本集的详细信息。由于每个复制集都有自己独立的oplog,连接器将尝试为每个复制集使用单独的任务。连接器可以限制它将使用的任务的最大数量,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管任务仍将为每个副本集使用单独的线程。
在对分片集群运行连接器时,使用值 |
逻辑连接器名称
连接器配置属性mongodb.name
作为逻辑名用于MongoDB副本集或分片集群。连接器以多种方式使用逻辑名:作为所有主题名的前缀,以及在记录每个副本集的oplog位置时作为唯一标识符。
您应该为每个MongoDB连接器提供一个唯一的逻辑名称,该名称可以有意义地描述源MongoDB系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
执行快照
当一个任务使用复制集启动时,它使用连接器的逻辑名和复制集名来查找抵消它描述连接器先前停止读取更改的位置。如果可以找到一个偏移量,并且它仍然存在于oplog中,那么任务立即继续执行流的变化,从记录的偏移位置开始。
但是,如果没有发现偏移量,或者如果oplog不再包含该位置,任务必须首先通过执行命令获取复制集内容的当前状态快照.这个过程首先记录oplog的当前位置,并将其记录为偏移量(以及一个表示快照已经启动的标志)。然后,该任务将继续复制每个集合,生成尽可能多的线程(直到initial.sync.max.threads
配置属性)以并行地执行此工作。连接器将记录一个单独的读取事件对于它看到的每个文档,读取事件将包含对象的标识符、对象的完整状态和源关于该对象所在的MongoDB副本集的信息。源信息还将包括一个标志,表示快照期间产生的事件。
此快照将继续,直到复制了与连接器筛选器匹配的所有集合为止。如果连接器在任务的快照完成之前停止,重新启动时连接器将再次开始快照。
在连接器执行任何副本集的快照时,尽量避免重新分配和重新配置任务。连接器使用快照的进度记录消息。为了最大限度地控制,为每个连接器运行一个单独的Kafka Connect集群。 |
流的变化
一旦复制集的连接器任务有了偏移量,它就使用偏移量来确定在oplog中的位置,它应该从哪里开始流化更改。然后,该任务将连接到副本集的主节点,并从该位置开始流化更改,处理所有的创建、插入和删除操作,并将它们转换为Debezium开云体育官方注册网址更改事件.每个更改事件都包括在oplog中发现操作的位置,连接器定期将其记录为最近的偏移量。记录偏移量的间隔由offset.flush.interval.ms
,这是一个Kafka连接工人配置属性。
当连接器优雅地停止时,将记录处理的最后一个偏移量,以便重新启动时,连接器将准确地继续它停止的位置。然而,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量之后,但在最后一次记录偏移量之前处理并生成了事件;重新启动时,连接器从最后一个开始记录偏移,可能会生成一些与之前在崩溃之前生成的事件相同的事件。
当一切正常运行时,Kafka消费者将真正看到每条消息只有一天.然而,当出现问题时,Kafka只能保证消费者能够看到每一条信息至少一次.因此,您的客户需要预期看到不止一次的消息。 |
如上所述,连接器任务总是使用复制集的主节点来传输来自oplog的更改,从而确保连接器尽可能看到最新的操作,并且能够以比使用辅助节点更低的延迟捕获更改。当副本集选择一个新的主节点时,连接器立即停止流化更改,连接到新的主节点,并从相同位置的新主节点开始流化更改。同样地,如果连接器遇到任何与复制集成员通信的问题,它会尝试重新连接,使用指数回退以避免淹没复制集,并且一旦连接,它将继续从它上次离开的地方进行流更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
总之,MongoDB连接器在大多数情况下继续运行。通信问题可能导致连接器等待问题解决。
主题名称
MongoDB连接器将对每个集合中的文档的所有插入、更新和删除操作的事件写入到单个Kafka主题。卡夫卡主题的名称总是采用这种形式logicalName.开云体育电动老虎机数据库名.collectionName,在那里logicalName是逻辑名属性指定的连接器的mongodb.name
配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机collectionName是受影响文档所在的MongoDB集合的名称。
例如,考虑一个MongoDB副本集库存
开云体育电动老虎机数据库包含四个集合:产品
,products_on_hand
,客户
,订单
.如果监视此数据库的连接器被赋予的逻辑名称为开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
注意,主题名称不包含复制集名称或碎片名称。因此,对切分集合(其中每个切分包含集合文档的子集)的所有更改都转到相同的Kafka主题。
你可以设置卡夫卡自动创建根据需要选择题目。如果不是,那么你必须在启动连接器之前使用Kafka管理工具来创建主题。
分区
MongoDB连接器不显式地确定事件的主题分区。相反,它允许Kafka根据键来确定分区。你可以改变Kafka的分区逻辑,方法是在Kafka Connect worker配置中定义瓜分者
实现。
Kafka只维护写入单个主题分区的事件的总顺序。按键划分事件意味着具有相同键的所有事件总是进入相同的分区。这确保了特定文档的所有事件总是完全有序的。
事件
MongoDB连接器产生的所有数据更改事件都有一个键和一个值。
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,如果这些事件的源在结构上发生了变化,或者连接器得到了改进或改变,那么这些事件的结构可能会随着时间的推移而发生变化。这对于消费者来说可能很难处理,所以Kafka Connect使得每个事件都是自包含的。每个消息键和值都有两部分模式而且有效载荷.模式描述有效负载的结构,而有效负载包含实际数据。
更改事件的键
对于给定的集合,更改事件的键包含单个id
字段。它的值是文档的标识符,表示为字符串,派生自MongoDB在严格模式下扩展JSON序列化.考虑逻辑名为的连接器实现
对象的复制集库存
开云体育电动老虎机数据库的客户
收藏的文件包括:
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
的每一个变化事件客户
collection将具有相同的键结构,在JSON中是这样的:
{"schema": {"type": "struct", "name": "fulfillment.inventory.customers. "关键”“可选”:假的,“字段”:[{”字段”:“id”、“类型”:“弦”、“可选”:假}]},“有效载荷”:{" id ": " 1004 "}}
的模式
密钥部分包含Kafka Connect模式,描述有效负载部分的内容。在这种情况下,它意味着有效载荷
值不是可选的,是由名为fulfillment.inventory.customers.Key
,并且有一个名为id
类型的字符串
.如果你看键的值有效载荷
字段,你可以看到它确实是一个结构(在JSON中只是一个对象)id
字段,其值为包含整数的字符串1004
.
这个例子使用了一个具有整数标识符的文档,但是任何有效的MongoDB文档标识符(包括文档)都可以工作。的值id
字段将只是一个字符串,表示原始文档的MongoDB扩展JSON序列化(严格模式)_id
字段。下面的几个例子说明了如何做到_id
不同类型的字段将被编码为事件键的有效负载:
类型 | MongoDB_id 价值 |
关键的负载 |
---|---|---|
整数 |
1234 |
|
浮动 |
12.34 |
|
字符串 |
“1234” |
|
文档 |
{“嗨”:“卡夫卡”、“num”:(10.0,100.0,1000.0)} |
|
ObjectId |
ObjectId(“596 e275826f08b2730779e1f”) |
|
二进制 |
BinData(“a2Fma2E = ", 0) |
|
MongoDB连接器确保所有Kafka连接模式名是有效的Avro模式名.这意味着逻辑服务器名称必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名称中的其余字符以及数据库和集合名称中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。开云体育电动老虎机如果不是,则所有无效字符将自动替换为下划线字符。 当逻辑服务器名、数据库名和集合名包含其他字符时,这可能导致模式名中出现意外冲突,并且集合全名之间唯一的区分字符无效,因此被下划线取代。开云体育电动老虎机连接器尝试在这种情况下产生异常,但只有在单个连接器中使用的模式之间存在冲突时才会产生异常。 |
更改事件的值
更改事件消息的值稍微复杂一些。就像关键信息一样,它有一个模式节和有效载荷部分。MongoDB连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:
人事处
必选字段,包含描述操作类型的字符串值。MongoDB连接器的值为c
对于创建(或插入),u
对于更新,d
对于删除,和r
用于读取(在快照的情况下)。后
是可选字段,如果存在则包含文档的状态后事件发生了。MongoDB的oplog条目只包含一个文档的完整状态创建事件,这些是唯一包含后字段。源
是一个必选字段,包含描述事件源元数据的结构,在MongoDB中包含几个字段:Debezium版本、逻辑名、副本集的名称、集合的名称空间、事件发生的MongoDB时间戳(以及时间戳内事件的序号)、MongoDB操作的标识符(例如开云体育官方注册网址h
字段),以及初始同步标志(如果事件是在快照期间发生的)。ts_ms
是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。
当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。
我们来看看a创建/读事件值可能看起来像客户
集合:
{"模式":{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“之后”},{“类型”:开云体育官方注册网址“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“补丁”},{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“过滤器”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假,“场”:“连接器”},{“类型”:“字符串”,“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:“rs”},{“类型”:“弦”、“可选”:假的,“场”:“集合”},{“类型”:“int32”、“可选”:假的,“场”:“奥德”},{“类型”:“int64”、“可选”:true, "field": "h"}], "optional": false, "name": "io. d开云体育官方注册网址ebezum .connector.mongo. source", "field": "source"}, {"type": "string", "optional": true, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "dbserver1. stock .customers。信封”},“有效载荷”:{“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,“补丁”:空,“源”:{“版本”:“1.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "r", "ts_ms": 1558965515240}}
如果我们看模式
事件的一部分价值的模式信封的模式特定于集合,而源
结构(特定于MongoDB连接器并在所有事件中重用)。还要注意后
value总是一个字符串,并且按照约定它将包含文档的JSON表示形式。
如果我们看有效载荷
事件的一部分价值,我们可以看到事件中的信息,即它描述文档作为快照的一部分被读取(因为op = r
而且快照= true
),以及后
字段值包含文档的JSON字符串表示形式。
事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式和有效载荷部分信息。这是可能的,甚至建议使用Avro转换器以大幅减少写入Kafka主题的实际消息的大小。 |
的值更新此集合上的Change事件将具有完全相同的内容模式,其有效负载的结构相同,但将持有不同的值。具体来说,更新事件将没有后
值,将会有一个补丁
字符串,包含幂等更新操作的JSON表示形式和过滤器
字符串,包含更新选择标准的JSON表示形式。的过滤器
字符串可以包含多个分片集合的分片键字段。这里有一个例子:
{"schema":{…},“有效载荷”:{“人事处”:“u”,“ts_ms”:1465491461815,“补丁”:“{\“\”美元:{\“first_name \”,\“安妮玛丽\}}”,“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,“源”:{“版本”:“1.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
当我们把这个和插入事件中,我们看到了一些不同有效载荷
部分:
的
人事处
字段值现在是u
,表示此文档因更新而更改的
补丁
字段出现,并具有对文档的实际MongoDB幂等性更改的字符串化JSON表示,在本例中涉及设置first_name
字段转换为新值的
过滤器
字段出现,并且有用于更新的MongoDB选择标准的字符串化JSON表示的
后
字段不再出现的
源
字段结构具有与以前相同的字段,但值有所不同,因为此事件来自oplog中的不同位置的
ts_ms
显示了Debezium处理此事件的时间戳开云体育官方注册网址
补丁字段的内容由MongoDB自己提供,具体格式取决于具体的数据库版本。开云体育电动老虎机因此,在将MongoDB实例升级到新版本时,您应该为格式的潜在变化做好准备。 本文档中的所有示例都来自MongoDB 3.4,如果使用不同的示例可能会有所不同。 |
MongoDB的oplog中的更新事件没有之前或后已更改文档的状态,因此连接器无法提供此信息。然而,由于创建或读事件做如果包含开始状态,流的下游消费者实际上可以通过为每个文档保留最新状态并将每个事件应用于该状态来完全重建状态。开云体育官方注册网址Debezium连接器不能保持这样的状态,所以它不能这样做。 |
到目前为止,您已经看到了创建/读而且更新事件。下面的示例显示了a的值删除事件。a的值删除事件在此集合上具有完全相同的模式,它的有效载荷结构相同,但它持有不同的值。具体地说,删除事件没有后
价值也不是补丁
值:
{"schema":{…},“有效载荷”:{“人事处”:“d”、“ts_ms”:1465495462115,“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,“源”:{“版本”:“1.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
当我们将此值与其他事件中的值进行比较时,我们可以看到有效载荷
部分:
的
人事处
字段值现在是d
,表示该文档已被删除的
补丁
字段不出现的
后
字段不出现的
过滤器
字段出现,并且有用于删除的MongoDB选择标准的字符串化JSON表示的
源
字段结构具有与以前相同的字段,但值有所不同,因为此事件来自oplog中的不同位置的
ts_ms
显示了Debezium处理此事件的时间戳开云体育官方注册网址
MongoDB连接器提供了另一种事件。每一个删除事件后面跟着墓碑上属性具有相同键的事件删除事件但是零
价值。这为Kafka提供了运行它所需的信息日志压实移除机制所有用那个键发送消息。
所有的MongoDB连接器事件都是设计用于工作的Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除旧消息。这就是Kafka如何回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。 一个唯一标识文档的所有MongoDB连接器事件都有完全相同的键,向Kafka发出信号,只保留最新的事件。一个墓碑事件告诉卡夫卡所有可以删除具有相同键的消息。 |
事务的元数据
开云体育官方注册网址Debezium可以生成表示事务元数据边界和丰富数据消息的事件。
事务边界
开云体育官方注册网址Debezium为每个事务生成事件开始
而且结束
.每个事件包含
状态
-开始
或结束
id
唯一事务标识符的字符串表示event_count
(结束
事件)——事务发出的事件总数data_collections
(结束
事件)-对的数组data_collection
而且event_count
它提供了由来自给定数据收集的更改所发出的事件数
下面是一条消息的示例:
{"status": "BEGIN", "id": "00000025:00000d08:0025", "event_count": null, "data_collections": null} {"status": "END", "id": "00000025:00000d08:0025", "event_count": 2, "data_collections": [{"data_collection": "rs0.testDB. "tablea", "event_count": 1}, {"data_collection": "rs0.testDB. tablea", "event_count": 1},Tableb ", "event_count": 1}]}
事务事件被写入指定的主题<开云体育电动老虎机 database.server.name > .transaction
.
数据事件丰富
启用事务元数据时,数据消息信封
是充实了新的事务
字段。这个字段以复合字段的形式提供关于每个事件的信息:
id
唯一事务标识符的字符串表示total_order
-该事件在事务生成的所有事件中的绝对位置data_collection_order
-该事件在事务发出的所有事件中的每数据收集位置
下面是一条消息的示例:
{“前”:零,“后”:{“pk”:“2”,“aa”:“1”},“源”:{…},“人事处”:“c”、“ts_ms”:“1580390884335”,“交易”:{" id ":“00000025:00000d08:0025”、“total_order”:“1”,“data_collection_order”:" 1 "}}
部署MongoDB连接器
如果您已经安装了动物园管理员,卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的MongoDB连接器就很容易了。只需下载连接器的插件存档,将jar文件解压到Kafka Connect环境中,并将包含jar文件的目录添加到Kafka Connect环境中plugin.path
通过使用plugin.path配置属性。重新启动Kafka Connect进程以获取新的jar。
如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的Docker图片对于Zookeeper, Kafka和Kafka Connect, MongoDB连接器已经预安装并准备好了。我们的教程甚至引导您使用这些图像,这是了解Debezium的一个很好的方法。开云体育官方注册网址你甚至可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
示例配置
要使用连接器为特定的MongoDB复制集或分片集群产生更改事件,请以JSON格式创建一个配置文件。当连接器启动时,它将对MongoDB复制集中的集合执行快照,并开始读取复制集的oplogs,为每个插入、更新和删除的行生成事件。可选地过滤掉不需要的集合。
下面是监视MongoDB副本集的MongoDB连接器的配置示例rs0
在192.168.99.100的27017端口,我们在逻辑上命名它fullfillment
.通常,配置Debezium MongoDB连接器的方法是开云体育官方注册网址. json
文件中使用连接器可用的配置属性。
{"name": "inventory-connector",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector.mongodb.MongoDbConnector",(2)“mongodb。hosts": "rs0/192.168.99.100:27017",(3):“mongodb.name fullfillment”,(4)”集合。白名单”:“库存。*”,(5)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | MongoDB连接器类的名称。 |
3. | 用于连接到MongoDB复制集的主机地址。 |
4 | 的逻辑名它为生成的事件形成了一个命名空间,并用于连接器写入的所有Kafka主题的名称,Kafka Connect模式的名称,以及使用Avro连接器时对应的Avro模式的名称空间。 |
5 | 与要监控的所有集合的命名空间(例如 |
看到连接器属性的完整列表可以在这些配置中指定。
该配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到MongoDB副本集或分片集群,为每个副本集分配任务,必要时执行快照,读取oplog,并将事件记录到Kafka主题。
监控
Debe开云体育官方注册网址zium MongoDB连接器除了内置支持Zookeeper、Kafka和Kafka Connect的JMX指标外,还有两种指标类型。
详情请参阅监控文档了解如何通过JMX公开这些指标的详细信息。
快照指标
的MBean是开云体育官方注册网址debezium.mongodb: type = connector-metrics上下文=快照,server =< mongodb.name >
.
属性名称 |
类型 |
描述 |
|
|
连接器读取的最后一个快照事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
由连接器监视的集合列表。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
快照中包含的集合总数。 |
|
|
快照尚未复制的集合数量。 |
|
|
快照是否启动。 |
|
|
快照是否中止。 |
|
|
快照是否完成。 |
|
|
快照到目前为止所花费的总秒数,即使没有完成。 |
|
|
映射,其中包含为快照中的每个集合导出的文档数量。在处理期间将集合增量地添加到Map中。每扫描10,000个文档并在完成收集时更新一次。 |
Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义快照指标:
属性 | 类型 | 描述 |
---|---|---|
|
|
数据库断开数。开云体育电动老虎机 |
流指标
的MBean是开云体育官方注册网址debezium.sql_server: type = connector-metrics、上下文=流媒体服务器=< mongodb.name >
.
属性名称 |
类型 |
描述 |
|
|
连接器读取的最后一个流事件。 |
|
|
自连接器读取并处理最近事件以来的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
连接器上已配置白名单或黑名单过滤规则过滤的事件个数。 |
|
|
由连接器监视的集合列表。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列长度。 |
|
|
用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。 |
|
|
标记,表示连接器当前是否连接到mongodb。 |
|
|
从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机 |
|
|
提交的已处理事务的数量。 |
|
|
上次接收事件的坐标。 |
|
|
最后处理的事务的事务标识符。 |
Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义流指标:
属性 | 类型 | 描述 |
---|---|---|
|
|
数据库断开数。开云体育电动老虎机 |
|
|
主节点选举的个数。 |
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
||
连接器的Java类的名称。始终使用值 |
||
复制集中MongoDB服务器的主机名和端口对列表(以'host'或'host:port'的形式),以逗号分隔。该列表可以包含单个主机名和端口对。如果 |
||
一个唯一的名称,用于标识连接器和/或该连接器监视的MongoDB副本集或分片集群。每个服务器应该由最多一个Debezium连接器监控,因为这个服务器名前缀了所有来自MongoD开云体育官方注册网址B副本集或集群的持久化Kafka主题。只能使用字母数字字符和下划线。 |
||
连接MongoDB时使用开云体育电动老虎机的数据库用户名。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
||
连接MongoDB时使用的密码。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
||
|
开云体育电动老虎机包含MongoDB凭证的数据库(身份验证源)。只有当MongoDB配置为使用与另一个身份验证数据库的身份验证时,才需要这样做开云体育电动老虎机 |
|
|
连接器将使用SSL连接到MongoDB实例。 |
|
|
启用SSL时,此设置控制在连接阶段是否禁用严格的主机名检查。如果 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,匹配要监控的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在白名单中的数据库名称都将被排除在监控之外。默认情况下监视所有数据库。开云体育电动老虎机不得与 |
|
空字符串 |
可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在黑名单中的数据库名称都将受到监控。不得与 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,它与要监控的MongoDB集合的完全限定名称空间匹配;任何未包含在白名单中的集合都将被排除在监控之外。每个标识符都是这样的开云体育电动老虎机数据库名.collectionName.对象中的集合之外的所有集合默认情况下,连接器将监视 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,该列表与MongoDB集合的完全限定名称空间匹配,将被排除在监控之外;任何未包含在黑名单中的集合都将被监控。每个标识符都是这样的开云体育电动老虎机数据库名.collectionName.不得与 |
|
|
指定在连接器启动时运行快照的条件。默认为最初的,并指定连接器在没有发现偏移量或oplog不再包含以前的偏移量时读取快照。的从来没有选项指定连接器永远不应该使用快照,相反,连接器应该继续跟踪日志。 |
|
空字符串 |
应从更改事件消息值中排除的字段的全限定名称的可选列表,以逗号分隔。字段的完全限定名的格式为开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*)。 |
|
空字符串 |
一个可选的、以逗号分隔的字段完全限定替换列表,用于重命名更改事件消息值中的字段。字段的完全合格替换是这样的开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName:newNestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*),冒号(:)用于确定字段的重命名映射。下一个字段替换应用于列表中前一个字段替换的结果,因此在重命名位于同一路径的多个字段时请记住这一点。 |
|
|
应该为此连接器创建的最大任务数。MongoDB连接器将尝试为每个副本集使用单独的任务,因此在使用单个MongoDB副本集的连接器时,默认值是可接受的。当使用MongoDB分片集群的连接器时,我们建议指定一个等于或大于集群中分片数量的值,这样每个副本集的工作就可以通过Kafka Connect分配。 |
|
|
正整数值,指定用于对复制集中的集合执行初始同步的最大线程数。默认值为1。 |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
||
|
指定在拍摄快照时应从每个集合中一次性读取的最大文档数。连接器将以这个大小的多个批次读取集合内容。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 |
默认的 |
描述 |
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为oplog读取器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
正整数值,指定在第一次连接尝试失败或没有主节点可用时试图重新连接到主节点时的初始延迟。缺省值为1秒(1000毫秒)。 |
|
|
正整数值,指定在多次连接尝试失败或没有主服务器可用时试图重新连接到主服务器时的最大延迟。默认值为120秒(120,000毫秒)。 |
|
|
正整数值,指定在发生异常和任务中止之前尝试连接主副本集失败的最大次数。默认值为16 |
|
|
布尔值,用于指定mongodb中的地址。hosts' are seeds that should be used to discover all members of the cluster or replica set ( |
|
v2 |
的架构版本 |
|
|
控制心跳消息的发送频率。 将此参数设置为 |
|
|
控制要向其发送心跳消息的主题的命名。 |
|
|
字段名是否被净化以符合Avro命名要求。看到Avro命名欲知详情。 |
|
以逗号分隔的oplog操作列表,将在流处理期间跳过。操作包括: |
||
|
当设置为 看到事务的元数据更多细节。 |
MongoDB连接器常见问题
开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium(像Kafka)提供了开云体育官方注册网址至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,或当连接器使用指定的连接参数多次连接到MongoDB失败时,连接器将停止运行。重新连接使用指数回退完成,并且最大尝试次数是可配置的。
在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经纠正或MongoDB问题已经解决时,可以重新启动连接器。
MongoDB不可用
一旦连接器开始运行,如果任何MongoDB副本集的主节点变得不可用或不可达,连接器将重复尝试重新连接到主节点,使用指数回退来防止网络或服务器饱和。如果在可配置的连接尝试次数之后,主服务器仍然不可用,则连接器将失败。
重新连接的尝试由三个属性控制:
connect.backoff.initial.delay.ms
-第一次尝试重新连接之前的延迟,默认为1秒(1000毫秒)。connect.backoff.max.delay.ms
-尝试重新连接之前的最大延迟,默认为120秒(120,000毫秒)。connect.max.attempts
—产生错误前的最大尝试次数,默认为16次。
每次延迟都是之前延迟的两倍,直到最大延迟。如果使用默认值,下表显示了每次失败连接尝试的延迟和失败前的累计总时间。
重新连接次数 |
尝试前延迟,以秒为单位 |
尝试前的总延迟,以分钟和秒为单位 |
1 |
1 |
00:01 |
2 |
2 |
00:03 |
3. |
4 |
00:07 |
4 |
8 |
00:15 |
5 |
16 |
00:31 |
6 |
32 |
01:03 |
7 |
64 |
02:07 |
8 |
120 |
04:07 |
9 |
120 |
06:07 |
10 |
120 |
08:07 |
11 |
120 |
10:07 |
12 |
120 |
12:07 |
13 |
120 |
14:07 |
14 |
120 |
16:07 |
15 |
120 |
18:07 |
16 |
120 |
20:07 |
Kafka Connect进程优雅地停止
如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
如果组只包含一个进程,并且该进程被优雅地停止,那么Kafka Connect将停止连接器并记录每个副本集的最后偏移量。在重新启动时,副本集任务将继续在他们离开的地方。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,MongoDB连接器将从最后一个偏移量恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。
由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。 开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括MongoDB事件的唯一事务标识符( |
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率由你在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。
连接器停止一段时间
如果连接器被优雅地停止,副本集可以继续使用,任何新的更改都会记录在MongoDB的oplog中。当连接器重新启动时,它将恢复它上次停止的每个副本集的流更改,记录连接器停止时所做的所有更改的更改事件。如果连接器停止的时间足够长,以至于MongoDB从它的oplog中清除一些连接器没有读取的操作,那么在启动连接器时,连接器将执行一个快照。
一个正确配置的Kafka集群能够提供巨大的吞吐量。Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器在一段时间后重新启动时,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及MongoDB中对数据的更改量。开云体育电动老虎机
如果连接器停止的时间足够长,MongoDB可能会清除旧的oplog文件,连接器的最后一个位置可能会丢失。在这种情况下,当连接器配置为最初的快照模式(默认)最终重新启动,MongoDB服务器将不再有起点,连接器将失败并报错。 |