开云体育官方注册网址MongoDB的Debezium连接器

开云体育官方注册网址Debezium的MongoDB连接器跟踪MongoDB副本集或MongoDB分片集群,以记录数据库和集合中的文档更改,并将这些更改记录为Kafka主题中的事件。开云体育电动老虎机连接器自动处理分片集群中分片的添加或删除、每个副本集成员的更改、每个副本集中的选举以及等待通信问题的解决。

有关与此连接器兼容的MongoDB版本的信息,请参见开云体育官方注册网址Debezium发布概述

概述

MongoDB的复制机制提供了冗余和高可用性,是在生产环境中运行MongoDB的首选方式。MongoDB连接器捕获复制集或分片集群中的更改。

一个MongoDB副本集由一组服务器组成,这些服务器都有相同数据的副本,复制确保客户端对副本集中的文档所做的所有更改主要的正确地应用于其他复制集的服务器,称为次要的人.MongoDB复制的工作原理是让主服务器记录其数据库中的更改oplog(或操作日志),然后每个辅助服务器读取主服务器的oplog,并按顺序将所有操作应用于它们自己的文档。当一个新的服务器被添加到一个副本集时,该服务器首先执行一个快照然后读取主数据库的oplog开云体育电动老虎机,以应用自快照开始以来可能发生的所有更改。当这个新服务器赶上主服务器的oplog的尾部时,它将成为辅助服务器(并能够处理查询)。

改变流

Debe开云体育官方注册网址zium MongoDB连接器使用与上面描述的复制机制类似的复制机制,尽管它实际上不会成为复制集的成员。主要的区别是连接器不直接读取oplog,而是将捕获和解码oplog委托给MongoDB改变流特性。通过更改流,MongoDB服务器将更改作为事件流公开给集合。Debe开云体育官方注册网址zium连接器监视流并向下游交付更改。并且,当连接器第一次看到一个副本集时,它会查看oplog以获得最后记录的事务,然后执行主数据库和集合的快照。开云体育电动老虎机复制完所有数据后,连接器将从之前从oplog中读取的位置创建一个更改流。

当MongoDB连接器进程发生变化时,它会定期记录事件在oplog/流中的起源位置。当连接器停止时,它会记录它处理的最后一个oplog/流位置,因此在重新启动时,它只是从该位置开始流。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从停止的地方恢复,而不会丢失任何事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太长时间,否则在连接器有机会读取它们之前,oplog中的一些操作可能会被清除。在这种情况下,在重新启动时,连接器将检测缺失的oplog操作,执行快照,然后继续对更改进行流式处理。

MongoDB连接器还可以容忍副本集的成员和领导、分片集群中分片的添加或删除以及可能导致通信故障的网络问题的更改。连接器总是使用副本集的主节点来流化更改,因此当副本集经历一次选举并且不同的节点成为主节点时,连接器将立即停止流化更改,连接到新的主节点,并使用新的主节点开始流化更改。同样地,如果连接器遇到任何与复制集主通信的问题,它将尝试重新连接(使用指数回退,以避免淹没网络或复制集),并继续从它上次离开的地方传输更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。

MongoDB连接器如何工作

对连接器支持的MongoDB拓扑的概述对于规划应用程序非常有用。

在配置和部署MongoDB连接器时,它首先连接到种子地址上的MongoDB服务器,并确定关于每个可用副本集的详细信息。由于每个复制集都有自己独立的oplog,连接器将尝试为每个复制集使用单独的任务。连接器可以限制它将使用的任务的最大数量,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管任务仍将为每个副本集使用单独的线程。

在对分片集群运行连接器时,使用值tasks.max这大于复制集的数量。这将允许连接器为每个副本集创建一个任务,并让Kafka Connect在所有可用的工作进程之间协调、分布和管理任务。

支持的MongoDB拓扑

MongoDB连接器支持以下MongoDB拓扑:

MongoDB副本集

Debe开云体育官方注册网址zium MongoDB连接器可以从单个连接器捕获更改MongoDB副本集.生产副本集需要最少的至少三名成员

要使用带有副本集的MongoDB连接器,请提供一个或多个副本集服务器的地址为种子地址通过连接器mongodb.hosts财产。连接器将使用这些种子连接到复制集,然后一旦连接,将从复制集获得完整的成员集以及哪个成员是主成员。连接器将启动连接到主服务器的任务,并从主服务器的oplog捕获更改。当副本集选择一个新的主副本时,任务将自动切换到新的主副本。

当MongoDB由代理连接时(例如在OS X或Windows上使用Docker),当客户端连接到副本集并发现成员时,MongoDB客户端将排除代理作为有效成员,并尝试直接连接到成员,而不是通过代理连接,但失败。

在这种情况下,将连接器设置为可选mongodb.members.auto.discover配置属性为若要指示连接器放弃成员关系发现,而只需使用第一个种子地址(通过mongodb.hosts属性)作为主节点。这可能会起作用,但当选举发生时,仍然会产生原因问题。

MongoDB分片集群

一个MongoDB分片集群包括:

  • 一个或多个碎片,每一个都作为复制集部署;

  • 一个单独的副本集,充当集群的副本集配置服务器

  • 一个或多个路由器(也称为蒙戈),客户端连接并将请求路由到适当的分片

    要使用带分片集群的MongoDB连接器,请使用配置服务器副本集。当连接器连接到这个复制集时,它会发现自己正在充当一个分片集群的配置服务器,发现关于集群中用作分片的每个复制集的信息,然后启动一个单独的任务来捕获每个复制集的更改。如果向集群中添加了新的分片或删除了现有的分片,连接器将自动相应地调整其任务。

MongoDB独立服务器

MongoDB连接器不能监视独立MongoDB服务器的更改,因为独立服务器没有oplog。如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。

MongoDB不建议在生产环境中运行独立服务器。有关更多信息,请参见MongoDB文档

逻辑连接器名称

连接器配置属性topic.prefix作为逻辑名用于MongoDB副本集或分片集群。连接器以多种方式使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的变更流位置时作为唯一标识符。

您应该为每个MongoDB连接器提供一个唯一的逻辑名称,该名称可以有意义地描述源MongoDB系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。

执行快照

当一个任务使用复制集启动时,它使用连接器的逻辑名和复制集名来查找抵消它描述连接器先前停止读取更改的位置。如果可以找到一个偏移量,并且它仍然存在于oplog中,那么任务立即继续执行流的变化,从记录的偏移位置开始。

但是,如果没有发现偏移量,或者如果oplog不再包含该位置,任务必须首先通过执行命令获取复制集内容的当前状态快照.这个过程首先记录oplog的当前位置,并将其记录为偏移量(以及一个表示快照已经启动的标志)。然后,该任务将继续复制每个集合,生成尽可能多的线程(直到snapshot.max.threads配置属性)以并行地执行此工作。连接器将记录一个单独的读取事件对于它看到的每个文档,读取事件将包含对象的标识符、对象的完整状态和关于该对象所在的MongoDB副本集的信息。源信息还将包括一个标志,表示快照期间产生的事件。

此快照将继续,直到复制了与连接器筛选器匹配的所有集合为止。如果连接器在任务的快照完成之前停止,重新启动时连接器将再次开始快照。

在连接器执行任何副本集的快照时,尽量避免重新分配和重新配置任务。连接器生成日志消息以报告快照的进度。为了提供最好的控制,为每个连接器运行一个单独的Kafka Connect集群。

Ad hoc快照

默认情况下,连接器仅在第一次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据都只通过流处理传入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会过时、丢失或不完整。为了提供一种重新捕获收集数据的机制,Debezium包含了一个执行临时快照的选项。开云体育官方注册网址数据库中的以下更改可能会导致执行临时快照:开云体育电动老虎机

  • 连接器配置被修改为捕获一组不同的集合。

  • Kafka主题被删除,必须重新构建。

  • 数据损坏是由于配置错误或其他问题造成的。

您可以为以前捕获快照的集合重新运行快照,方法是启动所谓的特别的快照.临时快照需要使用信号集合.您可以通过向Debezium信令集合发送信号请求来启动一个临时快照。开云体育官方注册网址

当您初始化现有集合的临时快照时,连接器将内容追加到已存在的集合主题。如果先前存在的主题被删除,Debezium可以自动创建一个主题开云体育官方注册网址自动创建主题启用。

临时快照信号指定快照中包含的集合。快照可以捕获数据库的全部内容,也可以仅捕获数据库中集合的一个子集。开云体育电动老虎机

类型指定要捕获的集合execute-snapshot消息发送给信令集合。属性的类型execute-snapshot信号增量,并提供要包含在快照中的集合的名称,如下表所示:

表1。ad hoc的例子execute-snapshot信号记录
默认的 价值

类型

增量

指定要运行的快照类型。
可选设置类型。目前,您只能请求增量快照。

数据收集

N/A

包含与要快照的集合的完全限定名称匹配的正则表达式的数组。
名称的格式与signal.data.collection配置选项。

触发临时快照

属性添加项来初始化临时快照execute-snapshot信令集合的信号类型。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并使用这些值作为每个集合的起点和终点。根据集合中的条目数量和配置的块大小,Debezium将集合划分为块,然后依次对每个块进行快照,每次快照一个。开云体育官方注册网址

目前,execute-snapshot动作类型触发器增量快照只有。有关更多信息,请参见增量快照

增量快照

为了提供管理快照的灵活性,Debezium包含了一个补充的快照机制,称为开云体育官方注册网址增量快照.增量快照依赖于Debezium机制开云体育官方注册网址发送信号到Debezium连接器开云体育官方注册网址.增量快照基于DDD-3设计文档。

在增量快照中,Debezium不像初始快照那样一次性捕获数据库的全部状态,而是以一系列可配置块的形式分阶段捕获每个集合。开云体育官方注册网址开云体育电动老虎机可以指定希望快照捕获的集合和每个块的大小.块大小决定了快照在数据库上的每次获取操作期间收集的行数。开云体育电动老虎机增量快照的默认chunk大小为1kb。

随着增量快照的进行,Debezium使用水印来跟踪它的进度,维护它捕获的每开云体育官方注册网址个收集行的记录。与标准的初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:

  • 您可以在流数据捕获的同时并行运行增量快照,而不是将流数据延迟到快照完成。在整个快照过程中,连接器继续从更改日志中捕获接近实时的事件,并且两个操作都不会阻塞另一个操作。

  • 当增量快照进程中断时,可以恢复增量快照,不会丢失数据。进程恢复后,快照将从它停止的位置开始,而不是从开始重新捕获集合。

  • 您可以在任何时候按需运行增量快照,并根据需要重复该过程以适应数据库更新。开云体育电动老虎机例如,在修改连接器配置以向其添加集合之后,可以重新运行快照collection.include.list财产。

增量快照流程

运行增量快照时,Debezium会根据主键对每个集合排序,然后根据主键将集合开云体育官方注册网址分割成块配置的块大小.一个块接一个块地工作,然后捕获一个块中的每个集合行。对于它捕获的每一行,快照都会发出一个事件。该事件表示块快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改收集记录。开云体育电动老虎机为了反映这些变化,插入更新,或删除操作像往常一样提交到事务日志中。类似地,正在进行的Debezium流处理继续检开云体育官方注册网址测这些更改事件,并向Kafka发出相应的更改事件记录。

Debe开云体育官方注册网址zium如何解决具有相同主键的记录之间的冲突

在某些情况下,更新删除流流程发出的事件按顺序接收。也就是说,流处理过程可能在快照捕获包含集合行的块之前发出修改集合行的事件事件。当快照最终发出相应的事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium采用了一种缓冲方案来解决冲突。开云体育官方注册网址只有在快照事件和流事件之间的冲突被解决之后,Debezium才会向Kafka发送事件记录。开云体育官方注册网址

快照窗口

协助解决晚到车辆之间的碰撞事件和流事件修改同一集合行,Debezium采用了所谓的开云体育官方注册网址快照窗口.快照窗口定义了增量快照为指定的收集块捕获数据的时间间隔。在块的快照窗口打开之前,Debezium遵循其通常的行为,从事务日志中直接向下游的目标Kaf开云体育官方注册网址ka主题发送事件。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。开云体育官方注册网址

对于每个数据收集,Debezium会发出两种类型的事件,并将开云体育官方注册网址它们的记录存储在一个目的地Kafka主题中。它直接从表捕获的快照记录发出为操作。同时,随着用户继续更新数据收集中的记录,事务日志被更新以反映每次提交,Debezium就会发出开云体育官方注册网址更新删除每个更改的操作。

当快照窗口打开时,Debezium开始处理快照块,它将快照记录传递到开云体育官方注册网址内存缓冲区。的主键在快照窗口期间缓冲区中的事件与传入流事件的主键进行比较。如果没有匹配到,流事件记录将直接发送到Kafka。如果D开云体育官方注册网址ebezium检测到匹配,它将丢弃缓冲事件,并将流记录写入目标主题,因为流事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的事件。开云体育官方注册网址Debezium释放这些剩余的事件的集合的卡夫卡主题。

连接器对每个快照块重复该过程。

增量快照目前仅支持单个副本集部署。这个限制将在下一个版本中被移除。

触发增量快照

目前,创建增量快照的唯一方式是发送自组织快照信号到源数据库上的信令集合。开云体育电动老虎机通过使用MongoDB向信令集合提交一个信号insert ()方法。

在Debez开云体育官方注册网址ium检测到信号集合中的变化后,它会读取信号,并运行所请求的快照操作。

提交的查询指定要包含在快照中的集合,还可以指定快照操作的类型。目前,快照操作的唯一有效选项是默认值,增量

若要指定要包含在快照中的集合,请提供数据收集列出集合的数组或用于匹配集合的正则表达式数组,例如:
{“数据收集”:["。文物”、“公众。Collection2 ")}

数据收集增量快照信号阵列没有默认值。如果数据收集数组为空时,Debezium检测开云体育官方注册网址到不需要任何操作,因此不执行快照。

如果要包含在快照中的集合的名称包含点(的名称,以将集合添加到开云体育电动老虎机数据收集数组时,必须用双引号转义名称的每个部分。

类中存在的数据集合公共开云体育电动老虎机数据库,它有名称MyCollection,请使用以下格式:“公共”。“MyCollection”

先决条件
过程
  1. 在信令集合中插入一个快照信号文档:

    < signalDataCollection >.insert({”id": _“类型”:< snapshotType >, "data": {"data-collections" ["< collectionName >”、“< collectionName >”,“类型”:< snapshotType >}});

    例如,

    db.开云体育官方注册网址debeziumSignal.insert ({(1)"type": "execute-snapshot",(2)(3)"data": {"data-collections" ["\"public\"。\“文物\””、“\“公共\”,\“Collection2 \],(4)“类型”:“增量”}(5)});

    的值id类型,数据命令中的参数对应信令集合字段

    样例中参数说明如下:

    表2。MongoDB insert()命令中向信令集发送增量快照信号的字段说明
    价值 描述

    1

    db.开云体育官方注册网址debeziumSignal

    指定源数据库上信令集合的全限定名。开云体育电动老虎机

    2

    _id参数指定分配为id信号请求的标识符。
    前面示例中的insert方法省略了optional的使用_id参数。由于文档没有显式地为该参数分配值,MongoDB自动分配给文档的任意id就变成了id信号请求的标识符。
    使用此字符串标识信令集合中条目的日志消息。开云体育官方注册网址Debezium不使用这个标识符字符串。相反,在快照期间,Debezium生成自己的快照开云体育官方注册网址id字符串作为水印信号。

    3.

    execute-snapshot

    指定类型参数指定信号要触发的操作。

    4

    数据收集

    的必需组件数据信号的字段,该字段指定集合名称或正则表达式的数组,以匹配要包含在快照中的集合名称。
    对象中指定连接器的信令集合名称时使用的格式相同,该数组列出了按全限定名匹配集合的正则表达式signal.data.collection配置属性。

下面的示例显示了连接器捕获的增量快照事件的JSON。

[示例]增量快照事件消息
{“前”:零,“后”:{“pk”:“1”,“价值”:“新数据”},“源”:{…“快照”:“增量”(1)},“人事处”:“r”,(2)"ts_ms":"1620393591654", "transaction":null}
字段名 描述

1

快照

指定要运行的快照操作的类型。
目前,唯一有效的选项是默认值,增量
指定一个类型您提交给信令集合的SQL查询中的值是可选的。
如果不指定值,连接器将运行增量快照。

2

人事处

事件类型。
快照事件的值为r,表示操作。

停止增量快照

您还可以通过向源数据库上的集合发送信号来停止增量快照。开云体育电动老虎机通过向信令集合中插入文档来提交停止快照信号。在Debez开云体育官方注册网址ium检测到信号集合中的变化后,它会读取信号,如果增量快照操作正在进行,它会停止增量快照操作。

提交的查询指定的快照操作增量,以及要删除的当前运行快照的集合(可选)。

先决条件
过程
  1. 在信令集合中插入一个停止快照信号文档:

    < signalDataCollection >.insert({”id": _,"type": "stop-snapshot", "data": {"data-collections" ["< collectionName >”、“< collectionName >”,“类型”:“增量”}});

    例如,

    db.开云体育官方注册网址debeziumSignal.insert ({(1)"type": "stop-snapshot",(2)(3)"data": {"data-collections" ["\"public\"。\“文物\””、“\“公共\”,\“Collection2 \],(4)“类型”:“增量”}(5)});

    的值id类型,数据signal命令中的参数对应信令集合字段

    样例中参数说明如下:

    表3。向信令集合发送停止增量快照文档时,插入命令中字段的说明
    价值 描述

    1

    db.开云体育官方注册网址debeziumSignal

    指定源数据库上信令集合的全限定名。开云体育电动老虎机

    2

    前面示例中的insert方法省略了optional的使用_id参数。由于文档没有显式地为该参数分配值,MongoDB自动分配给文档的任意id就变成了id信号请求的标识符。
    使用此字符串标识信令集合中条目的日志消息。开云体育官方注册网址Debezium不使用这个标识符字符串。

    3.

    stop-snapshot

    类型参数指定信号要触发的操作。

    4

    数据收集

    的可选组件数据信号的字段,该字段指定集合名称或正则表达式的数组,以匹配要从快照中删除的集合名称。
    对象中指定连接器的信令集合名称时使用的格式相同,该数组列出了按全限定名匹配集合的正则表达式signal.data.collection配置属性。如果这个分量数据字段,则信号停止正在进行的整个增量快照。

    5

    增量

    的必需组件数据信号的字段,该字段指定要停止的快照操作的类型。
    目前,唯一有效的选项是增量
    如果没有指定类型值时,信号停止增量快照失败。

流的变化

在复制集的连接器任务记录了偏移量之后,它使用偏移量来确定在oplog中的位置,它应该从哪里开始流化更改。然后,任务(取决于配置)要么连接到复制集的主节点,要么连接到复制集范围的更改流,并从该位置开始流化更改。它处理所有的创建、插入和删除操作,并将它们转换为Debezium开云体育官方注册网址更改事件.每个更改事件都包括在oplog中发现操作的位置,连接器定期将其记录为最近的偏移量。记录偏移量的间隔由offset.flush.interval.ms,这是一个Kafka连接工人配置属性。

当连接器优雅地停止时,将记录处理的最后一个偏移量,以便重新启动时,连接器将准确地继续它停止的位置。然而,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量之后,但在最后一次记录偏移量之前处理并生成了事件;重新启动时,连接器从最后一个开始记录偏移,可能会生成一些与之前在崩溃之前生成的事件相同的事件。

当一切正常运行时,Kafka消费者将真正看到每条消息只有一天.然而,当出现问题时,Kafka只能保证消费者能够看到每一条信息至少一次.因此,您的客户需要预期看到不止一次的消息。

如上所述,连接器任务总是使用复制集的主节点来传输来自oplog的更改,从而确保连接器尽可能看到最新的操作,并且能够以比使用辅助节点更低的延迟捕获更改。当副本集选择一个新的主节点时,连接器立即停止流化更改,连接到新的主节点,并从相同位置的新主节点开始流化更改。同样地,如果连接器遇到任何与复制集成员通信的问题,它会尝试重新连接,使用指数回退以避免淹没复制集,并且一旦连接,它将继续从上次离开的地方进行流更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。

总之,MongoDB连接器在大多数情况下继续运行。通信问题可能导致连接器等待问题解决。

原像支持

从MongoDB 6.0开始,您可以配置变更流以发出预映像来填充之前字段为Mongo更改事件。要使用预映像,首先需要启用changeStreamPreAndPostImages对于使用db.createCollection ()创建,或collMod.和配置capture.mode作为* _with_pre_image选项。

孟更改流事件的大小限制

由于Mongo更改流事件的大小限制为16兆字节,启用预映像将增加达到此限制并导致失败的可能性。为了避免这种情况,请参考Mongo的官方文档有关建议及详情。

主题名称

MongoDB连接器将对每个集合中的文档的所有插入、更新和删除操作的事件写入到单个Kafka主题。卡夫卡主题的名称总是采用这种形式logicalName开云体育电动老虎机数据库名collectionName,在那里logicalName逻辑名属性指定的连接器的topic.prefix配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机collectionName是受影响文档所在的MongoDB集合的名称。

例如,考虑一个MongoDB副本集库存开云体育电动老虎机数据库包含四个集合:产品products_on_hand客户,订单.如果监视此数据库的连接器被赋予的逻辑名称为开云体育电动老虎机实现,那么连接器将在这四个Kafka主题上产生事件:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

注意,主题名称不包含复制集名称或碎片名称。因此,对切分集合(其中每个切分包含集合文档的子集)的所有更改都转到相同的Kafka主题。

你可以设置卡夫卡自动创建根据需要选择题目。如果不是,那么你必须在启动连接器之前使用Kafka管理工具来创建主题。

分区

MongoDB连接器没有明确决定如何为事件划分主题。相反,它允许Kafka根据事件键来决定如何划分主题。的名称可以改变Kafka的分区逻辑瓜分者在Kafka Connect worker配置中实现。

Kafka只维护写入单个主题分区的事件的总顺序。按键划分事件意味着具有相同键的所有事件总是进入相同的分区。这确保了特定文档的所有事件总是完全有序的。

事务的元数据

开云体育官方注册网址Debezium可以生成表示事务元数据边界的事件,并丰富更改数据事件消息。

Debezium何时接收事务元开云体育官方注册网址数据的限制

开云体育官方注册网址Debezium仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

每笔交易开始而且结束, 开云体育官方注册网址Debezium生成一个包含以下字段的事件:

状态

开始结束

id

唯一事务标识符的字符串表示形式。

event_count(结束事件)

事务发出的事件总数。

data_collections(结束事件)

对的数组data_collection而且event_count它提供了由来自给定数据收集的更改所发出的事件数。

下面的例子显示了一个典型的消息:

{"status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null} {"status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [{"data_collection": "rs0.testDB. "collectiona", "event_count": 1}, {"data_collection": "rs0.testDB. collectiona", "event_count": 1}, {"data_collection": "rs0.testDB. "collection ", "event_count": 1}]}

方法覆盖topic.transaction选项时,事务事件被写入指定的主题< topic.prefix >.transaction

更改数据事件丰富

启用事务元数据时,数据消息信封是充实了新的事务字段。这个字段以复合字段的形式提供关于每个事件的信息:

id

唯一事务标识符的字符串表示形式。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中的每数据收集位置。

下面是一条消息的示例:

{“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,“源”:{…}, "op": "c", "ts_ms": "1580390884335", "transaction": {"id": "1462833718356672513", "total_order": "1", "data_collection_order": "1"}}

数据变更事件

Debe开云体育官方注册网址zium MongoDB连接器为插入、更新或删除数据的每个文档级操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于所更改的集合。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。

下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:

{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
表4。变更事件基本内容概述
字段名 描述

1

模式

第一个模式字段是事件键的一部分。它指定了一个Kafka Connect模式,用来描述事件键的内容有效载荷部分。换句话说,是第一个模式字段描述已更改文档的键的结构。

2

有效载荷

第一个有效载荷字段是事件键的一部分。它具有前面所描述的结构模式字段,它包含已更改文档的键。

3.

模式

第二个模式字段是事件值的一部分。它指定Kafka Connect模式,描述事件值中的内容有效载荷部分。换句话说,是第二种模式描述已更改文档的结构。通常,这个模式包含嵌套的模式。

4

有效载荷

第二个有效载荷字段是事件值的一部分。它具有前面所描述的结构模式字段,它包含已更改文档的实际数据。

默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始集合相同。看到主题名称

MongoDB连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机

更改事件键

更改事件的键包含已更改文档的键的模式和已更改文档的实际键。对于给定的集合,模式及其相应的有效负载都包含一个单一的id字段。此字段的值是文档的标识符,表示为派生的字符串MongoDB扩展JSON序列化的严格模式

考虑逻辑名为的连接器实现对象的复制集库存开云体育电动老虎机数据库,以及客户集合,其中包含以下文档。

示例文档
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
更改事件键的示例

对象的更改的每个更改事件客户集合具有相同的事件键模式。只要客户集合具有前面的定义,则捕获对的更改的每个更改事件客户集合具有以下关键结构。在JSON中,它看起来是这样的:

{"schema": {(1)"type": "struct", "name": "fulfillment.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“弦”、“可选”:假}},“有效载荷”:{(5)"id": "1004"}}
表5所示。更改事件键的描述
字段名 描述

1

模式

密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容有效载荷部分。

2

fulfillment.inventory.customers.Key

定义键的有效负载结构的模式的名称。此模式描述已更改文档的键的结构。关键模式名具有该格式connector-name开云体育电动老虎机数据库名称集合名称关键.在这个例子中:

  • 实现生成此事件的连接器的名称。

  • 库存包含已更改的集开云体育电动老虎机合的数据库。

  • 客户包含已更新文档的集合。

3.

可选

指示事件键是否必须在其中包含值有效载荷字段。在本例中,需要键的有效负载中的值。当文档没有键时,键的有效载荷字段中的值是可选的。

4

字段

属性中期望的每个字段有效载荷,包括每个字段的名称、类型以及是否需要。

5

有效载荷

包含为其生成此更改事件的文档的键。在本例中,键包含一个单键id类型字段字符串的值为1004

本例使用一个具有整数标识符的文档,但任何有效的MongoDB文档标识符都以相同的方式工作,包括文档标识符。对于文档标识符,事件键的值payload.idValue是一个字符串,表示更新后的文档的原始内容_id字段作为使用严格模式的MongoDB扩展JSON序列化。下表提供了不同类型的_id字段被表示出来。

表6所示。表示文档的示例_id事件关键有效载荷中的字段
类型 MongoDB_id价值 关键的负载

整数

1234

{"id": "1234"}

浮动

12.34

{"id": "12.34"}

字符串

“1234”

{"id": "\"1234\""}

文档

{“嗨”:“卡夫卡”、“num”:(10.0,100.0,1000.0)}

{" id ": "{\“嗨\”,\“卡夫卡\”,\“num \”:[10.0,100.0,1000.0]}"}

ObjectId

ObjectId(“596 e275826f08b2730779e1f”)

{" id ": "{\“美元oid \”:\ " 596 e275826f08b2730779e1f \ "} "}

二进制

BinData(“a2Fma2E = ", 0)

{" id ": "{\“二进制美元\”:\“a2Fma2E = \”,\“美元类型\”:\ " 00 \ "}"}

更改事件值

change事件中的值比键稍微复杂一些。和键一样,值也有模式Section和a有效载荷部分。的模式类的模式信封的结构有效载荷节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

考虑用于显示更改事件键示例的相同示例文档:

示例文档
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}

本文档更改的更改事件的值部分针对每种事件类型进行了描述:

创建事件

对象中创建数据的操作所生成的更改事件的值部分,示例如下客户集合:

{"schema": {(1)“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”,开云体育官方注册网址(2)“版本”:1、“字段”:“后”},{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“补丁”},{“开云体育官方注册网址类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:"snapshot"}, {"type": "string", "optional": false, "field": "db"}, {"type": "string", "optional": false, "field": "rs"}, {"type": "int32", "optional": false, "field": "ord"}, {"type": "int64", "optional": true, "field": "h"}], "optional": false, "name": "io. debezum .connector.mon . source ",开云体育官方注册网址(3)"field": "source"}, {"type": "string", "optional": true, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "dbserver1.inventory.customers.Envelope"(4)}, "有效载荷":{(5)“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(6)“源”:{(7)“版本”:“2.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "c",(8)“ts_ms”:1558965515240(9)}}
表7所示。的描述创建事件值字段
字段名 描述

1

模式

值的模式,它描述值的有效负载的结构。连接器为特定集合生成的每个更改事件中,更改事件的值模式都是相同的。

2

的名字

模式节中,每的名字Field指定值的有效负载中字段的模式。

io.开云体育官方注册网址debezium.data.Json有效负载的模式是补丁,过滤器字段。此模式特定于客户收集。一个创建事件是唯一一种包含字段。一个更新事件包含过滤器场和一个补丁字段。一个删除事件包含过滤器场,而不是一个场或补丁字段。

3.

的名字

io.开云体育官方注册网址debezium.connector.mongo.Source有效负载的模式是字段。这个模式是特定于MongoDB连接器的。连接器将其用于生成的所有事件。

4

的名字

dbserver1.inventory.customers.Envelope有效负载的总体结构的模式在哪里dbserver1是连接器名称,库存是数据库,和开云体育电动老虎机客户是集合。此模式特定于集合。

5

有效载荷

该值为实际数据。这是变更事件提供的信息。

事件的JSON表示形式似乎比它们所描述的文档要大得多。这是因为JSON表示必须包括消息的模式和有效负载部分。然而,通过使用Avro转换器,你可以显著减少连接器流到Kafka主题的消息的大小。

6

可选字段,指定事件发生后文档的状态。在本例中,字段包含新文档的值_idfirst_namelast_name,电子邮件字段。的Value总是一个字符串。按照惯例,它包含文档的JSON表示形式。MongoDB的oplog条目只包含_create_事件和更新事件,当capture.mode选项设置为change_streams_update_full;换句话说,a创建事件是唯一一种包含场无关capture.mode选择。

7

描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含新文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB操作的唯一标识符h字段)。

  • MongoDB会话的唯一标识符lsid和交易号txnNumber如果在事务中执行更改(仅限更改流捕获模式)。

8

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,c指示创建文档的操作。有效值为:

  • c=创建

  • u=更新

  • d=删除

  • r= read(仅适用于快照)

9

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新事件

更改流捕获模式

样例中更新的更改事件的值客户集合的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。一个更新事件有一个值仅当capture.mode选项设置为change_streams_update_full.一个之前值将被提供capture.mode选项设置为* _with_pre_image选择。这是一个新的结构化领域updateDescription在本例中有一些额外的字段:

  • updatedFields是包含更新的文档字段及其值的JSON表示的字符串字段吗

  • removedFields是从文档中删除的字段名的列表

  • truncatedArrays文档中的数组列表被截断了吗

中的更新中连接器生成的事件中的更改事件值的示例客户集合:

{"schema":{…}, "payload": {"op": "u",(1)“ts_ms”:1465491461815,(2)“之前”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“未知\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(3)“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮玛丽\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(4)"updateDescription": {"removedFields": null, "updatedFields": "{\"first_name\": \"Anne Marie\"}",(5)"truncatedArrays": null}, "source": {(6)“版本”:“2.1.2。最后”、“连接器”:“mongodb”、“名称”:“满足”、“ts_ms”:1558965508000,“快照”:假的,“分贝”:“库存”、“rs”:“rs0”、“收藏”:“客户”、“奥德”:1、“h”:null,”托德”:空,“stxnid”:null, lsid”:“{\ " id \ ":{\“二进制美元\”:\“FA7YEzXgQXSX9OxmzllH2w = = \”,\“美元类型\”:\}“04 \”,\“uid \”:{\“二进制美元\”:\“47 deqpj8hbsa + /记时显示+ 5 jceuqerkm5nmpjwzg3hsufu = \”,\“美元类型\”:\“00”\}}”、“txnNumber”:1}}}
表8所示。的描述更新事件值字段
字段名 描述

1

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,u指示更新文档的操作。

2

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

3.

之前

包含更改前实际MongoDB文档的JSON字符串表示形式。+一个更新事件值中不包含之前字段,如果捕获模式未设置为' *_with_preimage '选项之一。

4

包含实际MongoDB文档的JSON字符串表示形式。
一个更新事件值中不包含字段,如果捕获模式未设置为change_streams_update_full

5

updatedFields

包含文档更新字段值的JSON字符串表示形式。在本例中,更新更改了first_name字段转换为新值。

6

描述事件的源元数据的必填字段。该字段包含与a相同的信息创建事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含已更新文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB会话的唯一标识符lsid和交易号txnNumber以防在事务中执行更改。

事件中的值应该作为文档在时间点的值处理。该值不是动态计算的,而是从集合中获得的。因此,如果多个更新紧跟着一个又一个,这是可能的更新更新事件将包含相同的内容值,该值将表示存储在文档中的最后一个值。

如果您的应用程序依赖于渐进的变化演进,那么您应该依赖于updateDescription只有。

删除事件

的值删除改变事件有相同之处模式部分为创建而且更新事件。的有效载荷的一部分删除事件包含的值与创建而且更新事件。特别地,删除事件既不包含价值也不是updateDescription价值。这里有一个例子删除属性中的文档的客户集合:

{"schema":{…}, "payload": {"op": "d",(1)“ts_ms”:1465495462115,(2)“之前”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮玛丽\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(3)“源”:{(4)“版本”:“2.1.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
表9所示。的描述删除事件值字段
字段名 描述

1

人事处

返回string,描述操作类型。的人事处字段值为d,表示该文档已被删除。

2

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

3.

之前

包含更改前实际MongoDB文档的JSON字符串表示形式。+一个更新事件值中不包含之前字段,如果捕获模式未设置为' *_with_preimage '选项之一。

4

描述事件的源元数据的必填字段。该字段包含与a相同的信息创建更新事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含已删除文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB操作的唯一标识符h字段)。

  • MongoDB会话的唯一标识符lsid和交易号txnNumber如果在事务中执行更改(仅限更改流捕获模式)。

MongoDB连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。

墓碑上的事件

唯一标识文档的所有MongoDB连接器事件都具有完全相同的键。删除文档时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有该键的消息,消息值必须为.为了实现这一点,在Debezium的MongoDB连接器发开云体育官方注册网址出一个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有价值。墓碑事件通知Kafka所有具有相同键的消息都可以被删除。

安装MongoDB

MongoDB连接器使用MongoDB的变更流来捕获变更,因此连接器只适用于MongoDB副本集或分片集群,其中每个分片都是一个单独的副本集。请参阅MongoDB文档来设置副本集分片集群.另外,一定要了解如何启用访问控制和认证使用复制集。

还必须有一个具有适当角色的MongoDB用户来读取管理开云体育电动老虎机可以读取oplog的数据库。此外,用户还必须能够读取配置开云体育电动老虎机数据库在配置服务器的一个分片集群中必须有list开云体育电动老虎机Databases特权操作。当使用变更流(默认值)时,用户还必须具有集群范围的特权操作找到而且changeStream

当您打算利用预映像并填充之前字段时,您需要首先启用changeStreamPreAndPostImages对于使用db.createCollection ()创建,或collMod

云中的MongoDB

您可以为MongoDB使用Deb开云体育官方注册网址ezium连接器MongoDB阿特拉斯.注意,MongoDB Atlas仅支持通过SSL的安全连接,即+ mongodb.ssl.enabled连接器的选择必须设置为真正的

部署

要部署Debezium 开云体育官方注册网址MongoDB连接器,您需要安装Debezium MongoDB连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。

过程
  1. 下载连接器的插件存档

  2. 将JAR文件解压缩到Kafka Connect环境中。

  3. 将包含JAR文件的目录添加到卡夫卡连接的plugin.path

  4. 重新启动Kafka Connect进程以获取新的JAR文件。

如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Apache Zookeeper, Apache Kafka和Kafka Connect, MongoDB连接器已经安装并准备运行。

的Deb开云体育官方注册网址ezium教程引导您使用这些图像,这是了解Debezium的好方法。开云体育官方注册网址

MongoDB连接器配置示例

下面是一个从MongoDB副本集中捕获数据的连接器实例的配置示例rs0在192.168.99.100的27017端口,我们在逻辑上命名它fullfillment.通常,通过设置连接器可用的配置属性,可以在JSON文件中配开云体育官方注册网址置Debezium MongoDB连接器。

您可以选择为特定的MongoDB复制集或分片集群生成事件。您还可以选择过滤掉不需要的集合。

{"name": "inventory-connector",(1)"config": {"connector.class": "io.d开云体育官方注册网址ebezium.connector.mongodb.MongoDbConnector",(2)“mongodb。hosts": "rs0/192.168.99.100:27017",(3)”的话题。前缀”:“fullfillment”,(4)“collection.include。库存清单”:“。*”(5)}}
1 当我们向Kafka Connect服务注册连接器时,连接器的名称。
2 MongoDB连接器类的名称。
3. 用于连接到MongoDB复制集的主机地址。
4 逻辑名它为生成的事件形成了一个命名空间,并用于连接器写入的所有Kafka主题的名称、Kafka Connect模式名称以及使用Avro转换器时对应的Avro模式的名称空间。
5 与要监控的所有集合的命名空间(例如.)匹配的正则表达式列表。这是可选的。

有关可以为Debezium MongoDB连接器设置的配置属性的完整列表,请参见开云体育官方注册网址MongoDB连接器配置属性

您可以使用帖子命令到正在运行的Kafka Connect服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接MongoDB复制集或分片集群。

  • 为每个副本集分配任务。

  • 如果需要,执行快照。

  • 读取更改流。

  • 流将事件记录更改为Kafka主题。

添加连接器配置

要开始运行Debezium Mongo开云体育官方注册网址DB连接器,请创建一个连接器配置,并将该配置添加到Kafka Connect集群。

先决条件
过程
  1. 为MongoDB连接器创建配置。

  2. 使用Kafka连接REST API将该连接器配置添加到Kafka Connect集群中。

结果

连接器启动后,完成以下操作:

  • 执行一致性快照在你的MongoDB复制集集合。

  • 读取副本集的更改流。

  • 为每个插入、更新和删除的文档生成更改事件。

  • 流将事件记录更改为Kafka主题。

连接器属性

Debe开云体育官方注册网址zium MongoDB连接器有许多配置属性,您可以使用这些属性为应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性为要求除非有默认值可用。

表10。必需的Debezi开云体育官方注册网址um MongoDB连接器配置属性
财产 默认的 描述

没有默认的

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(所有Kafka Connect连接器都需要这个属性。)

没有默认的

连接器的Java类的名称。始终使用值io.开云体育官方注册网址debezium.connector.mongodb.MongoDbConnector用于MongoDB连接器。

没有默认的

复制集中MongoDB服务器的主机名和端口对列表(以'host'或'host:port'的形式),以逗号分隔。该列表可以包含单个主机名和端口对。如果mongodb.members.auto.discover设置为,则主机和端口对应以副本集名称为前缀(例如,rs0 / localhost: 27017).

必须提供当前主地址。这个限制将在下一个Debezium发行版中被移除。开云体育官方注册网址

没有默认的

指定连接器在初始发现MongoDB复制集期间使用的连接字符串。要使用此选项,必须设置的值mongodb.members.auto.discover真正的.不设置此属性和mongodb.hosts同时,财产。

连接器仅在初始副本集发现过程中使用此连接字符串。在此发现过程中,连接器忽略在其他属性(mongodb.usermongodb.passwordmongodb.authsourceSSL配置属性,等等)。在发现过程完成后,当连接器试图建立到主复制集成员的直接连接时,连接器然后返回到使用标准连接属性,并忽略中的值mongodb.connection.string.但是,如果在配置的其他地方没有出现凭据信息,连接器可以从连接字符串的值中提取凭据信息。例如,如果mongodb.user属性没有设置,但是连接字符串包含MongoDB用户名,连接器从字符串中读取信息。

没有默认的

一个唯一的名称,用于标识连接器和/或该连接器监视的MongoDB副本集或分片集群。每个服务器应该由最多一个Debezium连接器监控,因为这个服务器名前缀了所有来自MongoD开云体育官方注册网址B副本集或集群的持久化Kafka主题。只使用字母数字字符、连字符、点和下划线组成名称。逻辑名在所有其他连接器中应该是唯一的,因为该名称被用作命名Kafka主题的前缀,从该连接器接收记录。

不要更改此属性的值。如果更改了name值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。

没有默认的

连接MongoDB时使用开云体育电动老虎机的数据库用户名。只有当MongoDB配置为使用身份验证时,才需要这样做。

没有默认的

连接MongoDB时使用的密码。只有当MongoDB配置为使用身份验证时,才需要这样做。

管理

开云体育电动老虎机包含MongoDB凭证的数据库(身份验证源)。只有当MongoDB配置为使用与另一个身份验证数据库的身份验证时,才需要这样做开云体育电动老虎机管理

连接器将使用SSL连接到MongoDB实例。

启用SSL时,此设置控制在连接阶段是否禁用严格的主机名检查。如果真正的该连接不会阻止中间人攻击。

空字符串

一个可选的、以逗号分隔的正则表达式列表,该列表与要监控的数据库名称匹配。开云体育电动老虎机默认情况下,监视所有数据库。开云体育电动老虎机
开云体育电动老虎机database.include.list时,连接器仅监视属性指定的数据库。开云体育电动老虎机其他数据库被开云体育电动老虎机排除在监控之外。

为了匹配数据库的名称,Debezium应用您指开云体育电动老虎机定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;开云体育电动老虎机它不匹配可能出现在数据库名称中的子字符串。开云体育电动老虎机
如果在配置中包含此属性,则不要同时设置开云体育电动老虎机database.exclude.list财产。

空字符串

一个以逗号分隔的可选正则表达式列表,该列表与要排除在监视之外的数据库名称匹配。开云体育电动老虎机当开云体育电动老虎机database.exclude.list设置后,连接器将监视除属性指定的数据库之外的所有数据库。开云体育电动老虎机

为了匹配数据库的名称,Debezium应用您指开云体育电动老虎机定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与数据库的整个名称字符串匹配;开云体育电动老虎机它不匹配可能出现在数据库名称中的子字符串。开云体育电动老虎机
如果在配置中包含此属性,则不要设置开云体育电动老虎机database.include.list财产。

空字符串

一个可选的逗号分隔的正则表达式列表,它与要监控的MongoDB集合的完全限定名称空间匹配。对象中的集合之外的所有集合默认情况下,连接器监视当地的而且管理开云体育电动老虎机数据库。当collection.include.list时,连接器仅监视属性指定的集合。其他集合被排除在监视之外。集合标识符的格式为开云体育电动老虎机数据库名collectionName

为了匹配名称空间的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串匹配;它不匹配名称中的子字符串。
如果在配置中包含此属性,则不要同时设置collection.exclude.list财产。

空字符串

一个可选的逗号分隔的正则表达式列表,它与MongoDB集合的完全限定名称空间匹配,以排除在监视之外。当collection.exclude.list设置后,连接器将监视除属性指定的集合之外的所有集合。集合标识符的格式为开云体育电动老虎机数据库名collectionName

为了匹配名称空间的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串匹配;它不匹配可能出现在数据库名称中的子字符串。开云体育电动老虎机
如果在配置中包含此属性,则不要设置collection.include.list财产。

最初的

指定在连接器启动时执行快照的条件。将属性设置为以下值之一:

最初的

当连接器启动时,如果在偏移量主题中没有检测到值,则执行数据库快照。开云体育电动老虎机

从来没有

当连接器启动时,它将跳过快照过程,并立即开始为数据库记录到oplog的操作传输更改事件。开云体育电动老虎机

change_streams_update_full

指定用于从MongoDB服务器捕获更改的方法。默认为change_streams_update_full,并指定连接器通过MongoDB Change Streams机制捕获更改更新事件应该包含完整的文档。的change_streamsMode将使用相同的捕获方法,但是更新事件不会包含完整的文档。要捕获前置图像,可以使用前置图像版本的前两种捕获模式-change_streams_update_full_with_pre_image而且change_streams_with_pre_image

中指定的所有集合collection.include.list

匹配完全限定名称的可选、逗号分隔的正则表达式列表(<开云体育电动老虎机数据库名>< collectionName >)您希望包含在快照中的模式。指定的项必须在连接器的名称中命名collection.include.list财产。此属性仅在连接器为snapshot.mode属性的值设置为从来没有
此属性不影响增量快照的行为。

为了匹配模式的名称,Debezium应用您指定为的正则表达式开云体育官方注册网址锚定正则表达式。也就是说,指定的表达式与模式的整个名称字符串匹配;它不匹配可能出现在模式名称中的子字符串。

空字符串

应从更改事件消息值中排除的字段的全限定名称的可选列表,以逗号分隔。字段的完全限定名的格式为开云体育电动老虎机数据库名collectionName字段名nestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*)。

空字符串

一个可选的、以逗号分隔的字段完全限定替换列表,用于重命名更改事件消息值中的字段。字段的完全合格替换是这样的开云体育电动老虎机数据库名collectionName字段名nestedFieldNamenewNestedFieldName,在那里开云体育电动老虎机数据库名而且collectionName可以包含与任何字符匹配的通配符(*),冒号(:)用于确定字段的重命名映射。下一个字段替换应用于列表中前一个字段替换的结果,因此在重命名位于同一路径的多个字段时请记住这一点。

1

应该为此连接器创建的最大任务数。MongoDB连接器将尝试为每个副本集使用单独的任务,因此在使用单个MongoDB副本集的连接器时,默认值是可接受的。当使用MongoDB分片集群的连接器时,我们建议指定一个等于或大于集群中分片数量的值,这样每个副本集的工作就可以通过Kafka Connect分配。

1

正整数值,指定用于对复制集中的集合执行初始同步的最大线程数。默认值为1。

真正的

控制是否删除事件之后是一个墓碑事件。

真正的-删除操作用a表示删除事件和后续的墓碑事件。

-只有一个删除事件被触发。

当一个源记录被删除后,触发一个墓碑事件(默认行为)允许Kafka完全删除与被删除行的键相关的所有事件日志压实已为主题启用。

没有默认的

连接器启动后在快照之前应该等待的毫秒间隔;
可用于在启动集群中的多个连接器时避免快照中断,这可能导致连接器的重新平衡。

0

指定在拍摄快照时应从每个集合中一次性读取的最大文档数。连接器将以这个大小的多个批次读取集合内容。
默认值为0,表示服务器选择适当的获取大小。

没有一个

指定应如何调整模式名称以与连接器使用的消息转换器兼容。可能的设置:

  • 没有一个不应用任何调整。

  • avro将不能在Avro类型名称中使用的字符替换为下划线。

以下先进的配置属性具有良好的默认值,在大多数情况下都可以工作,因此很少需要在连接器的配置中指定。

表11所示。开云体育官方注册网址Debezium MongoDB连接器高级配置属性
财产 默认的 描述

2048

正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为2048年。

8192

正整数值,指定阻塞队列可以容纳的最大记录数。当Debe开云体育官方注册网址zium从数据库读取事件流时,它会在将事件写入Kafka之前开云体育电动老虎机将其放在阻塞队列中。如果连接器接收消息的速度比写入Kafka的速度快,或者Kafka变得不可用,阻塞队列可以为从数据库读取更改事件提供反压力。开云体育电动老虎机当连接器定期记录偏移量时,队列中保存的事件将被忽略。总是设置的值max.queue.size大于…的值max.batch.size

0

一个长整数值,以字节为单位指定阻塞队列的最大容量。默认情况下,不为阻塞队列指定卷限制。若要指定队列可以使用的字节数,请将此属性设置为正的长值。
如果max.queue.size时,当队列大小达到任一属性指定的限制时,将阻塞对队列的写入。例如,如果你设置max.queue.size = 1000,max.queue.size.in.bytes = 5000,当队列中有1000条记录时,或者队列中记录的容量达到5000字节时,写入队列被阻塞。

1000

正整数值,指定连接器在每次迭代期间等待新更改事件出现的毫秒数。缺省值为1000毫秒,即1秒。

1000

正整数值,指定在第一次连接尝试失败或没有主节点可用时试图重新连接到主节点时的初始延迟。缺省值为1秒(1000毫秒)。

1000

正整数值,指定在多次连接尝试失败或没有主服务器可用时试图重新连接到主服务器时的最大延迟。默认值为120秒(120,000毫秒)。

16

正整数值,指定在发生异常和任务中止之前尝试连接主副本集失败的最大次数。默认值为16connect.backoff.initial.delay.ms而且connect.backoff.max.delay.ms结果在失败前的20多分钟的尝试。

真正的

布尔值,用于指定mongodb中的地址。hosts' are seeds that should be used to discover all members of the cluster or replica set (真正的),或地址是否在mongodb.hosts应按原样使用().默认为真正的应该在所有情况下使用,除了MongoDB前面有一个代理人

v2

的架构版本在CDC事件中阻塞。开云体育官方注册网址Debezium 0.10引入了一些突破
的结构的更改块,以便在所有连接器之间统一暴露的结构。
通过将此选项设置为v1可以生成早期版本中使用的结构。注意,不建议使用这种设置,并计划在未来的Debezium版本中删除。开云体育官方注册网址

0

控制心跳消息的发送频率。
此属性包含以毫秒为单位的间隔,该间隔定义连接器向心跳主题发送消息的频率。这可用于监视连接器是否仍在接收来自数据库的更改事件。开云体育电动老虎机在较长一段时间内只更改非捕获集合中的记录的情况下,还应该利用心跳消息。在这种情况下,连接器将继续从数据库中读取oplog/change流,但不会向Kafka发出任何更改消息,这反过来意味着没有偏移更新提交给Kafka。开云体育电动老虎机这将导致oplog文件被旋转出来,但连接器不会注意到它,因此在重新启动时,一些事件不再可用,这导致需要重新执行初始快照。

将此参数设置为0完全不发送心跳消息。
默认禁用。

真正的连接器配置显式指定key.convertervalue.converter参数使用Avro,否则默认为

字段名是否被净化以符合Avro命名要求。看到Avro命名欲知详情。

t

以逗号分隔的操作类型列表,将在流处理期间跳过。操作包括:c插入/创建、u的更新,d删除,t对于截断,和没有一个不跳过任何操作。默认情况下,截断操作将被跳过(此连接器不会触发)。

没有默认的

控制快照中包含哪些集合项。此属性仅影响快照。在表单中指定以逗号分隔的集合名称列表开云体育电动老虎机databaseName.collectionName

对于指定的每个集合,还要指定另一个配置属性:snapshot.collection.filter.overrides。开云体育电动老虎机数据库名collectionName.例如,另一个配置属性的名称可能是:snapshot.collection.filter.overrides.customers.orders.将此属性设置为仅检索快照中需要的项的有效筛选器表达式。当连接器执行快照时,它只检索与筛选器表达式匹配的项。

当设置为真正的开云体育官方注册网址Debezium生成具有事务边界的事件,并使用事务元数据丰富数据事件信封。

看到事务的元数据更多细节。

10000(10秒)

在发生可检索错误后重新启动连接器之前等待的毫秒数。

30000

连接器轮询新的、删除的或更改的副本集的时间间隔。

10000(10秒)

驱动程序在放弃新的连接尝试之前等待的毫秒数。

10000(10秒)

集群监视器试图到达每台服务器的频率。

0

套接字上的发送/接收在超时发生之前所需要的毫秒数。值为0禁用此行为。

30000(30秒)

驱动程序在超时并抛出错误之前等待选择服务器的毫秒数。

0

指定在导致执行超时异常之前,oplog/change流游标等待服务器生成结果的最大毫秒数。值为0表示使用服务器/驱动程序默认等待超时。

没有默认的

用于发送的数据集合的完全限定名称信号到连接器。使用以下格式指定集合名称:
<开云体育电动老虎机数据库名>< collectionName >

1024

在增量快照块期间连接器获取并读入内存的最大文档数量。增加块大小可以提供更高的效率,因为快照运行更少的大大小快照查询。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。

io.开云体育官方注册网址debezium.schema.DefaultTopicNamingStrategy

TopicNamingStrategy类的名称默认为,该类应用于确定数据更改、模式更改、事务、心跳事件等的主题名称DefaultTopicNamingStrategy

指定主题名称的分隔符,默认为

10000

用于在有界并发散列映射中保存主题名称的大小。此缓存将帮助确定与给定数据集合对应的主题名称。

__开云体育官方注册网址debezium-heartbeat

控制连接器向其发送心跳消息的主题的名称。主题名称有这样的模式:

topic.heartbeat.prefixtopic.prefix

例如,如果主题前缀为实现,默认主题名为__开云体育官方注册网址debezium-heartbeat.fulfillment

事务

控制连接器向其发送事务元数据消息的主题的名称。主题名称有这样的模式:

topic.prefixtopic.transaction

例如,如果主题前缀为实现,默认主题名为fulfillment.transaction

监控

Debe开云体育官方注册网址zium MongoDB连接器除了内置支持Zookeeper、Kafka和Kafka Connect的JMX指标外,还有两种指标类型。

  • 快照指标在执行快照时提供有关连接器操作的信息。

  • 流指标当连接器捕获变更和流化变更事件记录时,提供有关连接器操作的信息。

开云体育官方注册网址Debezium监控文档提供了关于如何使用JMX公开这些指标的详细信息。

快照指标

MBean开云体育官方注册网址debezium.mongodb: type = connector-metrics上下文=快照,server =< mongodb.server.name >、任务=< task.id >

快照度量不会公开,除非快照操作是活动的,或者自上次连接器启动以来发生了快照。

下表列出了可用的快照指标。

属性 类型 描述

字符串

连接器读取的最后一个快照事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或重置以来,此连接器已看到的事件总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器捕获的表列表。

int

用于在快照和主Kafka Connect循环之间传递事件的队列长度。

int

用于在快照和主Kafka Connect循环之间传递事件的队列的空闲容量。

int

快照中包含的表的总数。

int

快照尚未复制的表数。

布尔

快照是否启动。

布尔

快照是否暂停。

布尔

快照是否中止。

布尔

快照是否完成。

快照到目前为止所花费的总秒数,即使没有完成。还包括快照暂停的时间。

快照暂停的总秒数。如果快照被暂停了多次,那么暂停的时间就会累积起来。

Map < String,长>

映射,其中包含为快照中的每个表扫描的行数。在处理期间将表增量地添加到Map中。每扫描10,000行并在完成一个表时更新一次。

队列的最大缓冲区,以字节为单位。该指标是可用的,如果max.queue.size.in.bytes设置为正的长值。

队列中记录的当前卷,以字节为单位。

Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义快照指标:

属性 类型 描述

NumberOfDisconnects

数据库断开数。开云体育电动老虎机

流指标

MBean开云体育官方注册网址debezium.mongodb: type = connector-metrics、上下文=流媒体服务器=< mongodb.server.name >、任务=< task.id >

下表列出了可用的流媒体指标。

属性 类型 描述

字符串

连接器读取的最后一个流事件。

自连接器读取并处理最近事件以来的毫秒数。

自上次启动或度量重置以来,此连接器所看到的事件总数。

自上次启动或指标重置以来,此连接器所看到的创建事件的总数。

自上次启动或指标重置以来,此连接器所看到的更新事件总数。

自上次启动或指标重置以来,此连接器所看到的删除事件的总数。

已由连接器上配置的包含/排除列表筛选规则筛选的事件数。

string []

连接器捕获的表列表。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列长度。

int

用于在streamer和主Kafka Connect循环之间传递事件的队列的空闲容量。

布尔

标志,该标志表示连接器当前是否连接到数据库服务器。开云体育电动老虎机

从最后一个更改事件的时间戳到连接器处理它之间的毫秒数。这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。开云体育电动老虎机

提交的已处理事务的数量。

Map < String, String >

上次接收事件的坐标。

字符串

最后处理的事务的事务标识符。

队列的最大缓冲区,以字节为单位。该指标是可用的,如果max.queue.size.in.bytes设置为正的长值。

队列中记录的当前卷,以字节为单位。

Debe开云体育官方注册网址zium MongoDB连接器还提供了以下自定义流指标:

属性 类型 描述

NumberOfDisconnects

数据库断开数。开云体育电动老虎机

NumberOfPrimaryElections

主节点选举的个数。

MongoDB连接器常见问题

开云体育官方注册网址Debezium是一个分布式系统,它捕获多个上游数据库中的所有更改,并且永远不会错过或丢失事件。开云体育电动老虎机当系统正常运行并被仔细管理时,Debezium提供开云体育官方注册网址只有一天交付每个变更事件。

发生故障时,系统不会丢失事件。然而,当它从错误中恢复时,它可能会重复一些更改事件。在这种情况下,Debezium就像Kaf开云体育官方注册网址ka一样提供了帮助至少一次变更事件的交付。

本节的其余部分将描述Debezium如何处理各种错误和问题。开云体育官方注册网址

配置和启动错误

在以下情况下,连接器在尝试启动时失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法通过指定的连接参数连接到MongoDB。

失败后,连接器尝试使用指数回退重新连接。您可以配置最大重连次数。

在这些情况下,错误将提供有关问题的更多详细信息,并可能提供建议的解决方法。当配置已经纠正或MongoDB问题已经解决时,可以重新启动连接器。

MongoDB不可用

一旦连接器开始运行,如果任何MongoDB副本集的主节点变得不可用或不可达,连接器将重复尝试重新连接到主节点,使用指数回退来防止网络或服务器饱和。如果在可配置的连接尝试次数之后,主服务器仍然不可用,则连接器将失败。

重新连接的尝试由三个属性控制:

  • connect.backoff.initial.delay.ms-第一次尝试重新连接之前的延迟,默认为1秒(1000毫秒)。

  • connect.backoff.max.delay.ms-尝试重新连接之前的最大延迟,默认为120秒(120,000毫秒)。

  • connect.max.attempts—产生错误前的最大尝试次数,默认为16次。

每次延迟都是之前延迟的两倍,直到最大延迟。如果使用默认值,下表显示了每次失败连接尝试的延迟和失败前的累计总时间。

重新连接次数 尝试前延迟,以秒为单位 尝试前的总延迟,以分钟和秒为单位

1

1

00:01

2

2

00:03

3.

4

00:07

4

8

00:15

5

16

00:31

6

32

01:03

7

64

02:07

8

120

04:07

9

120

06:07

10

120

08:07

11

120

10:07

12

120

12:07

13

120

14:07

14

120

16:07

15

120

18:07

16

120

20:07

Kafka Connect进程优雅地停止

如果Kafka Connect正在以分布式模式运行,并且一个Kafka Connect进程被优雅地停止,那么在关闭该进程之前,Kafka Connect将把该进程的所有连接器任务迁移到该组中的另一个Kafka Connect进程中,新的连接器任务将准确地拾取先前任务停止的位置。当连接器任务被优雅地停止并在新进程上重新启动时,处理过程中会有短暂的延迟。

如果组只包含一个进程,并且该进程被优雅地停止,那么Kafka Connect将停止连接器并记录每个副本集的最后偏移量。在重新启动时,副本集任务将继续在他们离开的地方。

Kafka连接进程崩溃

如果Kafka连接器进程意外停止,那么它正在运行的任何连接器任务都将终止,而不会记录它们最近处理的偏移量。当Kafka Connect以分布式模式运行时,它将重新启动其他进程上的连接器任务。但是,MongoDB连接器将从最后一个偏移量恢复记录通过较早的流程,这意味着新的替换任务可能会生成一些与崩溃之前处理的相同的更改事件。重复事件的数量取决于偏移刷新周期和崩溃前更改的数据量。

由于在故障恢复过程中可能会出现重复的事件,因此使用者应该始终预测到某些事件可能会重复。开云体育官方注册网址Debezium的变化是幂等的,所以一系列事件的结果总是相同的状态。

开云体育官方注册网址Debezium还在每个更改事件消息中包含关于事件起源的源特定信息,包括MongoDB事件的唯一事务标识符(h)和时间戳(证券交易委员会而且奥德).消费者可以跟踪这些值中的其他值,以了解它是否已经看到了特定的事件。

Kafka不可用

当连接器生成变更事件时,Kafka Connect框架使用Kafka生产者API将这些事件记录在Kafka中。Kafka Connect还会定期记录在这些更改事件中出现的最新偏移量,频率由你在Kafka Connect worker配置中指定。如果Kafka代理变得不可用,运行连接器的Kafka Connect工作进程将简单地重复尝试重新连接到Kafka代理。换句话说,连接器任务将简单地暂停,直到重新建立连接,此时连接器将完全从它们停止的地方恢复。

连接器在长时间停止后发生故障,如果snapshot.mode设置为最初的

如果连接器被优雅地停止,用户可能会继续对复制集成员执行操作。连接器脱机时发生的更改将继续记录在MongoDB的oplog中。在大多数情况下,连接器重新启动后,它会读取oplog中的offset值,以确定它为每个副本集流化的最后一个操作,然后从该点恢复流化更改。重启后,在连接器停止时发生的数据库操作开云体育电动老虎机会像往常一样被发送到Kafka,一段时间后,连接器会跟上数据库的步伐。连接器需要的时间取决于Kafka的能力和性能,以及数据库中发生的更改量。开云体育电动老虎机

但是,如果连接器停止的时间足够长,MongoDB可能会在连接器处于非活动状态期间清除oplog,导致丢失有关连接器最后位置的信息。连接器重新启动后,它不能恢复流,因为oplog不再包含标记连接器处理的最后一个操作的先前偏移值。连接器也不能执行快照,通常在snapshot.mode属性设置为最初的,并且不存在偏移值。在这种情况下,存在不匹配,因为oplog不包含先前偏移的值,但是偏移值出现在连接器的内部Kafka偏移主题中。导致错误,连接器失效。

要从失败中恢复,请删除失败的连接器,并使用相同的配置创建一个新的连接器,但使用不同的连接器名称。当您启动新的连接器时,它将执行一个快照以获取数据库状态,然后恢复流。开云体育电动老虎机

MongoDB丢失写操作

在某些失败的情况下,MongoDB可能会丢失提交,这将导致MongoDB连接器无法捕获丢失的更改。例如,如果主节点在应用更改并将更改记录到其oplog后突然崩溃,那么在辅助节点读取其内容之前,oplog可能就不可用了。因此,被选为新的主节点的辅助节点可能会错过其oplog中的最新更改。

目前,在MongoDB中还没有办法防止这种副作用。