
MongoDB New Document State Extraction

This SMT is supported only for the MongoDB connector. See Extracting source record after state from Debezium change events for the relational database equivalent to this SMT.

The Debezium MongoDB connector generates data in the form of a complex message structure. The message consists of two parts:

  • operation and metadata

  • the data after the insert has been executed; for updates, a patch element describing the altered fields

The after and patch elements contain JSON string representations of the inserted/altered data. E.g. the general message structure for an insert event looks like this:

{ "op": "r", "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}", "source": { ... } }

More details about the message structure are provided in the documentation of the MongoDB connector.

While this structure is a good fit to represent changes to MongoDB’s schemaless collections, it is not understood by existing sink connectors such as the Confluent JDBC sink connector.

Therefore Debezium provides a single message transformation (SMT) which converts the after/patch information from the MongoDB CDC events into a structure suitable for consumption by existing sink connectors. To do so, the SMT parses the JSON strings and reconstructs properly typed Kafka Connect records from them (comprising the correct message payload and schema), which can then be consumed by connectors such as the JDBC sink connector.

Using JSON to visualize the emitted record structure, the event from above would look like this:

{ "field1" : "newvalue1", "field2" : "newvalue2" }

The SMT should be applied on a sink connector.
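
For example, a minimal sketch of a JDBC sink connector configuration with the SMT applied could look like the following; the connector name, topic, and connection URL are placeholder values:

name=mongodb-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=dbserver1.inventory.customers
connection.url=jdbc:postgresql://localhost:5432/inventory
# Apply the Debezium MongoDB extraction SMT on the sink side
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState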

Configuration

The configuration is part of the sink connector configuration and is expressed as a set of properties:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.add.headers=op
Customizing the configuration

The connector might emit many types of event messages (for example, heartbeat messages, tombstone messages, or metadata messages about transactions). To apply the transformation to a subset of events, you can define an SMT predicate statement that selectively applies the transformation to specific events only.
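
A minimal sketch, assuming heartbeat messages are routed to topics with the default __debezium-heartbeat prefix, could exclude them from the transformation like this:

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
# Apply the SMT only to records whose topic does not match the heartbeat pattern
transforms.unwrap.predicate=isHeartbeat
transforms.unwrap.negate=true

predicates=isHeartbeat
predicates.isHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isHeartbeat.pattern=__debezium-heartbeat.*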

Array encoding

The SMT converts MongoDB arrays into arrays as defined by Kafka Connect (or Apache Avro) schemas. The problem is that such arrays must contain elements of the same type, whereas MongoDB allows the user to store elements of heterogeneous types in the same array. To bypass this impedance mismatch, it is possible to encode the array in two different ways using the array.encoding configuration option.

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

Value array (the default) will encode arrays as the array datatype. It is the user’s responsibility to ensure that all elements of a given array instance are of the same type. This option is more restrictive but offers easy processing of arrays by downstream clients.
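
For example, a document with a homogeneous array (the tags field below is purely illustrative) passes through the default array encoding unchanged:

{ "_id": 1, "tags": ["cdc", "mongodb"] }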

Value document will convert the array into a struct of structs, in a similar way to BSON serialization. The main struct contains fields named _0, _1, _2 etc., where each name represents the index of the element in the array. Every element is then passed as the value of the given field.

Let’s suppose an example source MongoDB document with an array containing heterogeneous types:

{ "_id": 1, "a1": [ { "a": 1, "b": "none" }, { "a": "c", "d": "something" } ] }

This document will be encoded as

{ "_id": 1, "a1": { "_0": { "a": 1, "b": "none" }, "_1": { "a": "c", "d": "something" } } }

This option allows you to process arbitrary arrays, but the consumer needs to know how to handle them properly.

Note: The underscore in index names is present because Avro encoding requires field names not to start with a digit.

Nested structure flattening

When a MongoDB document contains a nested document (structure), it is faithfully encoded as a nested structure field. If the sink connector supports only flat structures, it is possible to flatten the internal structure into a flat one with consistent field naming. To enable this feature, the option flatten.struct must be set to true.

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

The resulting flat document will consist of fields whose names are created by joining the name of the parent field and the names of the fields in the nested document. Those elements are separated by the string defined by the flatten.struct.delimiter option, which is set to the underscore (_) by default.

Let’s suppose an example source MongoDB document with a field containing a nested document:

{ "_id": 1, "a": { "b": 1, "c": "none" }, "d": 100 }

Such a document will be encoded as:

{ "_id": 1, "a_b": 1, "a_c": "none", "d": 100 }

This option allows you to convert a hierarchical document into a flat structure suitable for a table-like storage.
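
As a sketch, a custom delimiter could be configured as follows (the double-underscore value is just an illustration); the example document from above would then be flattened to fields a__b and a__c:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=true
transforms.unwrap.flatten.struct.delimiter=__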

MongoDB $unset handling

MongoDB allows $unset operations that remove a certain field from a document. Because the collections are schemaless, it becomes hard to inform consumers/sinks about the field that is now missing. The approach that Debezium uses is to set the removed field to a null value.

Given the operation

{ "after":null, "patch":"{\"$unset\" : {\"a\" : true}}" }

The final encoding will look like

{ "id": 1, "a": null }

Note that other MongoDB operations might cause an $unset internally; $rename is one example.

Determine original operation

When a message is flattened, the final result does not show whether it was an insert, update or first read. (Deletions can be detected via tombstones or rewrites, see Configuration options.)

To solve this problem, you can propagate the original operation either as a field added to the message value or as a header property. For example, to use a header property:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

The possible values are the ones from the op field of MongoDB connector change events.
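
Alternatively, a minimal sketch that propagates the operation as a field in the message value instead of a header:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=op

The flattened record then carries the operation under the default __ prefix, e.g. "__op" : "c" for an insert.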

Adding metadata fields

The SMT can optionally add metadata fields from the original change event to the final flattened record (prefixed with "__"). This ability to add metadata to the event record makes it possible to include content such as the name of the collection associated with the change event, or such connector-specific fields as the replica set name. Currently, only fields from the change event sub-structures source, transaction and updateDescription can be added. For more information about the MongoDB change event structure, see the documentation for the MongoDB connector.

For example, you might specify the following configuration to add a replica set name (rs) and the collection name for a change event to the final flattened event record:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

The preceding configuration results in the following content being added to the flattened record:

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

For DELETE events, the option to add metadata fields is supported only if the delete.handling.mode option is set to rewrite.
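
For example, a minimal sketch that keeps rewritten delete records together with the metadata fields from above:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=rs,collection

With this configuration, a delete is emitted as a record whose value contains a __deleted field set to true alongside the __rs and __collection fields.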

Options for applying the transformation selectively

In addition to the change event messages that a Debezium connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions. Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it’s best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.

For more information about how to apply the SMT selectively, see Configure an SMT predicate for the transformation.

Configuration options

Property: array.encoding
Default: array
Description: The SMT converts MongoDB arrays into arrays as defined by Kafka Connect (or Apache Avro) schemas.

Property: flatten.struct
Default: false
Description: The SMT flattens structs by concatenating the fields into plain properties, using a configurable delimiter.

Property: flatten.struct.delimiter
Default: _
Description: Delimiter to concatenate field names from the input record when generating field names for the output record. Only applies when flatten.struct is set to true.

Property: drop.tombstones
Default: true
Description: The SMT removes the tombstone generated by Debezium from the stream.

Property: delete.handling.mode
Default: drop
Description: The SMT can drop, rewrite or pass delete records (none). The rewrite mode will add a __deleted field set to true or false depending on the represented operation.

Property: add.headers.prefix
Default: __ (double-underscore)
Description: Set this optional string to prefix a header.

Property: add.headers
Default: (no default)
Description: Specify a list of metadata fields to add to the header of the flattened message. In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with __ or __<struct>_, depending on the specification of the struct. Please use a comma-separated list without spaces.

Property: add.fields.prefix
Default: __ (double-underscore)
Description: Set this optional string to prefix a field.

Property: add.fields
Default: (no default)
Description: Specify a list of metadata fields to add to the flattened message. In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with __ or __<struct>_, depending on the specification of the struct. Please use a comma-separated list without spaces.
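
For example, a sketch that adds the operation and the source-level timestamp as headers, disambiguating the duplicated ts_ms name by qualifying it with its struct:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op,source.ts_ms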

Known limitations

  • Feeding data changes from a schemaless store such as MongoDB to strictly schema-based datastores such as a relational database can, by definition, work only within certain limits. Specifically, all fields with the same name across documents within one collection must be of the same type. Otherwise, no consistent column definition can be derived in the target database.

  • Arrays will be restored in the emitted Kafka Connect record correctly, but they are not supported by sink connectors that expect a "flat" message structure.