开云体育官方注册网址Debezium连接器用于MongoDB
开云体育官方注册网址Debezium的MongoDB连接器可以监视一个MongoDB副本集或者一个MongoDB分片集群对于数据库和集合中的文档更改,将这些更改记录为K开云体育电动老虎机afka主题中的事件。连接器自动处理除了或删除对于分片集群中的分片,每个副本集成员的变化,选举在每个副本集中,并等待通信问题的解决。
概述
MongoDB的复制机制提供冗余和高可用性,是在生产环境中运行MongoDB的首选方式。一个MongoDB副本集由一组服务器组成,这些服务器都有相同数据的副本,复制确保客户端对副本集中的文档所做的所有更改主要的正确地应用于其他复制集的服务器,称为次要的人.MongoDB复制的工作原理是让主服务器记录其数据库中的更改oplog(或操作日志),然后每个辅助服务器读取主服务器的oplog,并按顺序将所有操作应用于它们自己的文档。当一个新的服务器被添加到一个副本集时,该服务器首先执行一个初始同步然后读取主服务器的oplog开云体育电动老虎机,以应用自初始同步开始以来可能发生的所有更改。当这个新服务器赶上主服务器的oplog的尾部时,它将成为辅助服务器(并能够处理查询)。
Debe开云体育官方注册网址zium MongoDB连接器使用相同的复制机制,尽管它实际上不会成为复制集的成员。然而,就像MongoDB从服务器一样,连接器总是读取复制集主服务器的oplog。并且,当连接器第一次看到一个副本集时,它会查看oplog以获得最后记录的事务,然后执行开始同步主要的数据库和收藏。开云体育电动老虎机复制完所有数据后,连接器开始从之前读取的位置读取oplog。MongoDB oplog中的操作为幂等,因此无论应用多少次操作,结果都是相同的结束状态。
MongoDB连接器在处理oplog时,会定期记录事件在oplog中的起源位置。当MongoDB连接器停止时,它会记录它处理过的最后一个oplog位置,因此在重新启动时,它只是从该位置开始读取oplog。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从停止的地方恢复,而不会丢失任何事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太长时间,否则在连接器改变读取它们之前,oplog中的一些操作可能会被清除。在这种情况下,在重新启动时,连接器将检测缺失的oplog操作,执行初始同步,然后继续跟踪oplog。
MongoDB连接器还可以容忍副本集的成员和领导、分片集群中分片的添加或删除以及可能导致通信故障的网络问题的更改。连接器总是使用复制集的主节点来跟踪oplog,因此当复制集经历一次选举并且不同的节点成为主节点时,连接器将立即停止跟踪oplog,连接到新的主节点,并开始使用新的主节点跟踪oplog。同样,如果连接器在与复制集主节点的通信中遇到任何问题,它将尝试重新连接(使用指数回退,以免淹没网络或复制集),并从它上次离开的地方继续跟踪oplog。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
支持的MongoDB拓扑
Debe开云体育官方注册网址zium MongoDB连接器可以与各种MongoDB拓扑一起使用。
MongoDB副本集
Debe开云体育官方注册网址zium MongoDB连接器可以从单个连接器捕获更改MongoDB副本集.虽然生产副本集应该有至少三名成员,连接器实际上并不关心复制集中有多少成员。
要使用带有复制集的De开云体育官方注册网址bezium MongoDB连接器,只需提供一个或多个复制集服务器的地址为种子地址通过连接器的mongodb.hosts
财产。连接器将使用这些种子连接到复制集,然后一旦连接,将从复制集获得完整的成员集以及哪个成员是主成员。连接器将启动连接到主服务器的任务,并从主服务器的oplog捕获更改。当副本集选择一个新的主副本时,任务将自动切换到新的主副本。
当MongoDB由代理连接时(例如在OS X或Windows上使用Docker),当客户端连接到副本集并发现成员时,MongoDB客户端将排除代理作为有效成员,并尝试直接连接到成员,而不是通过代理连接,但失败。 在这些情况下,将连接器设置为可选 |
MongoDB分片集群
一个MongoDB分片集群包括:
一个或多个碎片,每一个都作为复制集部署;
一个单独的副本集,充当集群的副本集配置服务器;而且
一个或多个路由器(也称为
蒙戈
),客户端连接并将请求路由到适当的分片
要使用带有分片集群的D开云体育官方注册网址ebezium MongoDB连接器,请使用配置服务器副本集。当连接器连接到这个复制集时,它会发现自己正在充当一个分片集群的配置服务器,发现关于集群中用作分片的每个复制集的信息,然后启动一个单独的任务来捕获每个复制集的更改。如果向集群中添加了新的分片或删除了现有的分片,连接器将自动相应地调整其任务。
MongoDB独立服务器
Debe开云体育官方注册网址zium MongoDB连接器不能监视独立MongoDB服务器的更改,因为独立服务器没有oplog。如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。
MongoDB不推荐在生产环境中运行独立服务器。 |
MongoDB连接器如何工作
本节将详细介绍MongoDB连接器如何捕获复制集或分片集群中的更改。
任务
在配置和部署MongoDB连接器时,它首先连接到种子地址上的MongoDB服务器,并确定关于每个可用副本集的详细信息。由于每个复制集都有自己独立的oplog,连接器将尝试为每个复制集使用单独的任务。连接器可以限制它将使用的任务的最大数量,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管任务仍将为每个副本集使用单独的线程。
在对分片集群运行连接器时,使用值 |
逻辑连接器名称
连接器配置属性之一是mongodb.name
,它作为一个逻辑名用于MongoDB副本集或分片集群。连接器以多种方式使用逻辑名:作为所有主题名的前缀,以及在记录每个副本集的oplog位置时作为唯一标识符。
您应该为每个MongoDB连接器提供一个唯一的逻辑名称,该名称可以有意义地描述源MongoDB系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
初始同步
当一个任务使用复制集启动时,它使用连接器的逻辑名和复制集名来查找抵消它描述了在复制集oplog中连接器先前停止读取的位置。如果可以找到一个偏移量,并且它仍然在oplog中,那么任务立即继续执行跟踪博客,从记录的偏移位置开始。
但是,如果没有发现偏移量,或者如果oplog不再包含该位置,任务必须首先通过执行命令获取复制集内容的当前状态初始同步.这个过程首先记录oplog的当前位置,并将其记录为偏移量(同时记录一个标志,表示初始同步已经开始)。然后,该任务将继续复制每个集合,生成尽可能多的线程(直到initial.sync.max.threads
配置属性)以并行地执行此工作。连接器将记录一个单独的读取事件对于它看到的每个文档,读取事件将包含对象的标识符、对象的完整状态和源关于该对象所在的MongoDB副本集的信息。源信息还将包括一个标志,表示在初始同步期间产生的事件。
这个初始同步将继续进行,直到复制了与连接器筛选器匹配的所有集合为止。如果连接器在任务初始同步完成之前停止,重新启动连接器将再次开始初始同步。
在连接器执行任何副本集的初始同步时,尽量避免重新分配和重新配置任务。连接器将记录初始同步进程的消息。为了最大限度地控制,为每个连接器运行一个单独的Kafka Connect集群。 |
跟踪博客
一旦复制集的连接器任务有了偏移量,它就会使用偏移量来确定它应该开始读取的oplog中的位置。然后,该任务将连接到副本集的主节点,并开始从该位置读取oplog,处理所有创建、插入和删除操作,并将它们转换为Debezium开云体育官方注册网址更改事件.每个更改事件都包括在oplog中发现操作的位置,连接器定期将其记录为最近的偏移量。记录偏移量的间隔由offset.flush.interval.ms
Kafka连接工人配置属性.)
当连接器优雅地停止时,将记录处理的最后一个偏移量,以便重新启动时,连接器将准确地继续它停止的位置。然而,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量之后,但在最后一次记录偏移量之前处理并生成了事件;重新启动时,连接器将从最后一个开始记录偏移,可能会生成一些与之前在崩溃之前生成的事件相同的事件。
当一切正常运行时,Kafka消费者将真正看到每条消息只有一天.然而,当出现问题时,Kafka只能保证消费者能够看到每一条信息至少一次.因此,您的客户需要预期看到不止一次的消息。 |
如上所述,连接器任务总是使用复制集的主节点跟踪oplog,确保连接器尽可能看到最新的操作,并能够以比使用辅助节点更低的延迟捕获更改。当复制集选择一个新的主节点时,连接器将立即停止跟踪oplog,连接到新的主节点,并在相同的位置开始跟踪新主节点的oplog。同样地,如果连接器在与复制集成员的通信中遇到任何问题,它将尝试重新连接(使用指数回退,以免淹没复制集),并且一旦连接,将从它上次离开的地方继续跟踪oplog。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。
总之,MongoDB连接器将在大多数情况下继续运行,尽管通信问题可能会导致连接器等待问题解决。
主题名称
MongoDB连接器将对每个集合中的文档的所有插入、更新和删除操作的事件写入到单个Kafka主题。卡夫卡主题的名称总是采用这种形式logicalName.开云体育电动老虎机数据库名.collectionName,在那里logicalName是逻辑名属性指定的连接器的mongodb.name
配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机collectionName是受影响文档所在的MongoDB集合的名称。
例如,考虑一个MongoDB副本集库存
开云体育电动老虎机数据库包含四个集合:产品
,products_on_hand
,客户
,订单
.如果监视此数据库的连接器被赋予的逻辑名称为开云体育电动老虎机实现
,那么连接器将在这四个Kafka主题上产生事件:
fulfillment.inventory.products
fulfillment.inventory.products_on_hand
fulfillment.inventory.customers
fulfillment.inventory.orders
注意,主题名称不包含复制集名称或碎片名称。因此,对切分集合(其中每个切分包含集合文档的子集)的所有更改都转到相同的Kafka主题。
你可以设置卡夫卡自动创建根据需要选择题目。如果不是,那么你必须在启动连接器之前使用Kafka管理工具来创建主题。
分区
MongoDB连接器不显式地确定事件的主题分区。相反,它允许Kafka根据密钥来确定分区。你可以改变Kafka的分区逻辑,方法是在Kafka Connect worker配置中定义瓜分者
实现。
请注意,Kafka只维护写入单个主题的事件的总顺序分区.按键划分事件意味着具有相同键的所有事件将始终进入相同的分区,从而确保特定文档的所有事件始终完全有序。
事件
MongoDB连接器产生的所有数据更改事件都有一个键和一个值。本节的其余部分将概述这些键和值的结构。
从Kafka 0.10开始,Kafka可以选择记录消息键和值时间戳消息被创建(由生产者记录)或被Kafka写入日志的时间。 |
开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流,如果这些事件的源在结构上发生了变化,或者连接器得到了改进或改变,那么这些事件的结构可能会随着时间的推移而发生变化。这对于消费者来说可能很难处理,所以Kafka Connect使得每个事件都是自包含的。每个消息键和值都有两部分模式而且有效载荷.模式描述有效负载的结构,而有效负载包含实际数据。
更改事件的键
对于给定的集合,更改事件的键将是包含单个id
字段。它的值将是文档的标识符,表示为字符串,派生自MongoDB在严格模式下扩展JSON序列化.考虑逻辑名为的连接器实现
对象的复制集库存
开云体育电动老虎机数据库的客户
收藏的文件包括:
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
的每一个变化事件客户
collection将具有相同的键结构,在JSON中是这样的:
{"schema": {"type": "struct", "name": "fulfillment.inventory.customers. "关键”“可选”:假的,“字段”:[{”字段”:“id”、“类型”:“弦”、“可选”:假}]},“有效载荷”:{" id ": " 1004 "}}
的模式
密钥的部分包含一个Kafka Connect模式,描述有效负载部分的内容,在我们的例子中,这意味着有效载荷
值不是可选的,是由名为fulfillment.inventory.customers.Key
,并且有一个名为id
类型的字符串
.如果我们看一下键的值有效载荷
字段,我们将看到它确实是一个结构(在JSON中只是一个对象)id
字段,其值为包含整数的字符串1004
.
这个例子使用了一个具有整数标识符的文档,但是任何有效的MongoDB文档标识符(包括文档)都可以工作。的值id
字段将只是一个字符串,表示原始文档的MongoDB扩展JSON序列化(严格模式)_id
字段。下面的几个例子说明了如何做到_id
不同类型的字段将被编码为事件键的有效负载:
类型 | MongoDB_id 价值 |
关键的负载 |
---|---|---|
整数 |
1234 |
|
浮动 |
12.34 |
|
字符串 |
“1234” |
|
文档 |
{“嗨”:“卡夫卡”、“num”:(10.0,100.0,1000.0)} |
|
ObjectId |
ObjectId(“596 e275826f08b2730779e1f”) |
|
二进制 |
BinData(“a2Fma2E = ", 0) |
|
从Debez开云体育官方注册网址ium 0.3.0开始,Debezium MongoDB连接器确保所有Kafka连接模式名是有效的Avro模式名.这意味着逻辑服务器名称必须以拉丁字母或下划线开头(例如,[a-z, a-z, _]),逻辑服务器名称中的其余字符以及数据库和集合名称中的所有字符必须是拉丁字母、数字或下划线(例如,[a-z, a-z, 0-9,\_])。开云体育电动老虎机如果不是,则所有无效字符将自动替换为下划线字符。 当逻辑服务器名、数据库名和表名包含其他字符时,这可能导致模式名中出现意外冲突,并且表全名之间唯一的区分字符无效,因此被下划线取代。开云体育电动老虎机连接器尝试在这种情况下产生异常,但只有在单个连接器中使用的模式之间存在冲突时才会产生异常。 |
更改事件的值
更改事件消息的值稍微复杂一些。就像关键信息一样,它有一个模式节和有效载荷部分。MongoDB连接器产生的每个更改事件值的有效负载部分都有一个信封结构,使用以下字段:
人事处
必选字段,包含描述操作类型的字符串值。MongoDB连接器的值为c
对于创建(或插入),u
对于更新,d
对于删除,和r
用于读取(在初始同步的情况下)。后
是可选字段,如果存在则包含文档的状态后事件发生了。MongoDB的oplog条目只包含一个文档的完整状态创建事件,这些是唯一包含后字段。源
是一个必选字段,包含描述事件源元数据的结构,在MongoDB中包含几个字段:Debezium版本、逻辑名、副本集的名称、集合的名称空间、事件发生的MongoDB时间戳(以及时间戳内事件的序号)、MongoDB操作的标识符(例如开云体育官方注册网址h
字段),以及初始同步标志(如果该事件是在初始同步期间发生的)。ts_ms
是可选的,如果存在,则包含连接器处理事件的时间(使用运行Kafka Connect任务的JVM中的系统时钟)。
当然,还有模式事件消息值的一部分包含描述此信封结构及其内嵌套字段的模式。
我们来看看a创建/读事件值可能看起来像客户
表:
{"模式":{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“之后”},{“类型”:开云体育官方注册网址“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“补丁”},{“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:“快照”},{“类型”:“弦”、“可选”:假的,“场”:“分贝”},{“类型”:“弦”、“可选”:假的,“场”:“rs”},{“类型”:“弦”、“可选”:假的,“场”:“集合”},{“类型”:“int32”、“可选”:假的,“场”:“奥德”},{“类型”:“int64”、“可选”:真的,“场”:“h”}],“可选”:假的,“名字”:“io.debezium.connector.mongo.Source”、“字段”:“源”},{“类型”:开云体育官方注册网址"string", "optional": true, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "dbserver1.inventory.customers. "信封”},“有效载荷”:{“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,“补丁”:空,“源”:{“版本”:“0.9.5。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "r", "ts_ms": 1558965515240}}
如果我们看模式
事件的一部分价值的模式信封的模式特定于集合,而源
结构(特定于MongoDB连接器并在所有事件中重用)。还要注意后
value总是一个字符串,并且按照约定它将包含文档的JSON表示形式。
如果我们看有效载荷
事件的一部分价值,我们可以看到事件中的信息,即它描述了文档作为初始同步的一部分被读取(因为op = r
而且initsync = true
),以及后
字段值包含文档的JSON字符串表示形式。
事件的JSON表示形式似乎比它们描述的行要大得多。这是正确的,因为JSON表示必须包含模式和有效载荷部分信息。这是可能的,甚至建议使用Avro转换器以大幅减少写入Kafka主题的实际消息的大小。 |
的值更新此集合上的Change事件将具有完全相同的内容模式,其有效负载的结构相同,但将持有不同的值。具体来说,更新事件将没有后
值,将会有一个补丁
包含幂等更新操作的JSON表示形式的字符串。这里有一个例子:
{"schema":{…},“有效载荷”:{“人事处”:“u”,“ts_ms”:1465491461815,“补丁”:“{\“\”美元:{\“first_name \”,\“安妮玛丽\}}”,“源”:{“版本”:“0.9.5。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
当我们把这个和插入事件中,我们看到了一些不同有效载荷
部分:
的
人事处
字段值现在是u
,表示此文档因更新而更改的
补丁
字段出现,并具有对文档的实际MongoDB幂等性更改的字符串化JSON表示,在本例中涉及设置first_name
字段转换为新值的
后
字段不再出现的
源
字段结构具有与以前相同的字段,但值有所不同,因为此事件来自oplog中的不同位置。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
的内容 本文档中的所有示例都来自MongoDB 3.4,如果使用不同的示例可能会有所不同。 |
同样,MongoDB的oplog中的更新事件没有之前或后因此Debezium连接器无法提供此信息。开云体育官方注册网址然而,由于创建或读事件做如果包含开始状态,流的下游消费者实际上可以通过为每个文档保留最新状态并将每个事件应用于该状态来完全重建状态。开云体育官方注册网址Debezium连接器不能保持这样的状态,所以它不能这样做。 |
到目前为止,我们已经看到了样本创建/读而且更新事件。现在,我们来看看a的值删除事件。的值删除事件也将具有完全相同的属性模式,其有效负载的结构相同,但将持有不同的值。特别地,删除事件不会有后
值或补丁
值:
{"schema":{…},“有效载荷”:{“人事处”:“d”、“ts_ms”:1465495462115,“源”:{“版本”:“0.9.5。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
当我们将此值与其他事件中的值进行比较时,我们可以看到有效载荷
部分:
的
人事处
字段值现在是d
,表示该文档已被删除的
补丁
字段不出现的
后
字段不出现的
源
字段结构具有与以前相同的字段,但值有所不同,因为此事件来自oplog中的不同位置。的
ts_ms
显示了Debezium处理此事件的时间戳。开云体育官方注册网址
Debe开云体育官方注册网址zium MongoDB连接器实际上提供了另一种事件。每一个删除事件之后会有一个墓碑上事件,具有相同的键,但零
值,给卡夫卡足够的信息来知道它Kafka对数压缩机构可以移除所有用那个键发送消息。
所有的MongoDB连接器事件都是设计用于工作的Kafka对数压缩,只要每个键至少保留最近的消息,就可以删除旧消息。这就是Kafka如何回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。 一个唯一标识文档的所有MongoDB连接器事件将具有完全相同的键,向Kafka发出信号,只保留最新的事件。墓碑事件告诉卡夫卡所有可以删除具有相同键的消息。 |
当事情出错时
开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当然,当系统在名义上运行或被仔细管理时,Debezium可以提供开云体育官方注册网址只有一天交付每个变更事件。但是,如果确实发生了错误,那么系统仍然不会丢失任何事件,尽管当它从错误中恢复时,它可能会重复一些更改事件。因此,在这些不正常的情况下,Debezium(像Kafka)提供了开云体育官方注册网址至少一次变更事件的交付。
本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址
配置和启动错误
连接器将在启动时失败,在日志中报告错误/异常,当连接器的配置无效时,或当连接器使用指定的连接参数多次连接到MongoDB失败时,连接器将停止运行。重新连接使用指数回退完成,并且最大尝试次数是可配置的。
在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经纠正或MongoDB问题已经解决时,可以重新启动连接器。
MongoDB不可用
一旦连接器开始运行,如果任何MongoDB副本集的主节点变得不可用或不可达,连接器将重复尝试重新连接到主节点,使用指数回退来防止网络或服务器饱和。如果在可配置的连接尝试次数之后,主服务器仍然不可用,则连接器将失败。
重新连接的尝试由三个属性控制:
connect.backoff.initial.delay.ms
-第一次尝试重新连接之前的延迟,默认为1秒(1000毫秒)。connect.backoff.max.delay.ms
-尝试重新连接之前的最大延迟,默认为120秒(120,000毫秒)。connect.max.attempts
—产生错误前的最大尝试次数,默认为16次。
每次延迟都是之前延迟的两倍,直到最大延迟。如果使用默认值,下表显示了每次失败连接尝试的延迟和失败前的累计总时间。
重新连接次数 | 尝试前延迟,以秒为单位 | 尝试前的总延迟,以分钟和秒为单位 |
---|---|---|
1 |
1 |
00:01 |
2 |
2 |
00:03 |
3. |
4 |
00:07 |
4 |
8 |
00:15 |
5 |
16 |
00:31 |
6 |
32 |
01:03 |
7 |
64 |
02:07 |
8 |
120 |
04:07 |
9 |
120 |
06:07 |
10 |
120 |
08:07 |
11 |
120 |
10:07 |
12 |
120 |
12:07 |
13 |
120 |
14:07 |
14 |
120 |
16:07 |
15 |
120 |
18:07 |
16 |
120 |
20:07 |
Kafka Connect进程优雅地停止
如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。
如果组只包含一个进程,并且该进程被优雅地停止,那么Kafka Connect将停止连接器并记录每个副本集的最后偏移量。在重新启动时,副本集任务将继续在他们离开的地方。
Kafka连接进程崩溃
如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务显然都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,MongoDB连接器将从最后一个偏移量恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量将取决于偏移刷新周期和崩溃前的数据更改量。
由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址氘的变化是幂等的,所以一系列的事件总是在相同的状态下发生。 开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括MongoDB事件的唯一事务标识符( |
Kafka不可用
当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。
连接器停止一段时间
如果连接器优雅地停止,副本集可以继续使用,任何新的更改都将记录在MongoDB的oplog中。当连接器重新启动时,它将恢复读取它上次停止的每个副本集的oplog,记录连接器停止时所做的所有更改的更改事件。如果连接器停止的时间足够长,以至于MongoDB从它的oplog中清除一些连接器没有读取的操作,那么在启动连接器时,连接器将执行初始同步。
一个正确配置的Kafka集群能够巨大的吞吐量.Kafka Connect是用Kafka最佳实践编写的,如果有足够的资源,它也能够处理大量的数据库更改事件。开云体育电动老虎机正因为如此,当一个连接器在一段时间后重新启动时,它很可能会赶上数据库,尽管多快将取决于Kafka的能力和性能以及MongoDB中对数据的更改量。开云体育电动老虎机
如果连接器停止的时间足够长,MongoDB可能会清除旧的oplog文件,连接器的最后一个位置可能会丢失。在这种情况下,当连接器配置为最初的快照模式(默认)最终重新启动,MongoDB服务器将不再有起点,连接器将失败并报错。 |
部署连接器
如果您已经安装了动物园管理员,卡夫卡,卡夫卡连接,那么使用Debezium开云体育官方注册网址的MongoDB连接器就很容易了。只需下载连接器插件存档,将jar文件解压到Kafka Connect环境中,并添加jar文件所在的目录Kafka Connect的类路径.重新启动Kafka Connect进程以获取新的jar。
如果你喜欢不可变容器,那就试试吧开云体育官方注册网址Debezium的Docker图片对于Zookeeper, Kafka和Kafka Connect, MongoDB连接器已经预安装并准备好了。我们的教程甚至引导您使用这些图像,这是了解Debezium的一个很好的方法。开云体育官方注册网址你甚至可以在Kub开云体育官方注册网址ernetes和OpenShift上运行Debezium.
要使用连接器为特定的MongoDB复制集或分片集群产生更改事件,只需创建一个MongoDB Connector配置文件并使用Kafka连接REST API将该连接器添加到Kafka Connect集群。当连接器启动时,它将对您的MongoDB复制集中的集合执行初始同步,并开始读取复制集的oplogs,为每个插入、更新和删除的行生成事件。可选地过滤掉不需要的集合。
示例配置
使用MongoDB连接器非常简单。下面是一个用于监视MongoDB副本集的MongoDB连接器的配置示例rs0
在192.168.99.100的27017端口,我们在逻辑上命名它fullfillment
:
{"name": "inventory-connector",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector.mongodb.MongoDbConnector",(2)“mongodb。hosts": "rs0/192.168.99.100:27017",(3):“mongodb.name fullfillment”,(4)”集合。白名单”:“库存。*”,(5)}}
1 | 当我们向Kafka Connect服务注册连接器时,连接器的名称。 |
2 | MongoDB连接器类的名称。 |
3. | 用于连接到MongoDB复制集的主机地址 |
4 | 的逻辑名MongoDB副本集的命名空间,它为生成的事件形成了一个命名空间,并用于连接器写入的所有Kafka主题的名称、Kafka Connect模式的名称以及相应的Avro模式的命名空间Avro连接器使用。 |
5 | 匹配所有要监控的集合名称空间(例如, |
看到连接器属性的完整列表可以在这些配置中指定。
该配置可以通过POST发送到正在运行的Kafka Connect服务,然后该服务将记录配置并启动一个连接器任务,该任务将连接到MongoDB副本集或分片集群,为每个副本集分配任务,在必要时执行初始同步,读取oplog,并将事件记录到Kafka主题。
连接器属性
以下配置属性为要求除非有默认值可用。
财产 | 默认的 | 描述 |
---|---|---|
|
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。) |
|
|
连接器的Java类的名称。始终使用值 |
|
|
复制集中MongoDB服务器的主机名和端口对列表(以'host'或'host:port'的形式),以逗号分隔。该列表可以包含单个主机名和端口对。如果 |
|
|
一个唯一的名称,用于标识连接器和/或该连接器监视的MongoDB副本集或分片集群。每个服务器应该由最多一个Debezium连接器监控,因为这个服务器名前缀了所有来自MongoD开云体育官方注册网址B副本集或集群的持久化Kafka主题。 |
|
|
连接MongoDB时使用开云体育电动老虎机的数据库用户名。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
|
|
连接MongoDB时使用的密码。只有当MongoDB配置为使用身份验证时,才需要这样做。 |
|
|
|
连接器将使用SSL连接到MongoDB实例。 |
|
|
启用SSL时,此设置控制在连接阶段是否禁用严格的主机名检查。如果 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,匹配要监控的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在白名单中的数据库名称都将被排除在监视之外。默认情况下,将监视所有数据库。开云体育电动老虎机不得与 |
|
空字符串 |
可选的逗号分隔的正则表达式列表,匹配要排除在监视之外的数据库名称;开云体育电动老虎机任何未包开云体育电动老虎机含在黑名单中的数据库名称都将被监控。不得与 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,它与要监控的MongoDB集合的完全限定名称空间匹配;任何未包含在白名单中的集合都将被排除在监控之外。每个标识符都是这样的开云体育电动老虎机数据库名.collectionName.对象中的集合之外的所有集合默认情况下,连接器将监视 |
|
空字符串 |
一个可选的逗号分隔的正则表达式列表,该列表与MongoDB集合的完全限定名称空间匹配,将被排除在监控之外;任何未列入黑名单的收集都将被监控。每个标识符都是这样的开云体育电动老虎机数据库名.collectionName.不得与 |
|
|
指定运行快照的条件(例如。在连接器启动时进行初始同步。默认为最初的,并指定连接器在没有发现偏移量或oplog不再包含以前的偏移量时读取快照。的从来没有选项指定连接器永远不应该使用快照,相反,连接器应该继续跟踪日志。 |
|
空字符串 |
应从更改事件消息值中排除的字段的全限定名称的可选列表,以逗号分隔。字段的完全限定名的格式为开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*)。 |
|
空字符串 |
一个可选的、以逗号分隔的字段完全限定替换列表,用于重命名更改事件消息值中的字段。字段的完全合格替换是这样的开云体育电动老虎机数据库名.collectionName.字段名.nestedFieldName:newNestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*),冒号(:)用于确定字段的重命名映射。下一个字段替换应用于列表中前一个字段替换的结果,因此在重命名位于同一路径的多个字段时请记住这一点。 |
|
|
应该为此连接器创建的最大任务数。MongoDB连接器将尝试为每个副本集使用单独的任务,因此在使用单个MongoDB副本集的连接器时,默认值是可接受的。当使用MongoDB分片集群的连接器时,我们建议指定一个等于或大于集群中分片数量的值,这样每个副本集的工作就可以通过Kafka Connect分配。 |
|
|
正整数值,指定用于对复制集中的集合执行初始同步的最大线程数。默认值为1。 |
|
|
控制是否在删除事件之后生成墓碑事件。 |
|
连接器启动后在快照之前应该等待的间隔(以毫秒为单位); |
|
|
|
指定在拍摄快照时应从每个集合中一次性读取的最大文档数。连接器将以这个大小的多个批次读取集合内容。 |
以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。
财产 | 默认的 | 描述 |
---|---|---|
|
|
正整数值,指定阻塞队列的最大大小,从数据库日志中读取的更改事件在写入Kafka之前被放置在其中。开云体育电动老虎机例如,当写入Kafka较慢或Kafka不可用时,该队列可以为oplog读取器提供反压力。出现在队列中的事件不包括在此连接器定期记录的偏移量中。属性中指定的最大批处理大小,默认值为8192 |
|
|
正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。 |
|
|
正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。 |
|
|
正整数值,指定在第一次连接尝试失败或没有主节点可用时试图重新连接到主节点时的初始延迟。缺省值为1秒(1000毫秒)。 |
|
|
正整数值,指定在多次连接尝试失败或没有主服务器可用时试图重新连接到主服务器时的最大延迟。默认值为120秒(120,000毫秒)。 |
|
|
正整数值,指定在发生异常和任务中止之前尝试连接主副本集失败的最大次数。默认值为16 |
|
|
布尔值,用于指定mongodb中的地址。hosts' are seeds that should be used to discover all members of the cluster or replica set ( |
|
v2 |
的架构版本 |
|
|
控制心跳消息的发送频率。 将此参数设置为 |
|
|
控制要向其发送心跳消息的主题的命名。 |