
Debezium connector for Vitess

Debezium’s Vitess connector captures row-level changes in the shards of a Vitess keyspace. Vitess versions 8.0.0 and 9.0.0 are supported.

The connector does not currently support snapshots. The first time it connects to a Vitess cluster, it starts from the current VGTID location of the keyspace and continuously captures row-level changes that insert, update, and delete database content and that were committed to a Vitess keyspace. The connector generates data change event records and streams them to Kafka topics. By default, the connector streams all events generated for a table to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

Overview

Vitess’s VStream feature was introduced in version 4.0. It is a change event subscription service that provides information equivalent to the MySQL binary logs from the underlying MySQL shards of the Vitess cluster. A user can subscribe to multiple shards in a keyspace, making it a convenient tool to feed downstream CDC processes.

To read and process database changes, the Vitess connector subscribes to VTGate's VStream gRPC service. VTGate is a lightweight, stateless gRPC server that is part of the Vitess cluster setup.

The connector lets you choose whether to subscribe to the MASTER nodes or to the REPLICA nodes for change events.

The connector produces a change event for every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.

The connector is tolerant of failures. As the connector reads changes and produces events, it records the VGTID position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues reading from VStream where it last left off.

How the connector works

To optimally configure and run a Debezium Vitess connector, it is helpful to understand how the connector streams change events, determines Kafka topic names, and uses metadata.

Streaming changes

The Vitess connector spends all of its time streaming changes from the VTGate’s VStream gRPC service to which it is subscribed. The client receives changes from VStream as they are committed in the underlying MySQL server’s binlog at certain positions, which are referred to as VGTIDs.

The VGTID in Vitess is the equivalent of the GTID in MySQL. It describes the position in the VStream at which a change event happens. Typically, a VGTID has multiple shard GTIDs. Each shard GTID is a tuple of (Keyspace, Shard, GTID), which describes the GTID position of a given shard.
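To make the tuple structure concrete, the following minimal Python sketch parses a VGTID into its per-shard entries. It assumes the VGTID is serialized as the JSON array that appears in the `vgtid` field of the sample events later on this page. The `ShardGtid` type and the `parse_vgtid` function are hypothetical helper names, not part of Debezium or Vitess.

```python
import json
from typing import List, NamedTuple


class ShardGtid(NamedTuple):
    """One (keyspace, shard, gtid) entry of a VGTID (hypothetical helper type)."""
    keyspace: str
    shard: str
    gtid: str


def parse_vgtid(raw: str) -> List[ShardGtid]:
    """Parse a JSON-encoded VGTID into a list of per-shard GTID tuples."""
    return [ShardGtid(e["keyspace"], e["shard"], e["gtid"]) for e in json.loads(raw)]


# Sample value copied from the event examples on this page.
raw_vgtid = (
    '[{"keyspace":"commerce","shard":"80-","gtid":"MariaDB/0-54610504-47"},'
    '{"keyspace":"commerce","shard":"-80","gtid":"MariaDB/0-1592148-45"}]'
)

shard_gtids = parse_vgtid(raw_vgtid)
for entry in shard_gtids:
    print(entry.keyspace, entry.shard, entry.gtid)
```

Each entry tracks the position of one shard, so a keyspace with two shards yields two tuples.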

When subscribing to a VStream service, the connector needs to provide a VGTID and a Tablet Type (e.g. MASTER, REPLICA). The VGTID describes the position from which VStream should start sending change events; the Tablet Type describes which underlying MySQL instance (master or replica) in each shard the connector reads change events from.

The first time the connector connects to a Vitess cluster, it gets the current VGTID from a Vitess component called VTCtld and provides the current VGTID to VStream.

The Debezium Vitess connector acts as a gRPC client of VStream. When the connector receives changes, it transforms the events into Debezium create, update, or delete events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.

Periodically, Kafka Connect records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. For the Vitess connector, the VGTID recorded in each change event is the offset.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to VStream to send the events starting just after that position.

Topic names

The Vitess connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is serverName.keyspaceName.tableName where:

  • serverName is the logical name of the connector as specified with the database.server.name connector configuration property.

  • keyspaceName is the name of the keyspace (a.k.a. database) where the operation occurred.

  • tableName is the name of the database table in which the operation occurred.

For example, suppose that fulfillment is the logical server name in the configuration for a connector that is capturing changes in a Vitess installation that has a commerce keyspace that contains four tables: products, products_on_hand, customers, and orders. Regardless of how many shards the keyspace has, the connector would stream records to these four Kafka topics:

  • fulfillment.commerce.products

  • fulfillment.commerce.products_on_hand

  • fulfillment.commerce.customers

  • fulfillment.commerce.orders
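The default naming rule can be sketched in a few lines of Python. This is only an illustration of the serverName.keyspaceName.tableName pattern described above; `topic_name` is a hypothetical helper, not a Debezium API.

```python
from typing import List


def topic_name(server_name: str, keyspace_name: str, table_name: str) -> str:
    """Build the default topic name: serverName.keyspaceName.tableName."""
    return f"{server_name}.{keyspace_name}.{table_name}"


# Tables from the example above; the shard count does not affect the names.
tables: List[str] = ["products", "products_on_hand", "customers", "orders"]
topics = [topic_name("fulfillment", "commerce", table) for table in tables]

for topic in topics:
    print(topic)
```

A consumer that is interested in only some tables can build its subscription list the same way.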

Data change events

The Debezium Vitess connector generates a data change event for each row-level INSERT, UPDATE, and DELETE operation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.

Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.

The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A schema field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:

{
    "schema": { (1)
        ...
    },
    "payload": { (2)
        ...
    },
    "schema": { (3)
        ...
    },
    "payload": { (4)
        ...
    }
}
Table 1. Overview of change event basic content
Item Field name Description

1

schema

The first schema field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key’s payload portion. In other words, the first schema field describes the structure of the primary key, or the first single-column unique key if the table does not have a primary key, for the table that was changed. Multi-column unique keys are not supported.

It is possible to override the table’s primary key by setting the message.key.columns connector configuration property. In this case, the first schema field describes the structure of the key identified by that property.

2

payload

The first payload field is part of the event key. It has the structure described by the previous schema field and it contains the key for the row that was changed.

3

schema

The second schema field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value’s payload portion. In other words, the second schema describes the structure of the row that was changed. Typically, this schema contains nested schemas.

4

payload

The second payload field is part of the event value. It has the structure described by the previous schema field and it contains the actual data for the row that was changed.

By default, the connector streams change event records to topics with names that are the same as the event’s originating table.

Starting with Kafka 0.10, Kafka can optionally record the event key and value with the timestamp at which the message was created (recorded by the producer) or written to the log by Kafka.

The Vitess connector ensures that all Kafka Connect schema names adhere to the Avro schema name format. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or _. Each remaining character in the logical server name and each character in the schema and table names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or _. If there is an invalid character, it is replaced with an underscore character.

This can lead to unexpected conflicts if the logical server name, a schema name, or a table name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
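The following Python sketch illustrates the replacement rule as it is described above and shows how two distinct names can collide. It is an approximation for a single name component (a server, schema, or table name), not the connector's actual implementation; `sanitize_name` is a hypothetical helper.

```python
import re


def sanitize_name(name: str) -> str:
    """Replace every character that the described rule treats as invalid with '_'.

    The first character must be a-z, A-Z, or _; every later character
    must be a-z, A-Z, 0-9, or _.
    """
    if not name:
        return name
    first = name[0] if re.fullmatch(r"[A-Za-z_]", name[0]) else "_"
    rest = re.sub(r"[^A-Za-z0-9_]", "_", name[1:])
    return first + rest


# Two different table names that differ only in an invalid character.
a = sanitize_name("order-items")
b = sanitize_name("order+items")
print(a, b, a == b)
```

Because both inputs map to the same sanitized name, their schema names would conflict. Choosing names that already satisfy the rule avoids the problem.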

The connector does not currently allow column names that begin with the @ prefix. For example, age is a valid column name, and @age is not. The reason is that the Vitess vstreamer has a bug that can send events with anonymized column names (e.g. column name age is anonymized to @1). There is no easy way to differentiate between a legitimate column name with the @ prefix and the Vitess bug. See more discussion here.

Change event keys

For a given table, the change event’s key has a structure that contains a field for each column in the primary key of the table at the time the event was created.

Consider a customers table defined in the commerce keyspace and the example of a change event key for that table.

Example table
CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);
Example change event key

If the database.server.name connector configuration property has the value Vitess_server, every change event for the customers table while it has this definition has the same key structure, which in JSON looks like this:

{
    "schema": { (1)
        "type": "struct",
        "name": "Vitess_server.commerce.customers.Key", (2)
        "optional": false, (3)
        "fields": [ (4)
            {
                "name": "id",
                "index": "0",
                "schema": {
                    "type": "INT32",
                    "optional": "false"
                }
            }
        ]
    },
    "payload": { (5)
        "id": "1"
    }
}
Table 2. Description of change event key
Item Field name Description

1

schema

The schema portion of the key specifies a Kafka Connect schema that describes what is in the key’s payload portion.

2

Vitess_server.commerce.customers.Key

Name of the schema that defines the structure of the key’s payload. This schema describes the structure of the primary key for the table that was changed. Key schema names have the format connector-name.keyspace-name.table-name.Key. In this example:

  • Vitess_server is the name of the connector that generated this event.

  • commerce is the keyspace that contains the table that was changed.

  • customers is the table that was updated.

3

optional

Indicates whether the event key must contain a value in its payload field. In this example, a value in the key’s payload is required. A value in the key’s payload field is optional when a table does not have a primary key.

4

fields

Specifies each field that is expected in the payload, including each field’s name, index, and schema.

5

payload

Contains the key for the row for which this change event was generated. In this example, the key contains a single id field whose value is 1.

Although the column.exclude.list and column.include.list connector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event’s key.

If the table does not have a primary or unique key, then the change event’s key is null. The rows in a table without a primary or unique key constraint cannot be uniquely identified.

Change event values

The value in a change event is a bit more complicated than the key. Like the key, the value has a schema section and a payload section. The schema section contains the schema that describes the Envelope structure of the payload section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.

Consider the same sample table that was used to show an example of a change event key:

CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

The emitted events for UPDATE and DELETE operations contain the previous values of all columns in the table.

create events

The following example shows the value portion of a change event that the connector generates for an operation that creates data in the customers table:

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    { "type": "int32", "optional": false, "field": "id" },
                    { "type": "string", "optional": false, "field": "first_name" },
                    { "type": "string", "optional": false, "field": "last_name" },
                    { "type": "string", "optional": false, "field": "email" }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    { "type": "int32", "optional": false, "field": "id" },
                    { "type": "string", "optional": false, "field": "first_name" },
                    { "type": "string", "optional": false, "field": "last_name" },
                    { "type": "string", "optional": false, "field": "email" }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    { "type": "string", "optional": false, "field": "version" },
                    { "type": "string", "optional": false, "field": "connector" },
                    { "type": "string", "optional": false, "field": "name" },
                    { "type": "int64", "optional": false, "field": "ts_ms" },
                    { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
                    { "type": "string", "optional": false, "field": "db" },
                    { "type": "string", "optional": false, "field": "schema" },
                    { "type": "string", "optional": false, "field": "table" },
                    { "type": "int64", "optional": true, "field": "vgtid" }
                ],
                "optional": false,
                "name": "io.debezium.connector.vitess.Source", (3)
                "field": "source"
            },
            { "type": "string", "optional": false, "field": "op" },
            { "type": "int64", "optional": true, "field": "ts_ms" }
        ],
        "optional": false,
        "name": "Vitess_server.commerce.customers.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "1.5.4.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904863,
            "snapshot": true,
            "db": "Vitess_server",
            "schema": "commerce",
            "table": "customers",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
        },
        "op": "c", (9)
        "ts_ms": 1559033904863 (10)
    }
}
Table 3. Descriptions of create event value fields
Item Field name Description

1

schema

The value’s schema, which describes the structure of the value’s payload. A change event’s value schema is the same in every change event that the connector generates for a particular table.

2

name

In the schema section, each name field specifies the schema for a field in the value’s payload.

Vitess_server.commerce.customers.Value is the schema for the payload’s before and after fields. This schema is specific to the customers table.

Names of schemas for before and after fields are of the form logicalName.keyspaceName.tableName.Value, which ensures that the schema name is unique in the database. This means that when using the Avro converter, the resulting Avro schema for each table in each logical source has its own evolution and history.

3

name

io.debezium.connector.vitess.Source is the schema for the payload’s source field. This schema is specific to the Vitess connector. The connector uses it for all events that it generates.

4

name

Vitess_server.commerce.customers.Envelope is the schema for the overall structure of the payload, where Vitess_server is the connector name, commerce is the keyspace, and customers is the table.

5

payload

The value’s actual data. This is the information that the change event is providing.

It may appear that the JSON representations of the events are much larger than the rows they describe. This is because the JSON representation must include the schema and the payload portions of the message. However, by using the Avro converter, you can significantly decrease the size of the messages that the connector streams to Kafka topics.

6

before

An optional field that specifies the state of the row before the event occurred. When the op field is c for create, as it is in this example, the before field is null since this change event is for new content.

7

after

An optional field that specifies the state of the row after the event occurred. In this example, the after field contains the values of the new row’s id, first_name, last_name, and email columns.

8

source

Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database (a.k.a keyspace) and table that contains the new row

  • If the event was part of a snapshot

  • Offset of the operation in the database binlog

  • Timestamp for when the change was made in the database

9

op

Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, c indicates that the operation created a row. Valid values are:

  • c = create

  • u = update

  • d = delete

10

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.
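The lag calculation from the two ts_ms fields can be sketched in Python as follows. The sketch assumes the event value has already been deserialized from JSON into a dictionary with the payload layout shown in the example. `processing_lag_ms` is a hypothetical helper, and the sample timestamps are made up for illustration.

```python
import json
from typing import Optional


def processing_lag_ms(event_value: dict) -> Optional[int]:
    """Return payload.ts_ms - payload.source.ts_ms, or None if ts_ms is absent."""
    payload = event_value["payload"]
    processed_at = payload.get("ts_ms")  # optional field
    if processed_at is None:
        return None
    return processed_at - payload["source"]["ts_ms"]


# Illustrative event value with only the fields needed for the calculation.
sample = json.loads(
    '{"payload": {"source": {"ts_ms": 1559033904863},'
    ' "op": "c", "ts_ms": 1559033904990}}'
)

lag = processing_lag_ms(sample)
print(f"lag: {lag} ms")
```

The result includes any clock difference between the database host and the machine that runs the Kafka Connect task, so treat small or negative values with care.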

update events

The value of a change event for an update in the sample customers table has the same schema as a create event for that table. Likewise, the event value’s payload has the same structure. However, the event value payload contains different values in an update event. Here is an example of a change event value in an event that the connector generates for an update in the customers table:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "1.5.4.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904863,
            "snapshot": null,
            "db": "Vitess_server",
            "schema": "commerce",
            "table": "customers",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
        },
        "op": "u", (4)
        "ts_ms": 1465584025523 (5)
    }
}
Table 4. Descriptions of update event value fields
Item Field name Description

1

before

An optional field that contains all values of all columns that were in the row before the database commit.

2

after

An optional field that specifies the state of the row after the event occurred. In this example, the first_name value is now Anne Marie.

3

source

Mandatory field that describes the source metadata for the event. The source field structure has the same fields as in a create event, but some values are different. The source metadata includes:

  • Debezium version

  • Connector type and name

  • Database (a.k.a. keyspace) and table that contain the updated row

  • If the event was part of a snapshot

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. In an update event value, the op field value is u, signifying that this row changed because of an update.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

Updating the columns for a row’s primary key changes the value of the row’s key. When a key changes, Debezium outputs three events: a DELETE event and a tombstone event with the old key for the row, followed by an event with the new key for the row. Details are in the next section.

delete events

The value in a delete change event has the same schema portion as create and update events for the same table. The payload portion in a delete event for the sample customers table looks like this:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "1.5.4.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904863,
            "snapshot": null,
            "db": "Vitess_server",
            "schema": "commerce",
            "table": "customers",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
        },
        "op": "d", (4)
        "ts_ms": 1465581902461 (5)
    }
}
Table 5. Descriptions of delete event value fields
Item Field name Description

1

before

Optional field that specifies the state of the row before the event occurred. In a delete event value, the before field contains the values that were in the row before it was deleted with the database commit.

2

after

Optional field that specifies the state of the row after the event occurred. In a delete event value, the after field is null, signifying that the row no longer exists.

3

source

Mandatory field that describes the source metadata for the event. In a delete event value, the source field structure is the same as for create and update events for the same table. Many source field values are also the same. In a delete event value, the ts_ms and vgtid field values, as well as other values, might have changed. But the source field in a delete event value provides the same metadata:

  • Debezium version

  • Connector type and name

  • Database (a.k.a. keyspace) and table that contained the deleted row

  • If the event was part of a snapshot

  • Offset of the operation in the database log

  • Timestamp for when the change was made in the database

4

op

Mandatory string that describes the type of operation. The op field value is d, signifying that this row was deleted.

5

ts_ms

Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.

In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.

A delete change event record provides a consumer with the information it needs to process the removal of this row.

Vitess connector events are designed to work with Kafka log compaction. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.

Tombstone events

When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be null. To make this possible, the Vitess connector follows a delete event with a special tombstone event that has the same key but a null value.
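A consumer that rebuilds table state from a topic needs to handle create, update, delete, and tombstone records. The following Python sketch shows one way to do that with an in-memory dictionary. It assumes that keys and values are already deserialized into dictionaries with the payload layout shown in the examples above, and that a tombstone arrives as a record whose value is None. `apply_event` is a hypothetical helper, not part of any Debezium or Kafka client API.

```python
from typing import Dict, Optional


def apply_event(state: Dict[int, dict], key: dict, value: Optional[dict]) -> None:
    """Apply one change event to an in-memory copy of the table, keyed by id."""
    row_id = key["payload"]["id"]
    if value is None:
        # Tombstone: same key, null value. The row is already gone, so this is a no-op.
        state.pop(row_id, None)
        return
    payload = value["payload"]
    if payload["op"] in ("c", "u"):
        state[row_id] = payload["after"]
    elif payload["op"] == "d":
        state.pop(row_id, None)


key = {"payload": {"id": 1}}
anne = {"id": 1, "first_name": "Anne", "last_name": "Kretchmar"}
anne_marie = {"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar"}

events = [
    (key, {"payload": {"op": "c", "before": None, "after": anne}}),
    (key, {"payload": {"op": "u", "before": anne, "after": anne_marie}}),
    (key, {"payload": {"op": "d", "before": anne_marie, "after": None}}),
    (key, None),  # tombstone that follows the delete event
]

state: Dict[int, dict] = {}
for event_key, event_value in events:
    apply_event(state, event_key, event_value)
    print(state)
```

Checking for a null value before reading the payload is the important part: a consumer that assumes every record has a payload fails on the first tombstone.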

Data type mappings

The Vitess connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the Vitess data type of the column. This section describes these mappings.

Basic types

The following table describes how the connector maps basic Vitess data types to a literal type and a semantic type in event fields.

  • literal type describes how the value is literally represented using Kafka Connect schema types: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, and STRUCT.

  • semantic type describes how the Kafka Connect schema captures the meaning of the field using the name of the Kafka Connect schema for the field.

Table 6. Mappings for Vitess basic data types
| Vitess data type | Literal type (schema type) | Semantic type (schema name) and notes |
|---|---|---|
| BOOLEAN, BOOL | INT16 | n/a |
| BIT(1) | Not yet supported | n/a |
| BIT(>1) | Not yet supported | n/a |
| TINYINT | INT16 | n/a |
| SMALLINT[(M)] | INT16 | n/a |
| MEDIUMINT[(M)] | INT32 | n/a |
| INT, INTEGER[(M)] | INT32 | n/a |
| BIGINT[(M)] | INT64 | n/a |
| REAL[(M,D)] | FLOAT64 | n/a |
| FLOAT[(M,D)] | FLOAT64 | n/a |
| DOUBLE[(M,D)] | FLOAT64 | n/a |
| CHAR[(M)] | STRING | n/a |
| VARCHAR[(M)] | STRING | n/a |
| BINARY[(M)] | STRING | n/a |
| VARBINARY[(M)] | STRING | n/a |
| TINYBLOB | STRING | n/a |
| TINYTEXT | STRING | n/a |
| BLOB | STRING | n/a |
| TEXT | STRING | n/a |
| MEDIUMBLOB | STRING | n/a |
| MEDIUMTEXT | STRING | n/a |
| LONGBLOB | STRING | n/a |
| LONGTEXT | STRING | n/a |
| JSON | STRING | io.debezium.data.Json. Contains the string representation of a JSON document, array, or scalar. |
| ENUM | STRING | io.debezium.data.Enum. The allowed schema parameter contains the comma-separated list of allowed values. |
| SET | STRING | io.debezium.data.EnumSet. The allowed schema parameter contains the comma-separated list of allowed values. |
| YEAR[(2\|4)] | STRING | n/a |
| TIMESTAMP[(M)] | STRING | n/a. In yyyy-MM-dd HH:mm:ss.SSS format with microsecond precision based on UTC. MySQL allows M to be in the range of 0-6. |
| DATETIME[(M)] | STRING | n/a. In yyyy-MM-dd HH:mm:ss.SSS format with microsecond precision. MySQL allows M to be in the range of 0-6. |
| NUMERIC[(M[,D])] | STRING | n/a |
| DECIMAL[(M[,D])] | STRING | n/a |
| GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | Not yet supported | n/a |
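Because several numeric and temporal types arrive as STRING, a consumer usually converts them back to richer types. The following Python sketch shows one possible conversion for DECIMAL/NUMERIC and DATETIME/TIMESTAMP values. It assumes the string layouts described in the table above; `parse_decimal` and `parse_datetime` are hypothetical helpers, and the sample values are made up.

```python
from datetime import datetime
from decimal import Decimal


def parse_decimal(value: str) -> Decimal:
    """Convert a DECIMAL or NUMERIC column value from its string form."""
    return Decimal(value)


def parse_datetime(value: str) -> datetime:
    """Convert a DATETIME or TIMESTAMP string, with or without fractional seconds."""
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in value else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(value, fmt)


price = parse_decimal("19.99")
created = parse_datetime("2021-03-04 05:06:07.123")
print(price, created.isoformat())
```

The returned datetime is naive. For TIMESTAMP columns, which the table describes as based on UTC, a consumer may want to attach `timezone.utc` explicitly.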

Set up

Vitess does not require special setup before you can install and run a Debezium connector.

Set up Vitess

You can follow the Local Install via Docker guide, or the Vitess Operator for Kubernetes guide to install Vitess. No special setup is needed to support the Vitess connector.

Checklist
  • Make sure that the VTGate host and its gRPC port (default is 15991) are accessible from the machine where the Vitess connector is installed.

  • Make sure that the VTCtld host and its gRPC port (default is 15999) are accessible from the machine where the Vitess connector is installed.
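One way to run this checklist from the connector machine is a plain TCP connection test. The Python sketch below only verifies that a port accepts connections; it does not verify that a gRPC service answers on it. The addresses in `ENDPOINTS` are taken from the configuration example on this page and are placeholders for your own hosts; `is_reachable` and `report_endpoints` are hypothetical helpers.

```python
import socket
from typing import List, Tuple


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Placeholder endpoints; replace with the VTGate and VTCtld hosts of your cluster.
ENDPOINTS: List[Tuple[str, str, int]] = [
    ("VTGate", "192.168.99.100", 15991),
    ("VTCtld", "192.168.99.101", 15999),
]


def report_endpoints() -> None:
    """Print a reachability line for each configured endpoint."""
    for name, host, port in ENDPOINTS:
        status = "reachable" if is_reachable(host, port) else "NOT reachable"
        print(f"{name} {host}:{port} is {status}")
```

Call `report_endpoints()` on the machine that runs Kafka Connect before you register the connector.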

gRPC authentication

Because the Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances. Therefore, no special database user or permissions are needed. At the moment, the Vitess connector supports only unauthenticated access to the VTGate gRPC server.

Deployment

With Zookeeper, Kafka, and Kafka Connect installed, the remaining tasks to deploy a Debezium Vitess connector are to download the connector’s plug-in archive, extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to Kafka Connect’s plugin.path. You then need to restart your Kafka Connect process to pick up the new JAR files.

If you are working with immutable containers, see Debezium’s Container images for Zookeeper, Kafka and Kafka Connect with the Vitess connector already installed and ready to run. You can also run Debezium on Kubernetes and OpenShift.

Connector configuration example

The following example shows the configuration for a Vitess connector that connects to a Vitess (VTGate’s VStream) server on port 15991 at 192.168.99.100, whose logical name is fulfillment. It also connects to a VTCtld server on port 15999 at 192.168.99.101 to get the initial VGTID. Typically, you configure the Debezium Vitess connector in a .json file using the configuration properties available for the connector.

You can choose to produce events for a subset of the schemas and tables. Optionally, ignore, mask, or truncate columns that are sensitive, too large, or not needed.

{
    "name": "inventory-connector", (1)
    "config": {
        "connector.class": "io.debezium.connector.vitess.VitessConnector", (2)
        "database.hostname": "192.168.99.100", (3)
        "database.port": "15991", (4)
        "database.user": "vitess", (5)
        "database.password": "vitess_password", (6)
        "vitess.keyspace": "commerce", (7)
        "vitess.tablet.type": "MASTER", (8)
        "vitess.vtctld.host": "192.168.99.101", (9)
        "vitess.vtctld.port": "15999", (10)
        "vitess.vtctld.user": "vitess", (11)
        "vitess.vtctld.password": "vitess_password", (12)
        "database.server.name": "fulfillment", (13)
        "tasks.max": 1 (14)
    }
}
1 The name of the connector when registered with a Kafka Connect service.
2 The name of this Vitess connector class.
3 The address of the Vitess (VTGate’s VStream) server.
4 The port number of the Vitess (VTGate’s VStream) server.
5 The username of the Vitess database server (VTGate gRPC).
6 The password of the Vitess database server (VTGate gRPC).
7 The name of the keyspace (a.k.a. database). Because no shard is specified, the connector reads change events from all shards in the keyspace.
8 The type of MySQL instance (MASTER or REPLICA) to read change events from.
9 The address of the VTCtld server.
10 The port of the VTCtld server.
11 The username of the VTCtld server (VTCtld gRPC).
12 The password of the VTCtld server (VTCtld gRPC).
13 The logical name of the Vitess cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
14 Only one task should operate at any one time.

See the complete list of Vitess connector properties that can be specified in these configurations.

You can send this configuration with a POST command to a running Kafka Connect service. The service records the configuration and starts the connector task that connects to the Vitess database and streams change event records to Kafka topics.
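As a rough illustration, the following Python sketch builds that POST request with the standard library. It assumes a Kafka Connect worker whose REST interface listens at http://localhost:8083, which is a placeholder for your own worker address. The configuration is a trimmed copy of the example above, and `build_register_request` is a hypothetical helper. The sketch only constructs the request; the final comment shows how to send it.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Trimmed copy of the example configuration on this page.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.vitess.VitessConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "15991",
        "vitess.keyspace": "commerce",
        "vitess.tablet.type": "MASTER",
        "vitess.vtctld.host": "192.168.99.101",
        "vitess.vtctld.port": "15999",
        "database.server.name": "fulfillment",
        "tasks.max": "1",
    },
}

# Assumed worker address; adjust to your environment.
request = build_register_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)

# To actually register the connector, send the request:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping request construction separate from sending makes it easy to inspect or log the payload before it reaches the Kafka Connect service.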

Adding connector configuration

To start running a Vitess connector, create a connector configuration and add the configuration to your Kafka Connect cluster.

Prerequisites
  • The VTGate host and its gRPC port (default is 15991) are accessible from the machine where the Vitess connector is installed.

  • The VTCtld host and its gRPC port (default is 15999) are accessible from the machine where the Vitess connector is installed.

  • The Vitess connector is installed.

Procedure
  1. Create a configuration for the Vitess connector.

  2. Use the Kafka Connect REST API to add that connector configuration to your Kafka Connect cluster.

Results

When the connector starts, it begins generating data change events for row-level operations and streaming change event records to Kafka topics.

Monitoring

The Debezium Vitess connector provides only one type of metrics in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.

  • Streaming metrics provide information about connector operation when the connector is capturing changes and streaming change event records.

Debezium monitoring documentation provides details for how to expose these metrics by using JMX.

Streaming metrics

The MBean is debezium.vitess:type=connector-metrics,context=streaming,server=.

Attributes Type Description

long

The number of milliseconds since the connector has read and processed the most recent event.

long

The total number of events that this connector has seen since last started or reset.

long

The number of events that have been filtered by include/exclude list filtering rules configured on the connector.

int

The length of the queue used to pass events between the streamer and the main Kafka Connect loop.

int

The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.

boolean

Flag that denotes whether the connector is currently connected to the database server.

long

The number of milliseconds between the last change event’s timestamp and the connector processing it. The value incorporates any difference between the clocks on the machines where the database server and the connector are running.

long

The number of processed transactions that were committed.

long

The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.

long

The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.
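The MBean name above ends with an empty `server=` value. As a hedged sketch, the helper below assembles the full name on the assumption that the value is the connector's logical server name; that assumption is not confirmed by this section, and `my_vitess` is a hypothetical name.

```python
# Characters that a JMX ObjectName does not allow in an unquoted value,
# plus the wildcard characters that would turn the name into a pattern.
_FORBIDDEN = set(',=:"*?\n')


def streaming_mbean_name(server_name):
    """Return the streaming-metrics MBean name for a logical server name."""
    if not server_name or any(ch in _FORBIDDEN for ch in server_name):
        raise ValueError(f"unsuitable server name for an MBean key: {server_name!r}")
    return (
        "debezium.vitess:type=connector-metrics,"
        f"context=streaming,server={server_name}"
    )


mbean = streaming_mbean_name("my_vitess")
```

The resulting string can be pasted into a JMX client to look up the attributes listed in the table.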

Connector configuration properties

The Debezium Vitess connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. The properties are grouped below into required, advanced, and pass-through properties.

The following configuration properties are required unless a default value is available.

Table 7. Required connector configuration properties
Property Default Description

Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.

The name of the Java class for the connector. Always use a value of io.debezium.connector.vitess.VitessConnector for the Vitess connector.

1

The maximum number of tasks that should be created for this connector. The Vitess connector always uses a single task and therefore does not use this value, so the default is always acceptable.

IP address or hostname of the Vitess database server (VTGate).

15991

Integer port number of the Vitess database server (VTGate).

The name of the keyspace from which to stream the changes.

n/a

An optional name of the shard from which to stream the changes. If not configured, the connector streams changes from the only shard of an unsharded keyspace, or from all shards of a sharded keyspace. Leaving this property unset is recommended, because streaming from all shards in the keyspace has better support for reshard operations. If configured, for example as -80, the connector streams changes from the -80 shard.

n/a

An optional username of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.

n/a

An optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.

IP address or hostname of the VTCtld server.

15999

Integer port number of the VTCtld server.

n/a

An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.

n/a

An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.

MASTER

The type of tablet (and hence MySQL instance) from which to stream the changes:

MASTER represents streaming from the master MySQL instance.

REPLICA represents streaming from the replica MySQL instance.

RDONLY represents streaming from the read-only replica MySQL instance.

Logical name that identifies and provides a namespace for the particular Vitess database server or cluster in which Debezium is capturing changes. Only alphanumeric characters and underscores should be used in the database server logical name. The logical name should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector.

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. Any table not included in table.include.list does not have its changes captured. Each identifier is of the form keyspace.tableName. By default, the connector captures changes in every non-system table in each schema whose changes are being captured. Do not also set the table.exclude.list property.

An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture. Any table not included in table.exclude.list has its changes captured. Each identifier is of the form keyspace.tableName. Do not also set the table.include.list property.

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form keyspace.tableName.columnName. Do not also set the column.exclude.list property.

An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event record values. Fully-qualified names for columns are of the form keyspace.tableName.columnName. Do not also set the column.include.list property.

true

Controls whether a delete event is followed by a tombstone event.

true - a delete operation is represented by a delete event and a subsequent tombstone event.

false - only a delete event is emitted.

After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row if log compaction is enabled for the topic.

empty string

A semicolon-separated list of entries, each of which pairs a fully-qualified table name with a regular expression that matches column names in that table. The connector maps values in matching columns to key fields in change event records that it sends to Kafka topics. This is useful when a table does not have a primary key, or when you want to order change event records in a Kafka topic according to a field that is not a primary key.

Separate entries with semicolons. Insert a colon between the fully-qualified table name and its regular expression. The format is:

keyspace-name.table-name:regexp;…

For example,

keyspaceA.table_a:regex_1;keyspaceA.table_b:regex_2;keyspaceA.table_c:regex_3

If table_a has an id column, and regex_1 is ^i (matches any column that starts with i), the connector maps the value in table_a's id column to a key field in change events that the connector sends to Kafka.
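The key-column format described above can be illustrated with a short parser. This is a sketch of the format only, not the connector's implementation: the function names are hypothetical, and it assumes prefix-style matching as the ^i example implies, which may differ from how the connector applies the expression.

```python
import re


def parse_key_columns(spec):
    """Split 'keyspace.table:regex;...' into {table name: compiled regex}."""
    mapping = {}
    for entry in spec.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        table, _, pattern = entry.partition(":")
        if not table or not pattern:
            raise ValueError(f"expected 'table:regex', got {entry!r}")
        mapping[table] = re.compile(pattern)
    return mapping


def key_columns(mapping, table, columns):
    """Return the columns of a table that its regular expression selects."""
    pattern = mapping.get(table)
    if pattern is None:
        return []
    return [column for column in columns if pattern.search(column)]


mapping = parse_key_columns("keyspaceA.table_a:^i;keyspaceA.table_b:^order_")
selected = key_columns(mapping, "keyspaceA.table_a", ["id", "name", "item_count"])
```

Here `selected` contains `id` and `item_count`, because both start with `i`. A broad expression can therefore pull in more columns than intended, so it is worth testing the pattern against the real column list.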

The following advanced configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector’s configuration.

Table 8. Advanced connector configuration properties
Property Default Description

fail

Specifies how the connector should react to exceptions during processing of events:

fail propagates the exception, indicates the offset of the problematic event, and causes the connector to stop.

warn logs the offset of the problematic event, skips that event, and continues processing.

skip skips the problematic event and continues processing.

20240

Positive integer value for the maximum size of the blocking queue. The connector places change events received from streaming replication in the blocking queue before writing them to Kafka. This queue can provide backpressure when, for example, writing records to Kafka is slower than it should be or Kafka is not available.

10240

Positive integer value that specifies the maximum size of each batch of events that the connector processes.

0

Long value for the maximum size in bytes of the blocking queue. The feature is disabled by default and becomes active when the property is set to a positive long value.

1000

Positive integer value that specifies the number of milliseconds the connector should wait for new change events to appear before it starts processing a batch of events. Defaults to 1000 milliseconds, or 1 second.

true if the connector configuration sets the key.converter or value.converter property to the Avro converter.
false if not.

Indicates whether field names are sanitized to adhere to Avro naming requirements.
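The queue and batch size properties in Table 8 describe a bounded queue that is drained in batches. The sketch below uses Python's standard `queue.Queue` to show the idea of backpressure with deliberately small sizes. It is an analogy under stated assumptions, not the connector's code; `drain_batch` is a hypothetical helper.

```python
import queue


def drain_batch(events, max_batch_size):
    """Take up to max_batch_size events from the queue without blocking."""
    batch = []
    while len(batch) < max_batch_size:
        try:
            batch.append(events.get_nowait())
        except queue.Empty:
            break  # nothing more is waiting right now
    return batch


# A tiny queue stands in for the blocking queue (maximum size of 4 events).
events = queue.Queue(maxsize=4)
for event_id in range(4):
    events.put_nowait(event_id)

# The queue is full, so the producer cannot add more until a batch is drained.
try:
    events.put_nowait(99)
    producer_blocked = False
except queue.Full:
    producer_blocked = True

# Draining a batch of at most 3 events frees capacity for the producer again.
first_batch = drain_batch(events, 3)
```

A queue that is larger than the batch size lets the streamer keep reading while a batch is being written, which is consistent with the default queue size being larger than the default batch size.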

Pass-through connector configuration properties

The connector also supports pass-through configuration properties that are used when creating the Kafka producer and consumer.

Be sure to consult the Kafka documentation for all of the configuration properties for Kafka producers and consumers. The Vitess connector does use the new consumer configuration properties.

Behavior when things go wrong

Debezium is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully, Debezium provides exactly once delivery of every change event record.

If a fault does happen, the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, Debezium, like Kafka, provides at least once delivery of change events.

The rest of this section describes how Debezium handles various kinds of faults and problems.

Configuration and startup errors

In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:

  • The connector’s configuration is invalid.

  • The connector cannot successfully connect to Vitess by using the specified connection parameters.

In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the Vitess problem, restart the connector.

Vitess becomes unavailable

When the connector is running, the Vitess server (VTGate) that it is connected to could become unavailable for any number of reasons. If this happens, the connector fails with an error and stops. When the server is available again, restart the connector.

The Vitess connector externally stores the last processed offset in the form of a Vitess VGTID. After a connector restarts and connects to a server instance, the connector communicates with the server to continue streaming from that particular offset.

Invalid column name error

This error happens very rarely. If you receive an error with the message Illegal prefix '@' for column: x, from schema: y, table: z, and your table doesn’t have such a column, it is a Vitess vstream bug that is caused by column renaming or a column type change. It is a transient error. Restart the connector after a short backoff and the error should resolve automatically.
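A restart after a backoff could be scripted against the Kafka Connect REST API, which exposes a restart endpoint per connector. The sketch below only computes a delay schedule and builds the request; the worker URL, connector name, and delay values are hypothetical choices, not recommendations from this documentation.

```python
import urllib.request


def backoff_delays(base_seconds=5.0, factor=2.0, attempts=4, cap_seconds=60.0):
    """Return exponentially growing wait times, capped at cap_seconds."""
    return [min(cap_seconds, base_seconds * factor ** i) for i in range(attempts)]


def build_restart_request(connect_url, connector_name):
    """Build (but do not send) the POST that restarts a connector."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{connector_name}/restart",
        method="POST",
    )


delays = backoff_delays()
restart = build_restart_request("http://localhost:8083", "vitess-connector")

# A caller would sleep for each delay in turn and send the request, for example:
# for delay in delays:
#     time.sleep(delay)
#     urllib.request.urlopen(restart)
```

Depending on the Kafka Connect version, a failed task may need to be restarted separately through the task-level restart endpoint.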

Kafka Connect process stops gracefully

Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process’s connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.

Kafka Connect process crashes

If the Kafka Connect process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, Vitess connectors resume from the last offset that was recorded by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.

Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events. Debezium changes are idempotent, so a sequence of events always results in the same state.

In each change event record, Debezium connectors insert source-specific information about the origin of the event, including the Vitess server’s time of the event and the position in the binlog where the transaction changes were written. Consumers can keep track of this information, especially the VGTID, to determine whether an event is a duplicate.
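The idempotence claim can be checked with a small simulation. The sketch below applies change events to an in-memory copy of a table and compares a clean run with one in which a recent run of events is replayed, as happens when a task resumes from the last recorded offset. The `op` and `after` fields follow the Debezium event envelope; the flat `key` field and the sample rows are simplifying assumptions for illustration.

```python
def apply_event(state, event):
    """Apply one change event to an in-memory copy of the table."""
    key = event["key"]
    if event["op"] == "d":
        state.pop(key, None)          # deleting an absent row is a no-op
    else:
        state[key] = event["after"]   # create and update both overwrite the row
    return state


def replay(events):
    """Apply events in order and return the resulting table state."""
    state = {}
    for event in events:
        apply_event(state, event)
    return state


e1 = {"key": 1, "op": "c", "after": {"id": 1, "qty": 5}}
e2 = {"key": 1, "op": "u", "after": {"id": 1, "qty": 7}}
e3 = {"key": 1, "op": "d", "after": None}
e4 = {"key": 2, "op": "c", "after": {"id": 2, "qty": 1}}

clean = replay([e1, e2, e3, e4])
# After a crash, e2 and e3 are delivered a second time before e4 arrives.
with_duplicates = replay([e1, e2, e3, e2, e3, e4])
```

Both runs end in the same state because each event carries the full new row state and the replayed events arrive in their original order. Consumers that perform non-idempotent actions, such as incrementing a counter per event, still need explicit duplicate detection.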

Kafka becomes unavailable

As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.

Connector is stopped for a duration

If the connector is gracefully stopped, the database can continue to be used. Any changes are recorded in the Vitess binlog. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.

A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a Debezium connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in Vitess.

Limitations with earlier Vitess versions

Vitess 8.0.0
  • Due to a minor Vitess padding issue (which is fixed in Vitess 9.0.0), decimal values with a precision that is greater than or equal to 13 have extra whitespace in front of the number. For example, if the column type is decimal(13,4) in the table definition, the value -1.2300 becomes "- 1.2300", and the value 1.2300 becomes " 1.2300".

  • Does not support the JSON column type.

  • VStream 8.0.0 doesn’t provide additional metadata of permitted values for ENUM columns. Therefore, the connector does not support the ENUM column type. The index number (1-based) is emitted instead of the enumeration value. For example, "3" is emitted as the value instead of "L" if the ENUM definition is enum('S','M','L').
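Consumers that must read events from a Vitess 8.0.0 cluster could normalize these values themselves. The helpers below are a hypothetical consumer-side workaround, not part of the connector. They assume that the consumer knows the ENUM definition from the table schema and that the padded decimal arrives as a string.

```python
from decimal import Decimal


def parse_padded_decimal(raw):
    """Remove the padding spaces and parse the value as an exact decimal."""
    return Decimal(raw.replace(" ", ""))


def enum_label(index, labels):
    """Map the emitted 1-based ENUM index back to its label."""
    position = int(index)
    if position == 0:
        return ""  # MySQL reserves index 0 for the empty-string error value
    if not 1 <= position <= len(labels):
        raise ValueError(f"index {index!r} is outside the ENUM definition")
    return labels[position - 1]


negative = parse_padded_decimal("- 1.2300")
positive = parse_padded_decimal(" 1.2300")
size = enum_label("3", ["S", "M", "L"])
```

With the examples from the list above, `negative` is `Decimal("-1.2300")`, `positive` is `Decimal("1.2300")`, and `size` is `"L"`.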