您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

开云体育官方注册网址MongoDB的Debezium连接器

开云体育官方注册网址Debezium的MongoDB连接器跟踪MongoDB副本集或MongoDB分片集群,以记录数据库和集合中的文档更改,并将这些更改记录为Kafka主题中的事件。开云体育电动老虎机连接器自动处理分片集群中分片的添加或删除、每个副本集成员的更改、每个副本集中的选举以及等待通信问题的解决。

有关与此连接器兼容的MongoDB版本的信息,请参见开云体育官方注册网址Debezium发布概述

概述

MongoDB的复制机制提供了冗余和高可用性,是在生产环境中运行MongoDB的首选方式。MongoDB连接器捕获复制集或分片集群中的更改。

一个MongoDB副本集由一组服务器组成,这些服务器都有相同数据的副本,复制确保客户端对副本集中的文档所做的所有更改主要的正确地应用于其他复制集的服务器,称为次要的人.MongoDB复制的工作原理是让主服务器记录其数据库中的更改oplog(或操作日志),然后每个辅助服务器读取主服务器的oplog,并按顺序将所有操作应用于它们自己的文档。当一个新的服务器被添加到一个副本集时,该服务器首先执行一个快照然后读取主数据库的oplog开云体育电动老虎机,以应用自快照开始以来可能发生的所有更改。当这个新服务器赶上主服务器的oplog的尾部时,它将成为辅助服务器(并能够处理查询)。

MongoDB连接器使用相同的复制机制,尽管它实际上并没有成为复制集的成员。然而,就像MongoDB从服务器一样,连接器总是读取复制集主服务器的oplog。并且,当连接器第一次看到一个副本集时,它会查看oplog以获得最后记录的事务,然后执行主数据库和集合的快照。开云体育电动老虎机复制完所有数据后,连接器就开始从之前从oplog中读取的位置开始流化更改。MongoDB oplog中的操作为幂等,因此无论应用多少次操作,结果都是相同的结束状态。

当MongoDB连接器进程发生变化时,它会定期记录事件起源于oplog中的位置。当MongoDB连接器停止时,它会记录它处理过的最后一个oplog位置,因此在重新启动时,它只是从该位置开始流。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从停止的地方恢复,而不会丢失任何事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太长时间,否则在连接器有机会读取它们之前,oplog中的一些操作可能会被清除。在这种情况下,在重新启动时,连接器将检测缺失的oplog操作,执行快照,然后继续对更改进行流式处理。

MongoDB连接器还可以容忍副本集的成员和领导、分片集群中分片的添加或删除以及可能导致通信故障的网络问题的更改。连接器总是使用副本集的主节点来流化更改,因此当副本集经历一次选举并且不同的节点成为主节点时,连接器将立即停止流化更改,连接到新的主节点,并使用新的主节点开始流化更改。同样地,如果连接器遇到任何与复制集主通信的问题,它将尝试重新连接(使用指数回退,以避免淹没网络或复制集),并继续从它上次离开的地方传输更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。

MongoDB连接器如何工作

对连接器支持的MongoDB拓扑的概述对于规划应用程序非常有用。

在配置和部署MongoDB连接器时,它首先连接到种子地址上的MongoDB服务器,并确定关于每个可用副本集的详细信息。由于每个复制集都有自己独立的oplog,连接器将尝试为每个复制集使用单独的任务。连接器可以限制它将使用的任务的最大数量,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管任务仍将为每个副本集使用单独的线程。

在对分片集群运行连接器时,使用值tasks.max这大于复制集的数量。这将允许连接器为每个副本集创建一个任务,并让Kafka Connect在所有可用的工作进程之间协调、分布和管理任务。

支持的MongoDB拓扑

MongoDB连接器支持以下MongoDB拓扑:

MongoDB副本集

Debe开云体育官方注册网址zium MongoDB连接器可以从单个连接器捕获更改MongoDB副本集.生产副本集需要最少的至少三名成员

要使用带有副本集的MongoDB连接器,请提供一个或多个副本集服务器的地址为种子地址通过连接器mongodb.hosts财产。连接器将使用这些种子连接到复制集,然后一旦连接,将从复制集获得完整的成员集以及哪个成员是主成员。连接器将启动连接到主服务器的任务,并从主服务器的oplog捕获更改。当副本集选择一个新的主副本时,任务将自动切换到新的主副本。

当MongoDB由代理连接时(例如在OS X或Windows上使用Docker),当客户端连接到副本集并发现成员时,MongoDB客户端将排除代理作为有效成员,并尝试直接连接到成员,而不是通过代理连接,但失败。

在这种情况下,将连接器设置为可选mongodb.members.auto.discover配置属性为若要指示连接器放弃成员关系发现,而只需使用第一个种子地址(通过mongodb.hosts属性)作为主节点。这可能会起作用,但当选举发生时,仍然会产生原因问题。

MongoDB分片集群

一个MongoDB分片集群包括:

  • 一个或多个碎片,每一个都作为复制集部署;

  • 一个单独的副本集,充当集群的副本集配置服务器

  • 一个或多个路由器(也称为蒙戈),客户端连接并将请求路由到适当的分片

    要使用带分片集群的MongoDB连接器,请使用配置服务器副本集。当连接器连接到这个复制集时,它会发现自己正在充当一个分片集群的配置服务器,发现关于集群中用作分片的每个复制集的信息,然后启动一个单独的任务来捕获每个复制集的更改。如果向集群中添加了新的分片或删除了现有的分片,连接器将自动相应地调整其任务。

MongoDB独立服务器

MongoDB连接器不能监视独立MongoDB服务器的更改,因为独立服务器没有oplog。如果将独立服务器转换为具有一个成员的副本集,则连接器将正常工作。

MongoDB不建议在生产环境中运行独立服务器。有关更多信息,请参见MongoDB文档

逻辑连接器名称

连接器配置属性mongodb.name作为逻辑名用于MongoDB副本集或分片集群。连接器以多种方式使用逻辑名:作为所有主题名的前缀,以及在记录每个副本集的oplog位置时作为唯一标识符。

您应该为每个MongoDB连接器提供一个唯一的逻辑名称,该名称可以有意义地描述源MongoDB系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。

执行快照

当一个任务使用复制集启动时,它使用连接器的逻辑名和复制集名来查找抵消它描述连接器先前停止读取更改的位置。如果可以找到一个偏移量,并且它仍然存在于oplog中,那么任务立即继续执行流的变化,从记录的偏移位置开始。

但是,如果没有发现偏移量,或者如果oplog不再包含该位置,任务必须首先通过执行命令获取复制集内容的当前状态快照.这个过程首先记录oplog的当前位置,并将其记录为偏移量(以及一个表示快照已经启动的标志)。然后,该任务将继续复制每个集合,生成尽可能多的线程(直到snapshot.max.threads配置属性)以并行地执行此工作。连接器将记录一个单独的读取事件对于它看到的每个文档,读取事件将包含对象的标识符、对象的完整状态和关于该对象所在的MongoDB副本集的信息。源信息还将包括一个标志,表示快照期间产生的事件。

此快照将继续,直到复制了与连接器筛选器匹配的所有集合为止。如果连接器在任务的快照完成之前停止,重新启动时连接器将再次开始快照。

在连接器执行任何副本集的快照时,尽量避免重新分配和重新配置任务。连接器使用快照的进度记录消息。为了最大限度地控制,为每个连接器运行一个单独的Kafka Connect集群。

流的变化

在复制集的连接器任务记录了偏移量之后,它使用偏移量来确定在oplog中的位置,它应该从哪里开始流化更改。然后,该任务连接到副本集的主节点,并从该位置开始流化更改。它处理所有的创建、插入和删除操作,并将它们转换为Debezium开云体育官方注册网址更改事件.每个更改事件都包括在oplog中发现操作的位置,连接器定期将其记录为最近的偏移量。记录偏移量的间隔由offset.flush.interval.ms,这是一个Kafka连接工人配置属性。

当连接器优雅地停止时,将记录处理的最后一个偏移量,以便重新启动时,连接器将准确地继续它停止的位置。然而,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量之后,但在最后一次记录偏移量之前处理并生成了事件;重新启动时,连接器从最后一个开始记录偏移,可能会生成一些与之前在崩溃之前生成的事件相同的事件。

当一切正常运行时,Kafka消费者将真正看到每条消息只有一天.然而,当出现问题时,Kafka只能保证消费者能够看到每一条信息至少一次.因此,您的客户需要预期看到不止一次的消息。

如上所述,连接器任务总是使用复制集的主节点来传输来自oplog的更改,从而确保连接器尽可能看到最新的操作,并且能够以比使用辅助节点更低的延迟捕获更改。当副本集选择一个新的主节点时,连接器立即停止流化更改,连接到新的主节点,并从相同位置的新主节点开始流化更改。同样地,如果连接器遇到任何与复制集成员通信的问题,它会尝试重新连接,使用指数回退以避免淹没复制集,并且一旦连接,它将继续从上次离开的地方进行流更改。通过这种方式,连接器能够动态地调整副本集成员关系的变化,并自动处理通信故障。

总之,MongoDB连接器在大多数情况下继续运行。通信问题可能导致连接器等待问题解决。

主题名称

MongoDB连接器将对每个集合中的文档的所有插入、更新和删除操作的事件写入到单个Kafka主题。卡夫卡主题的名称总是采用这种形式logicalName开云体育电动老虎机数据库名collectionName,在那里logicalName逻辑名属性指定的连接器的mongodb.name配置属性,开云体育电动老虎机数据库名发生操作的数据库名称和开云体育电动老虎机collectionName是受影响文档所在的MongoDB集合的名称。

例如,考虑一个MongoDB副本集库存开云体育电动老虎机数据库包含四个集合:产品products_on_hand客户,订单.如果监视此数据库的连接器被赋予的逻辑名称为开云体育电动老虎机实现,那么连接器将在这四个Kafka主题上产生事件:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

注意,主题名称不包含复制集名称或碎片名称。因此,对切分集合(其中每个切分包含集合文档的子集)的所有更改都转到相同的Kafka主题。

你可以设置卡夫卡自动创建根据需要选择题目。如果不是,那么你必须在启动连接器之前使用Kafka管理工具来创建主题。

分区

MongoDB连接器没有明确决定如何为事件划分主题。相反,它允许Kafka根据事件键来决定如何划分主题。的名称可以改变Kafka的分区逻辑瓜分者在Kafka Connect worker配置中实现。

Kafka只维护写入单个主题分区的事件的总顺序。按键划分事件意味着具有相同键的所有事件总是进入相同的分区。这确保了特定文档的所有事件总是完全有序的。

事务的元数据

开云体育官方注册网址Debezium可以生成表示事务元数据边界的事件,并丰富更改数据事件消息。

Debezium何时接收事务元开云体育官方注册网址数据的限制

开云体育官方注册网址Debezium仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

每笔交易开始而且结束, 开云体育官方注册网址Debezium生成一个包含以下字段的事件:

状态

开始结束

id

唯一事务标识符的字符串表示形式。

event_count(结束事件)

事务发出的事件总数。

data_collections(结束事件)

对的数组data_collection而且event_count它提供了由来自给定数据收集的更改所发出的事件数。

下面的例子显示了一个典型的消息:

{"status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null} {"status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [{"data_collection": "rs0.testDB. "collectiona", "event_count": 1}, {"data_collection": "rs0.testDB. collectiona", "event_count": 1}, {"data_collection": "rs0.testDB. "collection ", "event_count": 1}]}

方法覆盖transaction.topic选项时,事务事件被写入指定的主题开云体育电动老虎机database.server.name.transaction

更改数据事件丰富

启用事务元数据时,数据消息信封是充实了新的事务字段。这个字段以复合字段的形式提供关于每个事件的信息:

id

唯一事务标识符的字符串表示形式。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中的每数据收集位置。

下面是一条消息的示例:

{“补丁”:零,“后”:“{\“_id \”:{\“numberLong美元\”:\}“1004 \”,\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”,\“annek@noanswer.org \“}”,“源”:{…}, "op": "c", "ts_ms": "1580390884335", "transaction": {"id": "1462833718356672513", "total_order": "1", "data_collection_order": "1"}}

数据变更事件

Debe开云体育官方注册网址zium MongoDB连接器为插入、更新或删除数据的每个文档级操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于所更改的集合。

开云体育官方注册网址Debezium和Kafka Connect就是围绕这个设计的连续的事件消息流.但是,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果使用模式注册中心,则包含消费者可以用来从注册中心获取模式的模式ID。这使得每个事件都是自包含的。

下面的JSON骨架显示了变更事件的四个基本部分。然而,你如何配置你选择在你的应用程序中使用的Kafka Connect转换器,决定了这四个部分在变更事件中的表示。一个模式字段仅在配置转换器以产生该字段时才处于更改事件中。同样,事件键和事件有效负载只有在配置转换器以产生它时才位于更改事件中。如果你使用JSON转换器并配置它来生成所有四个基本的更改事件部分,则更改事件的结构如下:

{"schema": {(1)...}, "有效载荷":{(2)...}, "schema": {(3)...}, "有效载荷":{(4)...}},
表1。变更事件基本内容概述
字段名 描述

1

模式

第一个模式字段是事件键的一部分。它指定了一个Kafka Connect模式,用来描述事件键的内容有效载荷部分。换句话说,是第一个模式字段描述已更改文档的键的结构。

2

有效载荷

第一个有效载荷字段是事件键的一部分。它具有前面所描述的结构模式字段,它包含已更改文档的键。

3.

模式

第二个模式字段是事件值的一部分。它指定Kafka Connect模式,描述事件值中的内容有效载荷部分。换句话说,是第二种模式描述已更改文档的结构。通常,这个模式包含嵌套的模式。

4

有效载荷

第二个有效载荷字段是事件值的一部分。它具有前面所描述的结构模式字段,它包含已更改文档的实际数据。

默认情况下,连接器流将事件记录更改为主题,其名称与事件的原始集合相同。看到主题名称

MongoDB连接器确保所有Kafka Connect模式名称都遵循Avro模式名称格式.这意味着逻辑服务器名必须以拉丁字母或下划线开头,即a-z、a-z或_。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符必须是拉丁字母、数字或下划线,即a-z、a-z、0-9或\_。开云体育电动老虎机如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且区分名称之间的唯一字符无效,因此用下划线替换,则可能导致意外冲突。开云体育电动老虎机

更改事件键

更改事件的键包含已更改文档的键的模式和已更改文档的实际键。对于给定的集合,模式及其相应的有效负载都包含一个单一的id字段。此字段的值是文档的标识符,表示为派生的字符串MongoDB扩展JSON序列化的严格模式

考虑逻辑名为的连接器实现对象的复制集库存开云体育电动老虎机数据库,以及客户集合,其中包含以下文档。

示例文档
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
更改事件键的示例

对象的更改的每个更改事件客户集合具有相同的事件键模式。只要客户集合具有前面的定义,则捕获对的更改的每个更改事件客户集合具有以下关键结构。在JSON中,它看起来是这样的:

{"schema": {(1)"type": "struct", "name": "fulfillment.inventory.customers.Key",(2)“可选”:假的,(3)“字段”:[(4){“字段”:“id”,“类型”:“弦”、“可选”:假}},“有效载荷”:{(5)"id": "1004"}}
表2。更改事件键的描述
字段名 描述

1

模式

密钥的模式部分指定了一个Kafka Connect模式,该模式描述了密钥中的内容有效载荷部分。

2

fulfillment.inventory.customers.Key

定义键的有效负载结构的模式的名称。此模式描述已更改文档的键的结构。关键模式名具有该格式connector-name开云体育电动老虎机数据库名称集合名称关键.在这个例子中:

  • 实现生成此事件的连接器的名称。

  • 库存包含已更改的集开云体育电动老虎机合的数据库。

  • 客户包含已更新文档的集合。

3.

可选

指示事件键是否必须在其中包含值有效载荷字段。在本例中,需要键的有效负载中的值。当文档没有键时,键的有效载荷字段中的值是可选的。

4

字段

属性中期望的每个字段有效载荷,包括每个字段的名称、类型以及是否需要。

5

有效载荷

包含为其生成此更改事件的文档的键。在本例中,键包含一个单键id类型字段字符串的值为1004

本例使用一个具有整数标识符的文档,但任何有效的MongoDB文档标识符都以相同的方式工作,包括文档标识符。对于文档标识符,事件键的值payload.idValue是一个字符串,表示更新后的文档的原始内容_id字段作为使用严格模式的MongoDB扩展JSON序列化。下表提供了不同类型的_id字段被表示出来。

表3。表示文档的示例_id事件关键有效载荷中的字段
类型 MongoDB_id价值 关键的负载

整数

1234

{"id": "1234"}

浮动

12.34

{"id": "12.34"}

字符串

“1234”

{"id": "\"1234\""}

文档

{“嗨”:“卡夫卡”、“num”:(10.0,100.0,1000.0)}

{" id ": "{\“嗨\”,\“卡夫卡\”,\“num \”:[10.0,100.0,1000.0]}"}

ObjectId

ObjectId(“596 e275826f08b2730779e1f”)

{" id ": "{\“美元oid \”:\ " 596 e275826f08b2730779e1f \ "} "}

二进制

BinData(“a2Fma2E = ", 0)

{" id ": "{\“二进制美元\”:\“a2Fma2E = \”,\“美元类型\”:\ " 00 \ "}"}

更改事件值

change事件中的值比键稍微复杂一些。和键一样,值也有模式Section和a有效载荷部分。的模式类的模式信封的结构有效载荷节,包括其嵌套字段。用于创建、更新或删除数据的操作的更改事件都具有具有信封结构的值有效负载。

考虑用于显示更改事件键示例的相同示例文档:

示例文档
{"_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}

本文档更改的更改事件的值部分针对每种事件类型进行了描述:

创建事件

对象中创建数据的操作所生成的更改事件的值部分,示例如下客户集合:

{"schema": {(1)“类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”,开云体育官方注册网址(2)“版本”:1、“字段”:“后”},{“类型”:“弦”、“可选”:真的,“名字”:“io.debezium.data.Json”、“版本”:1、“字段”:“补丁”},{“开云体育官方注册网址类型”:“结构”、“字段”:[{“类型”:“弦”、“可选”:假的,“场”:“版本”},{“类型”:“弦”、“可选”:假的,“场”:“连接器”},{“类型”:“弦”、“可选”:假的,“场”:“name”},{“类型”:“int64”、“可选”:假的,“场”:“ts_ms”},{“类型”:“布尔”、“可选”:真的,“默认”:假的,“场”:"snapshot"}, {"type": "string", "optional": false, "field": "db"}, {"type": "string", "optional": false, "field": "rs"}, {"type": "int32", "optional": false, "field": "ord"}, {"type": "int64", "optional": true, "field": "h"}], "optional": false, "name": "io. debezum .connector.mon . source ",开云体育官方注册网址(3)"field": "source"}, {"type": "string", "optional": true, "field": "op"}, {"type": "int64", "optional": true, "field": "ts_ms"}], "optional": false, "name": "dbserver1.inventory.customers.Envelope"(4)}, "有效载荷":{(5)“后”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”},\“first_name \”,\“安妮\”,\“last_name \”,\“Kretchmar \”,\“邮件\”:\“annek@noanswer.org \“}”,(6)"patch": null, "source": {(7)“版本”:“是1.7.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999}, "op": "c",(8)“ts_ms”:1558965515240(9)}}
表4。的描述创建事件值字段
字段名 描述

1

模式

值的模式,它描述值的有效负载的结构。连接器为特定集合生成的每个更改事件中,更改事件的值模式都是相同的。

2

的名字

模式节中,每的名字Field指定值的有效负载中字段的模式。

io.开云体育官方注册网址debezium.data.Json有效负载的模式是补丁,过滤器字段。此模式特定于客户收集。一个创建事件是唯一一种包含字段。一个更新事件包含过滤器场和一个补丁字段。一个删除事件包含过滤器场,而不是一个场或补丁字段。

3.

的名字

io.开云体育官方注册网址debezium.connector.mongo.Source有效负载的模式是字段。这个模式是特定于MongoDB连接器的。连接器将其用于生成的所有事件。

4

的名字

dbserver1.inventory.customers.Envelope有效负载的总体结构的模式在哪里dbserver1是连接器名称,库存是数据库,和开云体育电动老虎机客户是集合。此模式特定于集合。

5

有效载荷

该值为实际数据。这是变更事件提供的信息。

事件的JSON表示形式似乎比它们所描述的文档要大得多。这是因为JSON表示必须包括消息的模式和有效负载部分。然而,通过使用Avro转换器,你可以显著减少连接器流到Kafka主题的消息的大小。

6

可选字段,指定事件发生后文档的状态。在本例中,字段包含新文档的值_idfirst_namelast_name,电子邮件字段。的Value总是一个字符串。按照惯例,它包含文档的JSON表示形式。MongoDB的oplog条目只包含_create_事件的文档的完整状态;换句话说,a创建事件是唯一一种包含字段。

7

描述事件的源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的起源、事件发生的顺序以及事件是否是同一事务的一部分。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含新文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB操作的唯一标识符,依赖于MongoDB的版本。要么是h字段,或者一个名为stxnid,表示lsid而且txnNumberoplog事件中的字段。

8

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,c指示创建文档的操作。有效值为:

  • c=创建

  • u=更新

  • d=删除

  • r= read(仅适用于快照)

9

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

更新事件

样例中更新的更改事件的值客户集合的模式与创建事件。同样,事件值的有效负载具有相同的结构。事件值有效负载中包含不同的值更新事件。一个更新事件没有价值。相反,它有以下两个字段:

  • 补丁是一个包含幂等更新操作的JSON表示的字符串字段

  • 过滤器是一个字符串字段,其中包含更新选择标准的JSON表示形式。的过滤器字符串可以包含多个分片集合的分片键字段。

中的更新中连接器生成的事件中的更改事件值的示例客户集合:

{"schema":{…}, "payload": {"op": "u",(1)“ts_ms”:1465491461815,(2)“补丁”:“{\“\”美元:{\“first_name \”,\“安妮玛丽\}}”,(3)“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,(4)“源”:{(5)“版本”:“是1.7.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
表5所示。的描述更新事件值字段
字段名 描述

1

人事处

返回string,描述导致连接器产生事件的操作类型。在这个例子中,u指示更新文档的操作。

2

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

3.

补丁

包含对文档的实际MongoDB幂等变化的JSON字符串表示。在本例中,更新更改了first_name字段转换为新值。

一个更新事件值中不包含字段。

4

过滤器

包含MongoDB选择标准的JSON字符串表示形式,用于标识要更新的文档。

5

描述事件的源元数据的必填字段。该字段包含与a相同的信息创建事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含已更新文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB操作的唯一标识符,依赖于MongoDB的版本。要么是h字段,或者一个名为stxnid,表示lsid而且txnNumberoplog事件中的字段。

在Debe开云体育官方注册网址zium更改事件中,MongoDB提供补丁字段。该字段的格式与MongoDB数据库版本有关。开云体育电动老虎机因此,在升级到新的MongoDB数据库版本时,要为格式的潜在变化做好准备。开云体育电动老虎机本文档中的示例来自MongoDB 3.4,在您的应用程序中,事件格式可能不同。

在MongoDB的博客中,更新事件不包含之前已更改文档的状态。因此,Debezium连接器不可能提供此信息。开云体育官方注册网址但是,Debezium开云体育官方注册网址连接器提供文档的开始状态创建而且事件。流的下游消费者可以通过保留每个文档的最新状态并将新事件中的状态与保存的状态进行比较来重建文档状态。开云体育官方注册网址Debezium连接器不能保持这种状态。

删除事件

的值删除改变事件有相同之处模式部分为创建而且更新事件。的有效载荷的一部分删除事件包含的值与创建而且更新事件。特别地,删除事件既不包含价值也不是补丁价值。这里有一个例子删除属性中的文档的客户集合:

{"schema":{…}, "payload": {"op": "d",(1)“ts_ms”:1465495462115,(2)“过滤器”:“{\“_id \”:{\“numberLong美元\”:\“1004 \”}}”,(3)“源”:{(4)“版本”:“是1.7.2。Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999}}}
表6所示。的描述删除事件值字段
字段名 描述

1

人事处

返回string,描述操作类型。的人事处字段值为d,表示该文档已被删除。

2

ts_ms

可选字段,显示连接器处理事件的时间。该时间基于运行Kafka Connect任务的JVM中的系统时钟。

对象,ts_ms指示在数据库中进行更改的时间。开云体育电动老虎机通过比较的值payload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和Debezium之间的延迟。开云体育官方注册网址开云体育电动老虎机

3.

过滤器

包含用于识别要删除的文档的MongoDB选择标准的JSON字符串表示形式。

4

描述事件的源元数据的必填字段。该字段包含与a相同的信息创建更新事件,但是值不同,因为该事件来自oplog中的不同位置。源元数据包括:

  • 开云体育官方注册网址Debezium版本。

  • 生成事件的连接器的名称。

  • MongoDB副本集的逻辑名称,它为生成的事件形成一个名称空间,并用于连接器写入的Kafka主题名称中。

  • 包含已删除文档的集合和数据库的名称。开云体育电动老虎机

  • 如果事件是快照的一部分。

  • 在数据库中进行更改的时间戳,以及时间戳中事件的序号。开云体育电动老虎机

  • MongoDB操作的唯一标识符,依赖于MongoDB的版本。要么是hoplog事件,或命名为stxnid,表示lsid而且txnNumber字段从oplog事件。

MongoDB连接器事件设计用于工作Kafka对数压缩.日志压缩允许删除一些较旧的消息,只要每个键至少保留最近的消息。这让Kafka回收存储空间,同时确保主题包含一个完整的数据集,并可用于重新加载基于键的状态。

墓碑上的事件

唯一标识文档的所有MongoDB连接器事件都具有完全相同的键。删除文档时,删除event值仍然适用于日志压缩,因为Kafka可以删除所有具有相同键的早期消息。然而,Kafka要删除所有具有该键的消息,消息值必须为.为了实现这一点,在Debezium的MongoDB连接器发开云体育官方注册网址出一个删除事件时,连接器发出一个特殊的墓碑事件,该事件具有相同的键,但具有价值。墓碑事件通知Kafka所有具有相同键的消息都可以被删除。

安装MongoDB

MongoDB连接器使用MongoDB的oplog来捕获更改,因此连接器只适用于MongoDB副本集或分片集群,其中每个分片都是一个单独的副本集。请参阅MongoDB文档来设置副本集分片集群.另外,一定要了解如何启用访问控制和认证使用复制集。

还必须有一个具有适当角色的MongoDB用户来读取管理开云体育电动老虎机可以读取oplog的数据库。此外,用户还必须能够读取配置开云体育电动老虎机数据库在配置服务器的一个分片集群中必须有list开云体育电动老虎机Databases特权操作。

部署

要部署Debezium 开云体育官方注册网址MongoDB连接器,您需要安装Debezium MongoDB连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。

过程
  1. 下载连接器的插件存档

  2. 将JAR文件解压缩到Kafka Connect环境中。

  3. 将包含JAR文件的目录添加到卡夫卡连接的plugin.path

  4. 重新启动Kafka Connect进程以获取新的JAR文件。

如果使用不可变容器,请参见开云体育官方注册网址Debezium的容器图像对于Apache Zookeeper, Apache Kafka和Kafka Connect, MongoDB连接器已经安装并准备运行。

的Deb开云体育官方注册网址ezium教程引导您使用这些图像,这是了解Debezium的好方法。开云体育官方注册网址