Debezium connector for MongoDB

Debezium’s MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, and elections within each replica set, and it waits for communication problems to be resolved.

For information about the MongoDB versions that are compatible with this connector, see the Debezium release overview.

Overview

MongoDB’s replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production. The MongoDB connector captures the changes in a replica set or sharded cluster.

A MongoDB replica set consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set’s primary are correctly applied to the replica set’s other servers, called secondaries. MongoDB replication works by having the primary record the changes in its oplog (or operation log), and then each of the secondaries reads the primary’s oplog and applies all of the operations, in order, to its own documents. When a new server is added to a replica set, that server first performs a snapshot of all of the databases and collections on the primary, and then reads the primary’s oplog to apply all changes that might have been made since it began the snapshot. This new server becomes a secondary (and is able to handle queries) when it catches up to the tail of the primary’s oplog.

Change Stream

The Debezium MongoDB connector uses a similar replication mechanism to the one described above, though it does not actually become a member of the replica set. The main difference is that the connector does not read the oplog directly, but delegates capturing and decoding the oplog to MongoDB’s Change Streams feature. With change streams, the MongoDB server exposes changes to collections as an event stream. The Debezium connector watches the stream and delivers the changes downstream. When the connector sees a replica set for the first time, it looks at the oplog to get the last recorded transaction and then performs a snapshot of the primary’s databases and collections. When all the data is copied, the connector then creates a change stream from the position it read earlier from the oplog.

As the MongoDB connector processes changes, it periodically records the position in the oplog/stream where the event originated. When the connector stops, it records the last oplog/stream position that it processed, so that upon restart it simply begins streaming from that position. In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and it will pick up exactly where it left off without losing a single event. Of course, MongoDB’s oplogs are usually capped at a maximum size, which means that the connector should not be stopped for too long, or else some of the operations in the oplog might be purged before the connector has a chance to read them. In this case, upon restart the connector will detect the missing oplog operations, perform a snapshot, and then proceed with streaming the changes.

The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and of network problems that might cause communication failures. The connector always uses the replica set’s primary node to stream changes, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop streaming changes, connect to the new primary, and start streaming changes using the new primary node. Likewise, if the connector experiences any problems communicating with the replica set primary, it will try to reconnect (using exponential backoff so as to not overwhelm the network or replica set) and continue streaming changes from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.

How the MongoDB connector works

An overview of the MongoDB topologies that the connector supports is useful for planning your application.

When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets. Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set. The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.

When running the connector against a sharded cluster, use a value of tasks.max that is greater than the number of replica sets. This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
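The relationship between tasks.max and the number of replica sets can be illustrated with a small sketch. The round-robin strategy and the function name below are assumptions made for illustration; the text above does not state how the connector actually distributes replica sets when there are fewer tasks than replica sets.

```python
from typing import Dict, List


def assign_replica_sets(replica_sets: List[str], tasks_max: int) -> Dict[int, List[str]]:
    """Spread replica sets over at most tasks_max tasks (hypothetical round-robin)."""
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    # Never create more tasks than there are replica sets to monitor.
    task_count = min(tasks_max, len(replica_sets))
    assignment: Dict[int, List[str]] = {task: [] for task in range(task_count)}
    for index, name in enumerate(replica_sets):
        # When tasks are scarce, several replica sets share one task.
        assignment[index % task_count].append(name)
    return assignment


print(assign_replica_sets(["rs0", "rs1", "rs2"], tasks_max=2))
```

With tasks.max set to at least the number of replica sets, each task in this sketch ends up with exactly one replica set, which is the situation the recommendation above aims for.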

Supported MongoDB topologies

The MongoDB connector supports the following MongoDB topologies:

MongoDB replica set

The Debezium MongoDB connector can capture changes from a single MongoDB replica set. Production replica sets require at least three members.

To use the MongoDB connector with a replica set, provide the addresses of one or more replica set servers as seed addresses through the connector’s mongodb.hosts property. The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary. The connector will start a task to connect to the primary and capture the changes from the primary’s oplog. When the replica set elects a new primary, the task will automatically switch over to the new primary.

When MongoDB is fronted by a proxy (such as with Docker on OS X or Windows), then when a client connects to the replica set and discovers the members, the MongoDB client will exclude the proxy as a valid member and will attempt and fail to connect directly to the members rather than go through the proxy.

In such a case, set the connector’s optional mongodb.members.auto.discover configuration property to false to instruct the connector to forgo membership discovery and instead simply use the first seed address (specified via the mongodb.hosts property) as the primary node. This may work, but might still cause issues when an election occurs.

MongoDB sharded cluster

A MongoDB sharded cluster consists of:

  • One or more shards, each deployed as a replica set

  • A separate replica set that acts as the cluster’s configuration server

  • One or more routers (also called mongos) to which clients connect and that route requests to the appropriate shards

To use the MongoDB connector with a sharded cluster, configure the connector with the host addresses of the configuration server replica set. When the connector connects to this replica set, it discovers that it is acting as the configuration server for a sharded cluster, discovers the information about each replica set used as a shard in the cluster, and will then start up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.

MongoDB standalone server

The MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.

MongoDB does not recommend running a standalone server in production. For more information, see the MongoDB documentation.

Logical connector name

The connector configuration property topic.prefix serves as a logical name for the MongoDB replica set or sharded cluster. The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the change stream position of each replica set.

You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system. We recommend that logical names begin with an alphabetic or underscore character, and that the remaining characters be alphanumeric or underscore.
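The naming recommendation can be checked mechanically before a connector is deployed. The following is a minimal sketch, assuming that "alphabetic" means ASCII letters; the pattern and the function name are illustrative and are not part of the connector.

```python
import re

# First character: a letter or underscore.
# Remaining characters: letters, digits, or underscores.
RECOMMENDED_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_recommended_logical_name(name: str) -> bool:
    """Return True if the name follows the recommendation (ASCII assumption)."""
    return RECOMMENDED_NAME.fullmatch(name) is not None


for candidate in ("fulfillment", "_inventory1", "1st-cluster"):
    print(candidate, is_recommended_logical_name(candidate))
```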

Performing a snapshot

When a task starts up using a replica set, it uses the connector’s logical name and the replica set name to find an offset that describes the position where the connector previously stopped reading changes. If an offset can be found and it still exists in the oplog, then the task immediately proceeds with streaming changes, starting at the recorded offset position.

However, if no offset is found or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing a snapshot. This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes a snapshot has been started). The task will then proceed to copy each collection, spawning as many threads as possible (up to the value of the snapshot.max.threads configuration property) to perform this work in parallel. The connector will record a separate read event for each document it sees, and that read event will contain the object’s identifier, the complete state of the object, and source information about the MongoDB replica set where the object was found. The source information will also include a flag that denotes the event was produced during a snapshot.

This snapshot will continue until it has copied all collections that match the connector’s filters. If the connector is stopped before the tasks' snapshots are completed, upon restart the connector begins the snapshot again.
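The startup decision described in this section can be summarized in a short sketch. Oplog positions are modeled here as plain integers and the function name is hypothetical; real offsets are more structured than this.

```python
from typing import Optional


def choose_startup_action(stored_offset: Optional[int], oldest_oplog_position: int) -> str:
    """Decide whether a task streams from its offset or snapshots first.

    Positions are simplified to integers for illustration only.
    """
    if stored_offset is None:
        # No offset was recorded, so there is no position to resume from.
        return "snapshot"
    if stored_offset < oldest_oplog_position:
        # The recorded position has already been purged from the capped oplog.
        return "snapshot"
    return "stream"


print(choose_startup_action(None, 100))  # no offset recorded
print(choose_startup_action(50, 100))    # offset older than the oplog
print(choose_startup_action(150, 100))   # offset still available
```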

Try to avoid task reassignment and reconfiguration while the connector performs snapshots of any replica sets. The connector generates log messages to report on the progress of the snapshot. To provide for the greatest control, run a separate Kafka Connect cluster for each connector.

Ad hoc snapshots

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only.

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing collection data, Debezium includes an option to perform ad hoc snapshots. The following changes in a database might be cause for performing an ad hoc snapshot:

  • The connector configuration is modified to capture a different set of collections.

  • Kafka topics are deleted and must be rebuilt.

  • Data corruption occurs due to a configuration error or some other problem.

You can re-run a snapshot for a collection for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling collections. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling collection.

When you initiate an ad hoc snapshot of an existing collection, the connector appends content to the topic that already exists for the collection. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled.

Ad hoc snapshot signals specify the collections to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the collections in the database.

You specify the collections to capture by sending an execute-snapshot message to the signaling collection. Set the type of the execute-snapshot signal to incremental, and provide the names of the collections to include in the snapshot, as described in the following table:

Table 1. Example of an ad hoc execute-snapshot signal record

| Field | Default | Value |
|---|---|---|
| type | incremental | Specifies the type of snapshot that you want to run. Setting the type is optional. Currently, you can request only incremental snapshots. |
| data-collections | N/A | An array that contains regular expressions matching the fully-qualified names of the collections to be snapshotted. The format of the names is the same as for the signal.data.collection configuration option. |

Triggering an ad hoc snapshot

You initiate an ad hoc snapshot by adding an entry with the execute-snapshot signal type to the signaling collection. After the connector processes the message, it begins the snapshot operation. The snapshot process reads the first and last primary key values and uses those values as the start and end point for each collection. Based on the number of entries in the collection, and the configured chunk size, Debezium divides the collection into chunks, and proceeds to snapshot each chunk, in succession, one at a time.

Currently, the execute-snapshot action type triggers incremental snapshots only. For more information, see Incremental snapshots.

Incremental snapshots

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document.

In an incremental snapshot, instead of capturing the full state of a database all at once, as in an initial snapshot, Debezium captures each collection in phases, in a series of configurable chunks. You can specify the collections that you want the snapshot to capture and the size of each chunk. The chunk size determines the number of rows that the snapshot collects during each fetch operation on the database. The default chunk size for incremental snapshots is 1024 rows.

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each collection row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process:

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other.

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the collection from the beginning.

  • You can run an incremental snapshot on demand at any time, and repeat the process as needed to adapt to database updates. For example, you might re-run a snapshot after you modify the connector configuration to add a collection to its collection.include.list property.

Incremental snapshot process

When you run an incremental snapshot, Debezium sorts each collection by primary key and then splits the collection into chunks based on the configured chunk size. Working chunk by chunk, it then captures each collection row in a chunk. For each row that it captures, the snapshot emits a READ event. That event represents the value of the row when the snapshot for the chunk began.
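The sort-then-split step can be sketched in a few lines. This is an in-memory illustration only: it assumes that documents are dictionaries keyed by _id and that the whole collection fits in a list, whereas a real snapshot reads each chunk from the database. The function name is hypothetical.

```python
from typing import Any, Dict, Iterator, List


def chunk_by_primary_key(
    documents: List[Dict[str, Any]], chunk_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield chunks of documents ordered by their _id (illustrative only)."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # A stable order on the primary key is what makes the chunks well defined.
    ordered = sorted(documents, key=lambda doc: doc["_id"])
    for start in range(0, len(ordered), chunk_size):
        yield ordered[start:start + chunk_size]


docs = [{"_id": n} for n in (5, 1, 3, 2, 4)]
for chunk in chunk_by_primary_key(docs, chunk_size=2):
    print([doc["_id"] for doc in chunk])
```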

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying collection records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka.

How Debezium resolves collisions among records with the same primary key

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a collection row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka.

Snapshot window

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same collection row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified collection chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot window for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key.

For each data collection, Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a collection are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change.

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot window, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersedes the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the collection’s Kafka topic.

The connector repeats the process for each snapshot chunk.
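The buffering scheme for one snapshot window can be sketched as follows. This is a simplified model under stated assumptions: events are dictionaries with hypothetical key and op fields, and all streamed events for the window are available as a list, whereas the connector handles them as they arrive.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]


def process_snapshot_window(chunk_reads: List[Event], streamed: List[Event]) -> List[Event]:
    """Return the events emitted for one chunk, in emission order."""
    # Buffer the READ events of the chunk, indexed by primary key.
    buffer = {event["key"]: event for event in chunk_reads}
    emitted: List[Event] = []
    for event in streamed:
        # A streamed event supersedes a buffered READ with the same key.
        buffer.pop(event["key"], None)
        emitted.append(event)
    # When the window closes, only READs without a collision remain.
    emitted.extend(buffer.values())
    return emitted


reads = [{"key": 1, "op": "r"}, {"key": 2, "op": "r"}, {"key": 3, "op": "r"}]
stream = [{"key": 2, "op": "u"}, {"key": 9, "op": "c"}]
for event in process_snapshot_window(reads, stream):
    print(event)
```

In this run the READ event for key 2 is dropped because an update for the same key arrived during the window, while the READ events for keys 1 and 3 are emitted after the window closes.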

Incremental snapshots require the primary key to be stably ordered. However, String values might not guarantee stable ordering, because encodings and special characters can lead to unexpected behavior (see the MongoDB documentation on sorting String values). Consider using other types for the primary key when performing incremental snapshots.

Incremental snapshots are currently supported for single replica set deployments only. This limitation will be removed in the next version.

Triggering an incremental snapshot

Currently, the only way to initiate an incremental snapshot is to send an ad hoc snapshot signal to the signaling collection on the source database. You submit a signal to the signaling collection by using the MongoDB insert() method.

After Debezium detects the change in the signaling collection, it reads the signal, and runs the requested snapshot operation.

The query that you submit specifies the collections to include in the snapshot, and, optionally, specifies the kind of snapshot operation. Currently, the only valid option for snapshot operations is the default value, incremental.

To specify the collections to include in the snapshot, provide a data-collections array that lists the collections or an array of regular expressions used to match collections, for example:
{"data-collections": ["public.Collection1", "public.Collection2"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium detects that no action is required and does not perform a snapshot.

If the name of a collection that you want to include in a snapshot contains a dot (.) in the name of the database, schema, or table, then to add the collection to the data-collections array, you must escape each part of the name in double quotes.

For example, to include a data collection that exists in the public database, and that has the name MyCollection, use the following format: "public"."MyCollection"
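Quoting each part of a name by hand is error-prone once the value is also embedded in a JSON string. The sketch below builds the quoted name and a signal document as plain Python data; the helper names are hypothetical, and the document contains only the type and data fields described in this section.

```python
import json
from typing import Any, Dict, List


def quote_collection(database: str, collection: str) -> str:
    """Wrap each part of the name in double quotes, joined by a dot."""
    return f'"{database}"."{collection}"'


def build_execute_snapshot_signal(collections: List[str]) -> Dict[str, Any]:
    """Build an execute-snapshot signal document for an incremental snapshot."""
    return {
        "type": "execute-snapshot",
        "data": {"data-collections": collections, "type": "incremental"},
    }


signal = build_execute_snapshot_signal([quote_collection("public", "MyCollection")])
# json.dumps adds the backslash escapes needed inside a JSON string.
print(json.dumps(signal))
```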

Prerequisites
Procedure
  1. Insert a snapshot signal document into the signaling collection:

    <signalDataCollection>.insert({"_id" : <id>, "type" : <signalType>, "data" : {"data-collections" : ["<collectionName>", "<collectionName>"], "type" : <snapshotType>}});

    For example,

    db.debeziumSignal.insert({ // (1)
      "type" : "execute-snapshot", // (2) (3)
      "data" : {
        "data-collections" : ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], // (4)
        "type" : "incremental" // (5)
      }
    });

    The values of the id, type, and data parameters in the command correspond to the fields of the signaling collection.

    The following table describes the parameters in the example:

    Table 2. Descriptions of fields in a MongoDB insert() command for sending an incremental snapshot signal to the signaling collection

    | Item | Value | Description |
    |---|---|---|
    | 1 | db.debeziumSignal | Specifies the fully-qualified name of the signaling collection on the source database. |
    | 2 | null | The _id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. The insert method in the preceding example omits the optional _id parameter. Because the document does not explicitly assign a value for the parameter, the arbitrary id that MongoDB automatically assigns to the document becomes the id identifier for the signal request. Use this string to match logging messages to entries in the signaling collection. Debezium does not use this identifier string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. |
    | 3 | execute-snapshot | The type parameter specifies the operation that the signal is intended to trigger. |
    | 4 | data-collections | A required component of the data field of a signal that specifies an array of collection names or regular expressions to match collection names to include in the snapshot. The array lists regular expressions which match collections by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling collection in the signal.data.collection configuration property. |

The following example shows the JSON for an incremental snapshot event that is captured by a connector.

Example: Incremental snapshot event message
{
  "before": null,
  "after": {
    "pk": "1",
    "value": "New data"
  },
  "source": {
    ...
    "snapshot": "incremental" // (1)
  },
  "op": "r", // (2)
  "ts_ms": "1620393591654",
  "transaction": null
}

| Item | Field name | Description |
|---|---|---|
| 1 | snapshot | Specifies the type of snapshot operation to run. Currently, the only valid option is the default value, incremental. Specifying a type value in the signal that you submit to the signaling collection is optional. If you do not specify a value, the connector runs an incremental snapshot. |
| 2 | op | Specifies the event type. The value for snapshot events is r, signifying a READ operation. |

Stopping an incremental snapshot

You can also stop an incremental snapshot by sending a signal to the signaling collection on the source database. You submit a stop snapshot signal by inserting a document into the signaling collection. After Debezium detects the change in the signaling collection, it reads the signal, and stops the incremental snapshot operation if it’s in progress.

The query that you submit specifies the snapshot operation of incremental, and, optionally, the collections of the current running snapshot to be removed.

Prerequisites
Procedure
  1. Insert a stop snapshot signal document into the signaling collection:

    <signalDataCollection>.insert({"_id" : <id>, "type" : "stop-snapshot", "data" : {"data-collections" : ["<collectionName>", "<collectionName>"], "type" : "incremental"}});

    For example,

    db.debeziumSignal.insert({ // (1)
      "type" : "stop-snapshot", // (2) (3)
      "data" : {
        "data-collections" : ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], // (4)
        "type" : "incremental" // (5)
      }
    });

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling collection.

    The following table describes the parameters in the example:

    Table 3. Descriptions of fields in an insert command for sending a stop incremental snapshot document to the signaling collection

    | Item | Value | Description |
    |---|---|---|
    | 1 | db.debeziumSignal | Specifies the fully-qualified name of the signaling collection on the source database. |
    | 2 | null | The insert method in the preceding example omits the optional _id parameter. Because the document does not explicitly assign a value for the parameter, the arbitrary id that MongoDB automatically assigns to the document becomes the id identifier for the signal request. Use this string to match logging messages to entries in the signaling collection. Debezium does not use this identifier string. |
    | 3 | stop-snapshot | The type parameter specifies the operation that the signal is intended to trigger. |
    | 4 | data-collections | An optional component of the data field of a signal that specifies an array of collection names or regular expressions to match collection names to remove from the snapshot. The array lists regular expressions which match collections by their fully-qualified names, using the same format as you use to specify the name of the connector’s signaling collection in the signal.data.collection configuration property. If this component of the data field is omitted, the signal stops the entire incremental snapshot that is in progress. |
    | 5 | incremental | A required component of the data field of a signal that specifies the kind of snapshot operation that is to be stopped. Currently, the only valid option is incremental. If you do not specify a type value, the signal fails to stop the incremental snapshot. |

Streaming changes

After the connector task for a replica set records an offset, it uses the offset to determine the position in the oplog where it should start streaming changes. The task then (depending on the configuration) either connects to the replica set’s primary node or connects to a replica-set-wide change stream and starts streaming changes from that position. It processes all insert, update, and delete operations, and converts them into Debezium change events. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. The interval at which the offset is recorded is governed by offset.flush.interval.ms, which is a Kafka Connect worker configuration property.

When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off. If the connector’s tasks terminate unexpectedly, however, then the tasks might have processed and generated events after the offset was last recorded but before the newest offset could be recorded. Upon restart, the connector begins at the last recorded offset, possibly generating some of the same events that were previously generated just prior to the crash.

When all components in a Kafka pipeline operate nominally, Kafka consumers receive every message exactly once. However, when things go wrong, Kafka can only guarantee that consumers receive every message at least once. To avoid unexpected results, consumers must be able to handle duplicate messages.
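One common way for a consumer to tolerate duplicates is to remember an identity for each event it has already handled. The sketch below assumes a simplified event shape with hypothetical key and position fields, standing in for the document key and the oplog position that each change event carries. It is one possible approach, not a prescribed one.

```python
from typing import Any, Dict, List, Set, Tuple


def deduplicate(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop events whose (key, position) pair was already seen."""
    seen: Set[Tuple[Any, Any]] = set()
    unique: List[Dict[str, Any]] = []
    for event in events:
        identity = (event["key"], event["position"])
        if identity in seen:
            # Redelivered after a restart: skip it.
            continue
        seen.add(identity)
        unique.append(event)
    return unique


replayed = [
    {"key": "a", "position": 1},
    {"key": "a", "position": 1},  # duplicate delivery
    {"key": "a", "position": 2},
]
print(deduplicate(replayed))
```

A long-running consumer would need to bound or persist the set of seen identities; that concern is left out of this sketch.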

As mentioned earlier, the connector tasks always use the replica set’s primary node to stream changes from the oplog, ensuring that the connector sees the most up-to-date operations possible and can capture the changes with lower latency than if secondaries were used instead. When the replica set elects a new primary, the connector immediately stops streaming changes, connects to the new primary, and starts streaming changes from the new primary node at the same position. Likewise, if the connector experiences any problems communicating with the replica set members, it tries to reconnect, by using exponential backoff so as to not overwhelm the replica set, and once connected it continues streaming changes from where it last left off. In this way, the connector is able to dynamically adjust to changes in replica set membership and automatically handle communication failures.
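Exponential backoff means that the wait between reconnection attempts doubles after each failure, usually up to a cap. The following sketch only computes the sequence of delays; the initial delay, the cap, and the function name are hypothetical values chosen for illustration and do not describe the connector's actual settings.

```python
from typing import List


def backoff_delays(initial_ms: int, max_ms: int, attempts: int) -> List[int]:
    """Return the delay before each reconnection attempt, in milliseconds."""
    delays: List[int] = []
    delay = initial_ms
    for _ in range(attempts):
        # Never wait longer than the cap, however many attempts have failed.
        delays.append(min(delay, max_ms))
        delay *= 2
    return delays


print(backoff_delays(initial_ms=1000, max_ms=10000, attempts=6))
```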

To summarize, the MongoDB connector continues running in most situations. Communication problems might cause the connector to wait until the problems are resolved.

Pre-image support

In MongoDB 6.0 and later, you can configure change streams to emit the pre-image state of a document to populate the before field for MongoDB change events. To enable the use of pre-images in MongoDB, you must set changeStreamPreAndPostImages for a collection by using db.createCollection(), create, or collMod. To enable the Debezium MongoDB connector to include pre-images in change events, set the capture.mode for the connector to one of the *_with_pre_image options.
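For an existing collection, the collMod route amounts to sending a small command document to the database. The sketch below only builds that document as a Python dictionary; the function name and the customers collection are illustrative. Sending it requires a driver, for example by passing the dictionary to a method such as pymongo's Database.command, which is outside the scope of this sketch.

```python
from typing import Any, Dict


def enable_pre_images_command(collection: str) -> Dict[str, Any]:
    """Build a collMod command that turns on pre- and post-images."""
    # The command name must be the first key of the command document.
    return {
        "collMod": collection,
        "changeStreamPreAndPostImages": {"enabled": True},
    }


print(enable_pre_images_command("customers"))
```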

Size limits on MongoDB change stream events

The size of a MongoDB change stream event is limited to 16 megabytes. The use of pre-images thus increases the likelihood of exceeding this threshold, which can lead to failures. For information about how to avoid exceeding the change stream limit, see the MongoDB documentation.

Topic names

The MongoDB connector writes events for all insert, update, and delete operations to documents in each collection to a single Kafka topic. The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the topic.prefix configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.

For example, consider a MongoDB replica set with an inventory database that contains four collections: products, products_on_hand, customers, and orders. If the connector monitoring this database were given a logical name of fulfillment, then the connector would produce events on these four Kafka topics:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

Notice that the topic names do not incorporate the replica set name or shard name. As a result, all changes to a sharded collection (where each shard contains a subset of the collection’s documents) go to the same Kafka topic.
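The naming rule can be expressed as a one-line helper. The function name is hypothetical; the inputs are the logical name, database, and collections from the example above.

```python
def topic_name(logical_name: str, database: str, collection: str) -> str:
    """Join the three parts of a topic name with dots."""
    return f"{logical_name}.{database}.{collection}"


for collection in ("products", "products_on_hand", "customers", "orders"):
    print(topic_name("fulfillment", "inventory", collection))
```

Because neither the replica set nor the shard appears among the inputs, documents of the same collection map to the same topic regardless of which shard holds them.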

You can set up Kafka to auto-create the topics as they are needed. If not, then you must use Kafka administration tools to create the topics before starting the connector.

Partitions

The MongoDB connector does not make any explicit determination about how to partition topics for events. Instead, it allows Kafka to determine how to partition topics based on event keys. You can change Kafka’s partitioning logic by defining the name of the Partitioner implementation in the Kafka Connect worker configuration.

Kafka maintains total order only for events written to a single topic partition. Partitioning the events by key does mean that all events with the same key always go to the same partition. This ensures that all events for a specific document are always totally ordered.
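The reason that key-based partitioning preserves per-document order is that the partition is a pure function of the key. The sketch below illustrates the idea with CRC32 as a stand-in hash; this is an assumption for illustration, and it is not the hash function that Kafka's own partitioner uses.

```python
import zlib


def choose_partition(key: bytes, partition_count: int) -> int:
    """Map an event key to a partition deterministically (CRC32 stand-in)."""
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    # The same key always yields the same hash, hence the same partition.
    return zlib.crc32(key) % partition_count


first = choose_partition(b'{"id": "1004"}', 6)
second = choose_partition(b'{"id": "1004"}', 6)
print(first == second)
```

Events for different documents may land in different partitions, so no ordering is implied between them.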

Transaction Metadata

Debezium can generate events that represent transaction metadata boundaries and enrich change data event messages.

Limits on when Debezium receives transaction metadata

Debezium registers and receives metadata only for transactions that occur after you deploy the connector. Metadata for transactions that occur before you deploy the connector is not available.

For every transaction BEGIN and END, Debezium generates an event that contains the following fields:

status

BEGIN or END

id

String representation of the unique transaction identifier.

event_count (for END events)

Total number of events emitted by the transaction.

data_collections (for END events)

An array of pairs of data_collection and event_count that provides the number of events emitted by changes originating from the given data collection.

The following example shows a typical message:

{ "status": "BEGIN", "id": "1462833718356672513", "event_count": null, "data_collections": null }
{ "status": "END", "id": "1462833718356672513", "event_count": 2, "data_collections": [ { "data_collection": "rs0.testDB.collectiona", "event_count": 1 }, { "data_collection": "rs0.testDB.collectionb", "event_count": 1 } ] }

Unless overridden via the topic.transaction option, transaction events are written to the topic named <topic.prefix>.transaction.
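A consumer of the transaction topic might sanity-check an END event as in the following sketch. It assumes that the per-collection counts add up to the total event_count, as they do in the example message; the function name counts_are_consistent is hypothetical.

```python
import json

end_event = json.loads("""
{
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    { "data_collection": "rs0.testDB.collectiona", "event_count": 1 },
    { "data_collection": "rs0.testDB.collectionb", "event_count": 1 }
  ]
}
""")


def counts_are_consistent(event: dict) -> bool:
    """Compare the total event_count with the sum of per-collection counts."""
    if event["status"] != "END":
        return True  # BEGIN events carry null counts, so there is nothing to check
    per_collection = sum(item["event_count"] for item in event["data_collections"])
    return per_collection == event["event_count"]


print(counts_are_consistent(end_event))
```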

Change data event enrichment

When transaction metadata is enabled, the data message Envelope is enriched with a new transaction field. This field provides information about every event in the form of a composite of fields:

id

String representation of unique transaction identifier.

total_order

The absolute position of the event among all events generated by the transaction.

data_collection_order

The per-data collection position of the event among all events that were emitted by the transaction.

Following is an example of what a message looks like:

{ "after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", "source": { ... }, "op": "c", "ts_ms": "1580390884335", "transaction": { "id": "1462833718356672513", "total_order": "1", "data_collection_order": "1" } }
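One possible use of the transaction block is to regroup change events by transaction and restore their order within it. The sketch below is an illustration only; group_by_transaction is a hypothetical helper, and the sample events are trimmed to the fields that it reads. The order values are converted with int() because the example message carries them as strings.

```python
from collections import defaultdict


def group_by_transaction(events: list) -> dict:
    """Group events by transaction id and sort each group by total_order."""
    grouped = defaultdict(list)
    for event in events:
        tx = event.get("transaction")
        if tx is None:
            continue  # the event was not part of a transaction
        grouped[tx["id"]].append(event)
    for tx_events in grouped.values():
        tx_events.sort(key=lambda e: int(e["transaction"]["total_order"]))
    return dict(grouped)


events = [
    {"op": "c", "transaction": {"id": "tx1", "total_order": "2", "data_collection_order": "1"}},
    {"op": "c", "transaction": {"id": "tx1", "total_order": "1", "data_collection_order": "1"}},
    {"op": "u"},
]
grouped = group_by_transaction(events)
print([e["transaction"]["total_order"] for e in grouped["tx1"]])
```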

Data change events

The Debezium MongoDB connector generates a data change event for each document-level operation that inserts, updates, or deletes data. Each event contains a key and a value. The structure of the key and the value depends on the collection that was changed.

Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.

The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A schema field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:

{ "schema": {(1)... }, "payload": {(2)... }, "schema": {(3)... }, "payload": {(4)... }, }
Table 4. Overview of change event basic content
Item Field name Description

1

schema

The first schema field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’s payload portion. In other words, the first schema field describes the structure of the key for the document that was changed.

2

payload

The first payload field is part of the event key. It has the structure described by the previous schema field and it contains the key for the document that was changed.

3

schema

The second schema field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’s payload portion. In other words, the second schema describes the structure of the document that was changed. Typically, this schema contains nested schemas.

4

payload

The second payload field is part of the event value. It has the structure described by the previous schema field and it contains the actual data for the document that was changed.

By default, the connector streams change event records to topics with names that are the same as the event’s originating collection. See topic names.

The MongoDB connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the database and collection names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or _. If there is an invalid character, it is replaced with an underscore character.

This can lead to unexpected conflicts if the logical server name, a database name, or a collection name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
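The replacement rule and the resulting conflicts can be illustrated with a short sketch. This follows the rule as described above and is not Debezium's actual implementation; the helper names sanitize_part and schema_name are hypothetical.

```python
import re


def sanitize_part(name: str, is_server_name: bool = False) -> str:
    """Replace every character outside a-z, A-Z, 0-9, and _ with an underscore."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    # The logical server name must also not start with a digit.
    if is_server_name and cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned[1:]
    return cleaned


def schema_name(server: str, database: str, collection: str, suffix: str = "Envelope") -> str:
    parts = [sanitize_part(server, True), sanitize_part(database), sanitize_part(collection)]
    return ".".join(parts + [suffix])


print(schema_name("fulfillment", "inventory", "customers"))
# Two different database names collapse to the same schema name:
print(schema_name("fulfillment", "inv-db", "customers"))
print(schema_name("fulfillment", "inv db", "customers"))
```

The last two calls show the kind of conflict described above: the names differ only in characters that are invalid, so both map to the same result.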

Change event keys

A change event’s key contains the schema for the changed document’s key and the changed document’s actual key. For a given collection, both the schema and its corresponding payload contain a single id field. The value of this field is the document’s identifier represented as a string that is derived from MongoDB extended JSON serialization strict mode.

Consider a connector with a logical name of fulfillment, a replica set containing an inventory database, and a customers collection that contains documents such as the following.

Example document
{ "_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }
Example change event key

Every change event that captures a change to the customers collection has the same event key schema. For as long as the customers collection has the previous definition, every change event that captures a change to the customers collection has the following key structure. In JSON, it looks like this:

{ "schema": {(1)"type": "struct", "name": "fulfillment.inventory.customers.Key",(2)"optional": false,(3)"fields": [(4){ "field": "id", "type": "string", "optional": false } ] }, "payload": {(5)"id": "1004" } }
Table 5. Description of change event key
Item Field name Description

1

schema

The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion.

2

fulfillment.inventory.customers.Key

Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the key for the document that was changed. Key schema names have the format connector-name.database-name.collection-name.Key. In this example:

  • fulfillment is the name of the connector that generated this event.

  • inventory is the database that contains the collection that was changed.

  • customers is the collection that contains the document that was updated.

3

optional

Indicates whether the event key must contain a value in its payload field. In this example, a value in the key’s payload is required. A value in the key’s payload field is optional when a document does not have a key.

4

fields

Specifies each field that is expected in the payload, including each field’s name, type, and whether it is required.

5

payload

Contains the key for the document for which this change event was generated. In this example, the key contains a single id field of type string whose value is 1004.

This example uses a document with an integer identifier, but any valid MongoDB document identifier works the same way, including an identifier that is itself a document. In every case, the event key’s payload.id value is a string that represents the document’s original _id field as a MongoDB extended JSON serialization that uses strict mode. The following table provides examples of how different types of _id fields are represented.

Table 6. Examples of representing document _id fields in event key payloads
Type MongoDB _id Value Key’s payload

Integer

1234

{ "id" : "1234" }

Float

12.34

{ "id" : "12.34" }

String

"1234"

{ "id" : "\"1234\"" }

Document

{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }

{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }

ObjectId

ObjectId("596e275826f08b2730779e1f")

{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }

Binary

BinData("a2Fma2E=",0)

{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }
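Because the id value is a JSON string, a consumer can usually recover the identifier by parsing it. The following sketch uses Python's standard json module; the helper name decode_key_id is hypothetical. Extended JSON wrappers such as $oid are returned as plain dictionaries here, and converting them to driver-specific types is left out of this example.

```python
import json


def decode_key_id(key_payload: dict):
    """Parse the string in the key payload's id field back into a value."""
    return json.loads(key_payload["id"])


# Key payloads taken from the table above.
print(decode_key_id({"id": "1234"}))       # integer _id
print(decode_key_id({"id": "12.34"}))      # float _id
print(decode_key_id({"id": '"1234"'}))     # string _id
print(decode_key_id({"id": '{"$oid" : "596e275826f08b2730779e1f"}'}))  # ObjectId _id
```

Note that an integer _id and a string _id with the same digits produce different key payloads, so the two documents do not share a key.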

Change event values

The value in a change event is a bit more complicated than the key. Like the key, the value has a schema section and a payload section. The schema section contains the schema that describes the Envelope structure of the payload section, including its nested fields. Change events for operations that create, update, or delete data all have a value payload with an envelope structure.

Consider the same sample document that was used to show an example of a change event key:

Example document
{ "_id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }

The value portion of a change event for a change to this document is described for each event type:

create events

The following example shows the value portion of a change event that the connector generates for an operation that creates data in the customers collection:

{ "schema": {(1)"type": "struct", "fields": [ { "type": "string", "optional": true, "name": "io.debezium.data.Json",(2)"version": 1, "field": "after" }, { "type": "string", "optional": true, "name": "io.debezium.data.Json", "version": 1, "field": "patch" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "rs" }, { "type": "string", "optional": false, "field": "collection" }, { "type": "int32", "optional": false, "field": "ord" }, { "type": "int64", "optional": true, "field": "h" } ], "optional": false, "name": "io.debezium.connector.mongo.Source",(3)"field": "source" }, { "type": "string", "optional": true, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope"(4)}, "payload": {(5)"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",(6)"source": {(7)"version": "2.1.2.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 31, "h": 1546547425148721999 }, "op": "c",(8)"ts_ms": 1558965515240(9)} }
Table 7. Descriptions of create event value fields
Item Field name Description

1

schema

The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular collection.

2

name

In the schema section, each name field specifies the schema for a field in the value’s payload.

io.debezium.data.Json is the schema for the payload’s after, patch, and filter fields. This schema is specific to the customers collection. A create event is the only kind of event that contains an after field. An update event contains a filter field and a patch field. A delete event contains a filter field, but neither an after field nor a patch field.

3

name

io.debezium.connector.mongo.Source is the schema for the payload’s source field. This schema is specific to the MongoDB connector. The connector uses it for all events that it generates.

4

name

dbserver1.inventory.customers.Envelope is the schema for the overall structure of the payload, where dbserver1 is the connector name, inventory is the database, and customers is the collection. This schema is specific to the collection.

5

payload

The value’s actual data. This is the information that the change event is providing.

It may appear that the JSON representations of the events are much larger than the documents they describe. This is because the JSON representation must include the schema and the payload portions of the message. However, by using the Avro converter, you can significantly decrease the size of the messages that the connector streams to Kafka topics.

6

after

An optional field that specifies the state of the document after the event occurred. In this example, the after field contains the values of the new document’s _id, first_name, last_name, and email fields. The after value is always a string. By convention, it contains a JSON representation of the document. MongoDB oplog entries contain the full state of a document only for create events, and also for update events when the capture.mode option is set to change_streams_update_full; in other words, a create event is the only kind of event that contains an after field regardless of the capture.mode option.

7

source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:

  • Debezium version.

  • Name of the connector that generated the event.

  • Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.

  • Names of the collection and database that contain the new document.

  • If the event was part of a snapshot.

  • Timestamp for when the change was made in the database and ordinal of the event within the timestamp.

  • Unique identifier of the MongoDB operation (the h field in the oplog event).

  • Unique identifiers of the MongoDB session (lsid) and transaction number (txnNumber) in case the change was executed inside a transaction (change streams capture mode only).

8

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a document. Valid values are:

  • c = create

  • u = update

  • d = delete

  • r = read (applies to only snapshots)

9

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

update events
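The lag calculation described for ts_ms can be sketched as follows, using the two timestamps from the create event example. The function name replication_lag_ms is hypothetical, and the payload is trimmed to the fields that the calculation reads.

```python
def replication_lag_ms(payload: dict) -> int:
    """Milliseconds between the database change and Debezium processing it."""
    return payload["ts_ms"] - payload["source"]["ts_ms"]


payload = {
    "source": {"ts_ms": 1558965508000},  # when the change was made in the database
    "op": "c",
    "ts_ms": 1558965515240,              # when the connector processed the event
}

print(replication_lag_ms(payload))  # prints 7240
```

Because the two timestamps come from different system clocks, a small or even negative result can reflect clock skew rather than actual lag.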

Change streams capture mode

The value of a change event for an update in the sample customers collection has the same schema as a create event for that collection. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. An update event includes an after value only if the capture.mode option is set to change_streams_update_full. A before value is provided if the capture.mode option is set to one of the *_with_pre_image options. An update event also carries a structured updateDescription field with the following additional fields:

  • updatedFields is a string field that contains the JSON representation of the updated document fields with their values

  • removedFields is a list of field names that were removed from the document

  • truncatedArrays is a list of arrays in the document that were truncated

Here is an example of a change event value in an event that the connector generates for an update in the customers collection:

{ "schema": { ... }, "payload": { "op": "u",(1)"ts_ms": 1465491461815,(2)"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(3)"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(4)"updateDescription": { "removedFields": null, "updatedFields": "{\"first_name\": \"Anne Marie\"}",(5)"truncatedArrays": null }, "source": {(6)"version": "2.1.2.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": false, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 1, "h": null, "tord": null, "stxnid": null, "lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}", "txnNumber":1 } } }
Table 8. Descriptions of update event value fields
Item Field name Description

1

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, u indicates that the operation updated a document.

2

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

3

before

Contains the JSON string representation of the actual MongoDB document before the change.
An update event value does not contain a before field if the capture mode is not set to one of the *_with_pre_image options.

4

after

Contains the JSON string representation of the actual MongoDB document.
An update event value does not contain an after field if the capture mode is not set to change_streams_update_full.

5

updatedFields

Contains the JSON string representation of the updated field values of the document. In this example, the update changed the first_name field to a new value.

6

source

Mandatory field that describes the source metadata for the event. This field contains the same information as a create event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:

  • Debezium version.

  • Name of the connector that generated the event.

  • Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.

  • Names of the collection and database that contain the updated document.

  • If the event was part of a snapshot.

  • Timestamp for when the change was made in the database and ordinal of the event within the timestamp.

  • Unique identifiers of the MongoDB session (lsid) and transaction number (txnNumber) in case the change was executed inside a transaction.

The after value in the event should be handled as the at-point-of-time value of the document. The value is not calculated dynamically but is obtained from the collection. It is thus possible, if multiple updates closely follow one another, that all update events contain the same after value, which represents the last value stored in the document.

If your application depends on gradual change evolution, then you should rely on updateDescription only.
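An application that tracks gradual change might apply each updateDescription to its own copy of the document, as in the following sketch. It is a simplification that handles only top-level field names and ignores truncatedArrays; the function name apply_update_description is hypothetical.

```python
import json


def apply_update_description(document: dict, update_description: dict) -> dict:
    """Return a copy of the document with updated fields set and removed fields dropped."""
    result = dict(document)
    updated = update_description.get("updatedFields")
    if updated:
        # updatedFields is a JSON string, so it must be parsed first.
        result.update(json.loads(updated))
    for field in update_description.get("removedFields") or []:
        result.pop(field, None)
    return result


before = {"_id": 1004, "first_name": "unknown", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
description = {
    "removedFields": None,
    "updatedFields": "{\"first_name\": \"Anne Marie\"}",
    "truncatedArrays": None,
}

print(apply_update_description(before, description))
```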

delete events

The value in a delete change event has the same schema portion as create and update events for the same collection. The payload portion in a delete event contains values that are different from create and update events for the same collection. In particular, a delete event contains neither an after value nor an updateDescription value. Here is an example of a delete event for a document in the customers collection:

{ "schema": { ... }, "payload": { "op": "d",(1)"ts_ms": 1465495462115,(2)"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(3)"source": {(4)"version": "2.1.2.Final", "connector": "mongodb", "name": "fulfillment", "ts_ms": 1558965508000, "snapshot": true, "db": "inventory", "rs": "rs0", "collection": "customers", "ord": 6, "h": 1546547425148721999 } } }
Table 9. Descriptions of delete event value fields
Item Field name Description

1

op

Mandatory string that describes the type of operation. The op field value is d, signifying that this document was deleted.

2

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

3

before

Contains the JSON string representation of the actual MongoDB document before the change.
A delete event value does not contain a before field if the capture mode is not set to one of the *_with_pre_image options.

4

source

Mandatory field that describes the source metadata for the event. This field contains the same information as a create or update event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:

  • Debezium version.

  • Name of the connector that generated the event.

  • Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.

  • Names of the collection and database that contained the deleted document.

  • If the event was part of a snapshot.

  • Timestamp for when the change was made in the database and ordinal of the event within the timestamp.

  • Unique identifier of the MongoDB operation (the h field in the oplog event).

  • Unique identifiers of the MongoDB session (lsid) and transaction number (txnNumber) in case the change was executed inside a transaction (change streams capture mode only).

MongoDB connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.

Tombstone events

All MongoDB connector events for a uniquely identified document have exactly the same key. When a document is deleted, the delete event value still works with log compaction because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that key, the message value must be null. To make this possible, after Debezium’s MongoDB connector emits a delete event, the connector emits a special tombstone event that has the same key but a null value. A tombstone event informs Kafka that all messages with that same key can be removed.
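The effect of a tombstone can be pictured with an idealized model of a compacted topic, in which only the latest value per key survives and a null value removes the key. This sketch models the eventual end state only; it is not how Kafka implements compaction, and real brokers retain tombstones for a configurable period. The function name compact is hypothetical.

```python
def compact(log: list) -> dict:
    """Keep the latest value per key; a None value (tombstone) removes the key."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)
        else:
            latest[key] = value
    return latest


log = [
    ("1004", "create"),
    ("1005", "create"),
    ("1004", "update"),
    ("1004", "delete"),
    ("1004", None),  # tombstone emitted after the delete event
]

print(compact(log))       # document 1004 is gone entirely
print(compact(log[:-1]))  # without the tombstone, the delete event remains
```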

Setting up MongoDB

The MongoDB connector uses MongoDB’s change streams to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set. See the MongoDB documentation for setting up a replica set or sharded cluster. Also, be sure to understand how to enable access control and authentication with replica sets.

You must also have a MongoDB user that has the appropriate roles to read the admin database where the oplog can be read. Additionally, the user must also be able to read the config database in the configuration server of a sharded cluster and must have the listDatabases privilege action. When change streams are used (the default), the user must also have the cluster-wide privilege actions find and changeStream.

If you intend to use pre-images to populate the before field, you must first enable changeStreamPreAndPostImages for a collection by using db.createCollection(), create, or collMod.
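As a sketch of the collMod route, the following Python code builds the command document that enables pre- and post-images on an existing collection. The helper name pre_image_command is hypothetical, and the code only constructs the document; sending it requires a MongoDB driver and a server version that supports this option.

```python
def pre_image_command(collection: str, enabled: bool = True) -> dict:
    """Build a collMod command document that toggles changeStreamPreAndPostImages."""
    # The command name must be the first key; Python dicts preserve insertion order.
    return {
        "collMod": collection,
        "changeStreamPreAndPostImages": {"enabled": enabled},
    }


command = pre_image_command("customers")
print(command)
```

With the PyMongo driver, for example, a document like this could be passed to the command method of a database handle for the inventory database.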

MongoDB in the Cloud

You can use the Debezium connector for MongoDB with MongoDB Atlas. Note that MongoDB Atlas only supports secure connections via SSL, that is, the mongodb.ssl.enabled connector option must be set to true.

Deployment

To deploy a Debezium MongoDB connector, you install the Debezium MongoDB connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.

Prerequisites
Procedure
  1. Download the connector’s plug-in archive.

  2. Extract the JAR files into your Kafka Connect environment.

  3. Add the directory with the JAR files to Kafka Connect’s plugin.path.

  4. Restart your Kafka Connect process to pick up the new JAR files.

If you are working with immutable containers, see Debezium’s Container images for Apache Zookeeper, Apache Kafka, and Kafka Connect with the MongoDB connector already installed and ready to run.

The Debezium tutorial walks you through using these images, and this is a great way to learn about Debezium.

MongoDB connector configuration example

Following is an example of the configuration for a connector instance that captures data from a MongoDB replica set rs0 at port 27017 on 192.168.99.100, which we logically name fullfillment. Typically, you configure the Debezium MongoDB connector in a JSON file by setting the configuration properties that are available for the connector.

You can choose to produce events for a particular MongoDB replica set or sharded cluster. Optionally, you can filter out collections that are not needed.

{ "name": "inventory-connector",(1)"config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",(2)"mongodb.hosts": "rs0/192.168.99.100:27017",(3)"topic.prefix": "fullfillment",(4)"collection.include.list": "inventory[.]*"(5)} }
1 The name of our connector when we register it with a Kafka Connect service.
2 The name of the MongoDB connector class.
3 The host addresses to use to connect to the MongoDB replica set.
4 The logical name of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
5 A list of regular expressions that match the collection namespaces (for example, databaseName.collectionName) of all collections to be monitored. This is optional.

For the complete list of the configuration properties that you can set for the Debezium MongoDB connector, see MongoDB connector configuration properties.
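The way an include list selects collection namespaces can be approximated with the following sketch. It assumes that each expression must match the entire namespace, and it uses Python's re.fullmatch as a stand-in for the connector's own matching. The helper name is_included and the naive comma split are assumptions made for this example.

```python
import re


def is_included(namespace: str, include_list: str) -> bool:
    """Return True if any expression in the list matches the whole namespace."""
    # Naive split: an expression that itself contains a comma would break this.
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, namespace) for p in patterns)


print(is_included("inventory.customers", "inventory[.].*"))          # whole namespace matches
print(is_included("inventory.customers", "inventory"))               # only a substring matches
print(is_included("inventory_archive.customers", "inventory[.].*"))  # different database
```

Under this reading, an expression needs `.*` after the escaped dot to cover every collection in a database; `[.]*` on its own matches only a run of literal dots.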

You can send this configuration with a POST command to a running Kafka Connect service. The service records the configuration and starts one connector task that performs the following actions:

  • Connects to the MongoDB replica set or sharded cluster.

  • Assigns tasks for each replica set.

  • Performs a snapshot, if necessary.

  • Reads the change stream.

  • Streams change event records to Kafka topics.
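Registering the example configuration could look like the following sketch, which builds the POST request with Python's standard library. The address http://localhost:8083 is an assumption (8083 is the usual default port for the Kafka Connect REST interface), and the function name build_register_request is hypothetical. The request is only constructed here, not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # assumed address of the Kafka Connect service

connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": "rs0/192.168.99.100:27017",
        "topic.prefix": "fullfillment",
        "collection.include.list": "inventory[.]*",
    },
}


def build_register_request(url: str, connector: dict) -> urllib.request.Request:
    """Build a POST request that carries the connector configuration as JSON."""
    body = json.dumps(connector).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


request = build_register_request(CONNECT_URL, connector)
print(request.get_method(), request.full_url)
# To send the request against a running service: urllib.request.urlopen(request)
```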

Adding connector configuration

To start running a Debezium MongoDB connector, create a connector configuration, and add the configuration to your Kafka Connect cluster.

Prerequisites
Procedure
  1. Create a configuration for the MongoDB connector.

  2. Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.

Results

After the connector starts, it completes the following actions:

  • Performs a consistent snapshot of the collections in your MongoDB replica sets.

  • Reads the change streams for the replica sets.

  • Produces change events for every inserted, updated, and deleted document.

  • Streams change event records to Kafka topics.

Connector properties

The Debezium MongoDB connector has numerous configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:

The following configuration properties are required unless a default value is available.

Table 10. Required Debezium MongoDB connector configuration properties
Property Default Description

name

No default

Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class

No default

The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

mongodb.hosts

No default

The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, then the host and port pair should be prefixed with the replica set name (e.g., rs0/localhost:27017).

It is mandatory to provide the current primary address. This limitation will be removed in the next Debezium release.

mongodb.connection.string

No default

Specifies a connection string that the connector uses during the initial discovery of a MongoDB replica set. To use this option, you must set the value of mongodb.members.auto.discover to true. Do not set this property and the mongodb.hosts property at the same time.

The connector uses this connection string only during the initial replica set discovery process. During this discovery process, the connector ignores connection values that are specified in other properties (mongodb.user, mongodb.password, mongodb.authsource, SSL configuration properties, and so forth). After the discovery process completes, when the connector attempts to establish a direct connection to a primary replica set member, the connector then returns to using the standard connection properties, and it ignores values in mongodb.connection.string. However, if credential information is not present elsewhere in the configuration, the connector can extract credential information from the value of the connection string. For example, if the mongodb.user property is not set, but the connection string includes the MongoDB username, the connector reads the information from the string.

topic.prefix

No default

A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster. Use only alphanumeric characters, hyphens, dots and underscores to form the name. The logical name should be unique across all other connectors, because the name is used as the prefix in naming the Kafka topics that receive records from this connector.

Do not change the value of this property. If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.

mongodb.user

No default

Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.password

No default

Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.authsource

admin

Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with an authentication database other than admin.

mongodb.ssl.enabled

false

Connector will use SSL to connect to MongoDB instances.

mongodb.ssl.invalid.hostname.allowed

false

When SSL is enabled, this setting controls whether strict hostname checking is disabled during the connection phase. If true, the connection will not prevent man-in-the-middle attacks.

database.include.list

empty string

An optional comma-separated list of regular expressions that match database names to be monitored. By default, all databases are monitored.
When database.include.list is set, the connector monitors only the databases that the property specifies. Other databases are excluded from monitoring.

To match the name of a database, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.
If you include this property in the configuration, do not also set the database.exclude.list property.

database.exclude.list

empty string

An optional comma-separated list of regular expressions that match database names to be excluded from monitoring. When database.exclude.list is set, the connector monitors every database except the ones that the property specifies.

To match the name of a database, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.
If you include this property in the configuration, do not set the database.include.list property.

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored. By default, the connector monitors all collections except those in the local and admin databases. When collection.include.list is set, the connector monitors only the collections that the property specifies. Other collections are excluded from monitoring. Collection identifiers are of the form databaseName.collectionName.

To match the name of a namespace, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings in the name.
If you include this property in the configuration, do not also set the collection.exclude.list property.

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring. When collection.exclude.list is set, the connector monitors every collection except the ones that the property specifies. Collection identifiers are of the form databaseName.collectionName.

To match the name of a namespace, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings that might be present in the name.
If you include this property in the configuration, do not set the collection.include.list property.

initial

Specifies the criteria for performing a snapshot when the connector starts. Set the property to one of the following values:

initial

When the connector starts, if it does not detect a value in its offsets topic, it performs a snapshot of the database.

never

When the connector starts, it skips the snapshot process and immediately begins to stream change events for operations that the database records to the oplog.

change_streams_update_full

Specifies the method that the connector uses to capture update event changes from a MongoDB server. Set this property to one of the following values:

change_streams

update event messages do not include the full document. Messages do not include a field that represents the state of the document before the change.

change_streams_update_full

update event messages include the full document. Messages do not include a before field that represents the state of the document before the update. The event message returns the full state of the document in the after field.

In some situations, when capture.mode is configured to return full documents, the updateDescription and after fields of the update event message might report inconsistent values. Such discrepancies can result after multiple updates are applied to a document in rapid succession. The connector requests the full document from the MongoDB database only after it receives the update described in the event’s updateDescription field. If a later update modifies the source document before the connector can retrieve it from the database, the connector receives the document that is modified by this later update.

When you configure capture.mode to return the full document, you might notice a discrepancy between the content in the updateDescription and after fields of the message. Discrepancies can result when multiple changes are applied to a document in rapid succession. Because the connector submits the request for the full document only after an update is applied, later updates can modify the source document after the update that is described in the updateDescription field. The full document that the connector then receives in response to its query reflects the result of the later change.

change_streams_update_full_with_pre_image

update event messages include the full document, and include a field that represents the state of the document before the change.

change_streams_with_pre_image

update events do not include the full document, but include a field that represents the state of the document before the change.

All collections specified in collection.include.list

An optional, comma-separated list of regular expressions that match the fully-qualified names of the schemas that you want to include in a snapshot. The specified items must be named in the connector’s collection.include.list property. This property takes effect only if the connector’s snapshot.mode property is set to a value other than never.
This property does not affect the behavior of incremental snapshots.

To match the name of a schema, Debezium applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire name string of the schema; it does not match substrings that might be present in a schema name.

empty string

An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters.

empty string

An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*), which matches any characters, and the colon character (:) separates the existing field name from its new name. Each field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.

1

The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

1

Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.

true

Controls whether a delete event is followed by a tombstone event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row if log compaction is enabled for the topic.
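
The effect of a tombstone on a compacted topic can be pictured with the following simplified Python model. It is a sketch of the general log compaction idea only, not Kafka's actual compaction algorithm: it keeps the latest value per key and drops keys whose latest value is a tombstone (represented here as `None`). The function name `compact` and the sample keys are hypothetical.

```python
def compact(log):
    """Simplified model of log compaction over (key, value) records."""
    latest = {}
    for key, value in log:
        # Later records for the same key replace earlier ones.
        latest[key] = value
    # A key whose latest record is a tombstone (None) is removed entirely.
    return {key: value for key, value in latest.items() if value is not None}

with_tombstone = [("doc-1", "create"), ("doc-1", "update"),
                  ("doc-1", "delete"), ("doc-1", None)]
without_tombstone = with_tombstone[:-1]

print(compact(with_tombstone))     # key is gone after compaction
print(compact(without_tombstone))  # the delete event for the key is retained
```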

No default

An interval in milliseconds that the connector should wait before taking a snapshot after starting up.
Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.

0

Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size.
Defaults to 0, which indicates that the server chooses an appropriate fetch size.

none

Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings:

  • none does not apply any adjustment.

  • avro replaces the characters that cannot be used in the Avro type name with underscore.

The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.

Table 11. Debezium MongoDB connector advanced configuration properties
Property Default Description

2048

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.

8192

Positive integer value that specifies the maximum number of records that the blocking queue can hold. When Debezium reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka. The blocking queue can provide backpressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable. Events that are held in the queue are disregarded when the connector periodically records offsets. Always set the value of max.queue.size to be larger than the value of max.batch.size.

0

A long integer value that specifies the maximum volume of the blocking queue in bytes. By default, volume limits are not specified for the blocking queue. To specify the number of bytes that the queue can consume, set this property to a positive long value.
If max.queue.size is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property. For example, if you set max.queue.size=1000, and max.queue.size.in.bytes=5000, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
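
The interaction of the two limits can be sketched as a simple predicate. The following Python function is an illustration of the rule described above, not the connector's implementation; the function name `queue_is_full` and its parameter names are hypothetical.

```python
def queue_is_full(record_count, byte_volume,
                  max_queue_size=8192, max_queue_size_in_bytes=0):
    """Return True when writing to the queue would be blocked."""
    # The record-count limit always applies.
    if record_count >= max_queue_size:
        return True
    # The byte limit applies only when it is set to a positive value.
    if max_queue_size_in_bytes > 0 and byte_volume >= max_queue_size_in_bytes:
        return True
    return False

# The example from the text: max.queue.size=1000, max.queue.size.in.bytes=5000.
print(queue_is_full(1000, 1200, 1000, 5000))  # record limit reached
print(queue_is_full(40, 5000, 1000, 5000))    # byte limit reached
print(queue_is_full(40, 1200, 1000, 5000))    # neither limit reached
```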

1000

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds, or 0.5 second.

1000

Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).

120000

Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).

16

Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which with the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.ms results in just over 20 minutes of attempts before failing.

true

Boolean value that specifies whether the addresses in mongodb.hosts are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.

v2

Schema version for the source block in CDC events. Debezium 0.10 introduced a few breaking changes to the structure of the source block in order to unify the exposed structure across all the connectors.
By setting this option to v1, the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

0

Controls how frequently heartbeat messages are sent.
This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You should also use heartbeat messages in cases where only records in non-captured collections are changed for a long period of time. In such a situation, the connector continues to read the oplog/change stream from the database but never emits any change messages into Kafka, which in turn means that no offset updates are committed to Kafka. As a result, the oplog files can be rotated out without the connector noticing, so that on restart some events are no longer available, which makes it necessary to re-execute the initial snapshot.

Set this parameter to 0 to not send heartbeat messages at all.
Disabled by default.

true when the connector configuration explicitly specifies the key.converter or value.converter parameters to use Avro, otherwise defaults to false.

Whether field names are sanitized to adhere to Avro naming requirements. See Avro naming for more details.

t

A comma-separated list of operation types that will be skipped during streaming. The operations include: c for inserts/create, u for updates, d for deletes, t for truncates, and none to not skip any operations. By default, truncate operations are skipped (not emitted by this connector).
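
As a rough illustration of how such a list could be interpreted, the following Python sketch parses the comma-separated value and decides whether an event with a given operation code would be emitted. It is a sketch of the rule described above under simplifying assumptions, not the connector's code; the function name `should_emit` is hypothetical.

```python
def should_emit(op, skipped_operations="t"):
    """Return True if an event with operation code `op` is not skipped."""
    skipped = {s.strip() for s in skipped_operations.split(",") if s.strip()}
    # "none" means that no operation types are skipped.
    if "none" in skipped:
        return True
    return op not in skipped

print(should_emit("c"))            # inserts are emitted by default
print(should_emit("t"))            # truncates are skipped by default
print(should_emit("d", "u,d"))     # deletes skipped when listed
print(should_emit("t", "none"))    # nothing is skipped
```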

No default

Controls which collection items are included in a snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form databaseName.collectionName.

For each collection that you specify, also specify another configuration property: snapshot.collection.filter.overrides.databaseName.collectionName. For example, the name of the other configuration property might be: snapshot.collection.filter.overrides.customers.orders. Set this property to a valid filter expression that retrieves only the items that you want in the snapshot. When the connector performs a snapshot, it retrieves only the items that match the filter expression.

false

When set to true, Debezium generates events with transaction boundaries and enriches the data event envelope with transaction metadata.

See Transaction Metadata for additional details.

10000 (10 seconds)

The number of milliseconds to wait before restarting a connector after a retriable error occurs.

30000

The interval in which the connector polls for new, removed, or changed replica sets.

10000 (10 seconds)

The number of milliseconds the driver will wait before a new connection attempt is aborted.

10000 (10 seconds)

The frequency that the cluster monitor attempts to reach each server.

0

The number of milliseconds that a send/receive on the socket can take before a timeout occurs. A value of 0 disables this behavior.

30000 (30 seconds)

The number of milliseconds the driver will wait to select a server before it times out and throws an error.

0

Specifies the maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception. A value of 0 indicates using the server/driver default wait timeout.

No default

Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name:

1024

The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk. Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size. However, larger chunk sizes also require more memory to buffer the snapshot data. Adjust the chunk size to a value that provides the best performance in your environment.

io.debezium.schema.DefaultTopicNamingStrategy

The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, and heartbeat events, among others. Defaults to DefaultTopicNamingStrategy.

Specify the delimiter for topic name, defaults to

10000

The size used for holding the topic names in bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.

__debezium-heartbeat

Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern:

topic.heartbeat.prefix.topic.prefix

For example, if the topic prefix is fulfillment, the default topic name is __debezium-heartbeat.fulfillment.

transaction

Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern:

topic.prefix.topic.transaction

For example, if the topic prefix is fulfillment, the default topic name is fulfillment.transaction.
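
The two naming patterns can be illustrated with a short Python sketch. The helper names `heartbeat_topic` and `transaction_topic` are hypothetical, and the `.` delimiter is an assumption inferred from the examples above rather than a documented default.

```python
def heartbeat_topic(topic_prefix,
                    heartbeat_prefix="__debezium-heartbeat",
                    delimiter="."):  # delimiter assumed from the examples
    """Build a heartbeat topic name: <heartbeat prefix><delimiter><topic prefix>."""
    return f"{heartbeat_prefix}{delimiter}{topic_prefix}"

def transaction_topic(topic_prefix,
                      transaction="transaction",
                      delimiter="."):  # delimiter assumed from the examples
    """Build a transaction topic name: <topic prefix><delimiter><transaction>."""
    return f"{topic_prefix}{delimiter}{transaction}"

print(heartbeat_topic("fulfillment"))
print(transaction_topic("fulfillment"))
```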

Monitoring

The Debezium MongoDB connector has two metric types in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect have.

  • Snapshot metrics provide information about connector operation while performing a snapshot.

  • Streaming metrics provide information about connector operation when the connector is capturing changes and streaming change event records.

The Debezium monitoring documentation provides details about how to expose these metrics by using JMX.

Snapshot Metrics

The MBean is debezium.mongodb:type=connector-metrics,context=snapshot,server=,task=

Snapshot metrics are not exposed unless a snapshot operation is active, or a snapshot has occurred since the last connector start.

The following table lists the snapshot metrics that are available.

Attributes Type Description

string

The last snapshot event that the connector has read.

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since last started or reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

string[]

The list of tables that are captured by the connector.

int

The length of the queue used to pass events between the snapshotter and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the snapshotter and the main Kafka Connect loop.

int

The total number of tables that are being included in the snapshot.

int

The number of tables that the snapshot has yet to copy.

boolean

Whether the snapshot was started.

boolean

Whether the snapshot was paused.

boolean

Whether the snapshot was aborted.

boolean

Whether the snapshot completed.

long

The total number of seconds that the snapshot has taken so far, even if not complete. Also includes the time when the snapshot was paused.

long

The total number of seconds that the snapshot was paused. If the snapshot was paused several times, the paused time adds up.

Map

Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table.

long

The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value.

long

The current volume, in bytes, of records in the queue.

The Debezium MongoDB connector also provides the following custom snapshot metrics:

Attribute Type Description

NumberOfDisconnects

long

Number of database disconnects.

Streaming Metrics

The MBean is debezium.mongodb:type=connector-metrics,context=streaming,server=,task=

The following table lists the streaming metrics that are available.

Attributes Type Description

string

The last streaming event that the connector has read.

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since the last start or metrics reset.

long

The total number of create events that this connector has seen since the last start or metrics reset.

long

The total number of update events that this connector has seen since the last start or metrics reset.

long

The total number of delete events that this connector has seen since the last start or metrics reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

string[]

The list of tables that are captured by the connector.

int

The length of the queue used to pass events between the streamer and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.

boolean

Flag that denotes whether the connector is currently connected to the database server.

long

The number of milliseconds between the last change event’s timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the connector are running.

long

The number of processed transactions that were committed.

Map

The coordinates of the last received event.

string

Transaction identifier of the last processed transaction.

long

The maximum buffer of the queue in bytes. This metric is available if max.queue.size.in.bytes is set to a positive long value.

long

The current volume, in bytes, of records in the queue.

The Debezium MongoDB connector also provides the following custom streaming metrics:

Attribute Type Description

NumberOfDisconnects

long

Number of database disconnects.

NumberOfPrimaryElections

long

Number of primary node elections.

MongoDB connector common issues

Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. When the system is operating normally and is managed carefully, then Debezium provides exactly once delivery of every change event.

If a fault occurs, the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In such situations, Debezium, like Kafka, provides at least once delivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

In the following situations, the connector fails when trying to start, reports an error or exception in the log, and stops running:

  • The connector’s configuration is invalid.

  • The connector cannot successfully connect to MongoDB by using the specified connection parameters.

After a failure, the connector attempts to reconnect by using exponential backoff. You can configure the maximum number of reconnection attempts.

In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.

MongoDB becomes unavailable

Once the connector is running, if the primary node of any of the MongoDB replica sets becomes unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.

The attempts to reconnect are controlled by three properties:

  • connect.backoff.initial.delay.ms - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).

  • connect.backoff.max.delay.ms - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).

  • connect.max.attempts - The maximum number of attempts before an error is produced, with a default of 16.

Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.

Reconnection attempt number | Delay before attempt, in seconds | Total delay before attempt, in minutes and seconds
1 | 1 | 00:01
2 | 2 | 00:03
3 | 4 | 00:07
4 | 8 | 00:15
5 | 16 | 00:31
6 | 32 | 01:03
7 | 64 | 02:07
8 | 120 | 04:07
9 | 120 | 06:07
10 | 120 | 08:07
11 | 120 | 10:07
12 | 120 | 12:07
13 | 120 | 14:07
14 | 120 | 16:07
15 | 120 | 18:07
16 | 120 | 20:07
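
The values in the table above follow from doubling the delay after each failed attempt and capping it at the maximum delay. The following Python sketch reproduces that arithmetic for the default values; it is an illustration of the calculation, not the connector's reconnection code, and the function names are hypothetical.

```python
def backoff_schedule(initial_delay_ms=1000, max_delay_ms=120000, max_attempts=16):
    """Return (attempt, delay_seconds, total_seconds) for each reconnection attempt."""
    schedule = []
    delay_ms = initial_delay_ms
    total_ms = 0
    for attempt in range(1, max_attempts + 1):
        total_ms += delay_ms
        schedule.append((attempt, delay_ms // 1000, total_ms // 1000))
        # Double the delay for the next attempt, up to the configured maximum.
        delay_ms = min(delay_ms * 2, max_delay_ms)
    return schedule

def as_minutes_seconds(total_seconds):
    """Format a number of seconds as MM:SS."""
    return f"{total_seconds // 60:02d}:{total_seconds % 60:02d}"

for attempt, delay, total in backoff_schedule():
    print(attempt, delay, as_minutes_seconds(total))
```

With the defaults, the accumulated delay before the sixteenth attempt is 1207 seconds, which corresponds to the "just over 20 minutes" stated for connect.max.attempts.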

Kafka Connect process stops gracefully

If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that process, Kafka Connect will migrate all of the process’s connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.

Kafka Connect process crashes

If the Kafka Connect process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the MongoDB connectors will resume from the last offset recorded by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium changes are idempotent, so a sequence of events always results in the same state.

Debezium also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event’s unique transaction identifier (h) and timestamp (sec and ord). Consumers can keep track of these values to know whether they have already seen a particular event.
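
A consumer-side duplicate check based on these values might look like the following Python sketch. It assumes that each event's source information is available as a dictionary with the keys h, sec, and ord named above; the function name `make_deduplicator` and the sample values are hypothetical, and a real consumer would bound or persist the set of seen keys instead of keeping it in memory indefinitely.

```python
def make_deduplicator():
    """Return a function that reports whether an event's source was seen before."""
    seen = set()

    def is_duplicate(source):
        # Identify the event by its transaction identifier and timestamp parts.
        key = (source.get("h"), source.get("sec"), source.get("ord"))
        if key in seen:
            return True
        seen.add(key)
        return False

    return is_duplicate

is_duplicate = make_deduplicator()
first = {"h": 1234, "sec": 1700000000, "ord": 1}   # hypothetical values
replayed = dict(first)                             # same event after a restart
later = {"h": 5678, "sec": 1700000001, "ord": 1}

print(is_duplicate(first))     # first delivery
print(is_duplicate(replayed))  # repeated delivery
print(is_duplicate(later))     # a different event
```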

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency that you have specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.

Connector fails after it is stopped for a long interval if snapshot.mode is set to initial

If the connector is gracefully stopped, users might continue to perform operations on replica set members. Changes that occur while the connector is offline continue to be recorded in MongoDB’s oplog. In most cases, after the connector is restarted, it reads the offset value in the oplog to determine the last operation that it streamed for each replica set, and then resumes streaming changes from that point. After the restart, database operations that occurred while the connector was stopped are emitted to Kafka as usual, and after some time, the connector catches up with the database. The amount of time required for the connector to catch up depends on the capabilities and performance of Kafka and the volume of changes that occurred in the database.

However, if the connector remains stopped for a long enough interval, MongoDB might purge the oplog during the time that the connector is inactive, resulting in the loss of information about the connector’s last position. After the connector restarts, it cannot resume streaming, because the oplog no longer contains the previous offset value that marks the last operation that the connector processed. The connector also cannot perform a snapshot, as it typically would when the snapshot.mode property is set to initial and no offset value is present. In this case, a mismatch exists, because the oplog does not contain the value of the previous offset, but the offset value is present in the connector’s internal Kafka offsets topic. An error results and the connector fails.

To recover from the failure, delete the failed connector, and create a new connector with the same configuration but with a different connector name. When you start the new connector, it performs a snapshot to ingest the state of the database, and then resumes streaming.

MongoDB loses writes

In certain failure situations, MongoDB can lose commits, which results in the MongoDB connector being unable to capture the lost changes. For example, if the primary crashes suddenly after it applies a change and records the change to its oplog, the oplog might become unavailable before secondary nodes can read its contents. As a result, the secondary node that is elected as the new primary node might be missing the most recent changes from its oplog.

At this time, there is no way to prevent this side effect in MongoDB.