
Avro Serialization

A Debezium connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record. For each change event record, the Debezium connector completes the following actions:

  1. Applies configured transformations.

  2. Serializes the record key and value into a binary form by using the configured Kafka Connect converters.

  3. Writes the record to the correct Kafka topic.

You can specify converters for each individual Debezium connector instance. Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents. The default behavior is that the JSON converter includes the record’s message schema, which makes each record very verbose. The Debezium Tutorial shows what the records look like when both payload and schemas are included. If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:

  • key.converter.schemas.enable

  • value.converter.schemas.enable

Setting these properties to false excludes the verbose schema information from each record.
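
For example, a connector registration that keeps JSON serialization but omits the embedded schemas might include the following properties (the converter class shown is the standard Kafka Connect JSON converter):

    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false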

Alternatively, you can serialize the record keys and values by using Apache Avro. The Avro binary format is compact and efficient. Avro schemas make it possible to ensure that each record has the correct structure. Avro’s schema evolution mechanism enables schemas to evolve. This is essential for Debezium connectors, which dynamically generate each record’s schema to match the structure of the database table that was changed. Over time, change event records written to the same Kafka topic might have different versions of the same schema. Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
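
For illustration, the Avro schema generated for the key of a change event on a table with a single integer primary key might look like the following (a hypothetical example; actual schemas are derived from the table structure):

    {
      "type": "record",
      "name": "Key",
      "namespace": "dbserver1.inventory.customers",
      "fields": [
        { "name": "id", "type": "int" }
      ]
    }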

To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. Available options include the Apicurio API and Schema Registry as well as the Confluent Schema Registry. Both are described here.

About the Apicurio API and Schema Registry

The Apicurio Registry open-source project provides several components that work with Avro:

  • An Avro converter that you can specify in Debezium connector configurations. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the record keys and values into Avro’s compact binary form.

  • An API and schema registry that tracks:

    • Avro schemas that are used in Kafka topics.

    • Where the Avro converter sends the generated Avro schemas.

    Because the Avro schemas are stored in this registry, each record needs to contain only a tiny schema identifier. This makes each record much smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.

  • Avro Serdes (serializers and deserializers) for Kafka producers and consumers. Kafka consumer applications that you write to consume change event records can use the Avro Serdes to deserialize the change event records, as shown in the sketch after this list.
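
The following is a minimal consumer sketch that uses the Apicurio Avro Serdes. It assumes the Apicurio Registry 2.x Avro Serde artifact (io.apicurio:apicurio-registry-serdes-avro-serde) is on the classpath; the topic and group names are illustrative:

    // Minimal sketch: consume Debezium change events deserialized via Apicurio Avro Serdes.
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AvroChangeEventConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group"); // illustrative
            // Deserialize keys and values with the Apicurio Avro Serdes.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "io.apicurio.registry.serde.avro.AvroKafkaDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "io.apicurio.registry.serde.avro.AvroKafkaDeserializer");
            // The deserializer looks up each record's schema in the registry by its identifier.
            props.put("apicurio.registry.url", "http://apicurio:8080/apis/registry/v2");

            try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("dbserver1.inventory.customers")); // illustrative topic
                while (true) {
                    ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                        System.out.println(record.key() + " -> " + record.value());
                    }
                }
            }
        }
    }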

To use the Apicurio Registry with Debezium, add Apicurio Registry converters and their dependencies to the Kafka Connect container image that you are using for running a Debezium connector.

The Apicurio Registry project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.

To use the converters provided by Apicurio Registry, you need to provide the apicurio.registry.url property so that the converter can locate the registry.
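
For example, a sketch of a value-side configuration that uses the Apicurio JSON converter (this assumes the ExtJsonConverter class that ships in the same converter artifact):

    value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2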

Deployment overview

To deploy a Debezium connector that uses Avro serialization, you must complete three main tasks:

  1. Deploy an Apicurio API and Schema Registry instance.

  2. Install the Avro converter from the installation package into a plug-in directory. This is not needed when using the Debezium Connect container image; see details in Deploying with Debezium containers.

  3. Configure a Debezium connector instance to use Avro serialization by setting configuration properties as follows:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
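
For example, you might supply these converter properties when registering a connector through the Kafka Connect REST API. The connector name and database settings below are hypothetical:

    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
      }
    }'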

Deploying with Debezium containers

In your environment, you might want to use a provided Debezium container image to deploy Debezium connectors that use Avro serialization. Follow the procedure here to do that. In this procedure, you enable Apicurio converters on the Debezium Kafka Connect container image, and configure the Debezium connector to use the Avro converter.

Prerequisites
  • You have Docker installed and sufficient rights to create and manage containers.

  • You downloaded the Debezium connector plug-in(s) that you want to deploy with Avro serialization.

Procedure
  1. Deploy an instance of Apicurio Registry.

    The following example uses a non-production, in-memory, Apicurio Registry instance:

    docker run -it --rm --name apicurio \
      -p 8080:8080 apicurio/apicurio-registry-mem:2.1.5.Final
  2. Run the Debezium container image for Kafka Connect, configuring it to provide the Avro converter by enabling Apicurio via the ENABLE_APICURIO_CONVERTERS=true environment variable:

    docker run -it --rm --name connect \
      --link zookeeper:zookeeper \
      --link kafka:kafka \
      --link mysql:mysql \
      --link apicurio:apicurio \
      -e ENABLE_APICURIO_CONVERTERS=true \
      -e GROUP_ID=1 \
      -e CONFIG_STORAGE_TOPIC=my_connect_configs \
      -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
      -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
      -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
      -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
      -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
      -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
      -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
      -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
      -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
      -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
      -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
      -p 8083:8083 debezium/connect:1.9
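
After the connector starts producing change events, you can check which schemas were registered by querying the registry's v2 REST API (this assumes artifacts land in the default group):

    curl http://localhost:8080/apis/registry/v2/groups/default/artifacts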

Confluent Schema Registry

There is an alternativeschema registryimplementation provided by Confluent. The configuration is slightly different.

  1. In your Debezium connector configuration, specify the following properties:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
  2. Deploy an instance of the Confluent Schema Registry:

    docker run -it --rm --name schema-registry \
      --link zookeeper \
      -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
      -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
      -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
      -p 8181:8181 confluentinc/cp-schema-registry
  3. Run a Kafka Connect image configured to use Avro:

    docker run -it --rm --name connect \
      --link zookeeper:zookeeper \
      --link kafka:kafka \
      --link mysql:mysql \
      --link schema-registry:schema-registry \
      -e GROUP_ID=1 \
      -e CONFIG_STORAGE_TOPIC=my_connect_configs \
      -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
      -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
      -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
      -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
      -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
      -p 8083:8083 debezium/connect:1.9
  4. Run a console consumer that reads new Avro messages from the db.myschema.mytable topic and decodes them to JSON:

    docker run -it --rm --name avro-consumer \
      --link zookeeper:zookeeper \
      --link kafka:kafka \
      --link mysql:mysql \
      --link schema-registry:schema-registry \
      debezium/connect:1.9 \
      /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --property print.key=true \
      --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic db.myschema.mytable

Naming

As stated in the Avro documentation, names must adhere to the following rules:

  • Start with [A-Za-z_]

  • Subsequently contain only [A-Za-z0-9_] characters

Debezium uses the column’s name as the basis for the corresponding Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules. Each Debezium connector provides a configuration property, sanitize.field.names, that you can set to true if you have columns that do not adhere to Avro rules for names. Setting sanitize.field.names to true allows serialization of non-conformant fields without having to actually modify your schema.
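
For example, to enable sanitization, add the property to the connector configuration alongside the converter settings:

    sanitize.field.names=true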

Getting More Information

This post from the Debezium blog describes the concepts of serializers, converters, and other components, and discusses the advantages of using Avro. Some Kafka Connect converter details have slightly changed since that post was written.

For a complete example of using Avro as the message format for Debezium change data events, see MySQL and the Avro message format.