您正在查看Debezium未发布版本的文档。开云体育官方注册网址
如果您想查看此页面的最新稳定版本,请转在这里

Avro序列化

De开云体育官方注册网址bezium连接器在Kafka Connect框架中工作,通过生成更改事件记录来捕获数据库中的每个行级更改。开云体育电动老虎机对于每个更改事件记录,Debezium连接器完成以下操作:开云体育官方注册网址

  1. 应用已配置的转换。

  2. 将记录键和值序列化为二进制形式Kafka连接转换器

  3. 将记录写入正确的Kafka主题。

您可以为每个单独的Debezium连接器实例指定转换器。开云体育官方注册网址Kafka Connect提供了一个JSON转换器,将记录的键和值序列化为JSON文档。默认行为是JSON转换器包含记录的消息模式,这使得每个记录都非常冗长。的开云体育官方注册网址Debezium教程显示同时包含有效负载和模式时记录的样子。如果希望使用JSON序列化记录,请考虑将以下连接器配置属性设置为

  • key.converter.schemas.enable

  • value.converter.schemas.enable

将这些属性设置为从每个记录中排除详细的架构信息。

或者,您可以通过使用序列化记录键和值Apache Avro。Avro二进制格式紧凑而高效。Avro模式可以确保每条记录都有正确的结构。Avro的模式进化机制使模式能够进化。这对于Debezium连接器是必不可少的,它开云体育官方注册网址动态生成每条记录的模式,以匹配已更改的数据库表的结构。开云体育电动老虎机随着时间的推移,写入相同Kafka主题的更改事件记录可能具有相同模式的不同版本。Avro序列化使得更改事件记录的使用者更容易适应不断变化的记录模式。

要使用Apache Avro序列化,必须部署一个模式注册表来管理Avro消息模式及其版本。可用的选项包括Apicurio API和模式注册表以及Confluent模式注册表。这两种方法在这里都有介绍。

Apicurio API和模式注册表

关于Apicurio API和Schema Registry

Apicurio注册表开源项目提供了几个与Avro一起工作的组件:

  • 可以在Debezium连接器配置中指定的Avro转换器。开云体育官方注册网址这个转换器将Kafka Connect模式映射到Avro模式。然后,转换器使用Avro模式将记录键和值序列化为Avro的紧凑二进制形式。

  • 一个API和模式注册表,跟踪:

    • Kafka主题中使用的Avro模式。

    • Avro转换器发送生成的Avro模式。

    因为Avro模式存储在这个注册表中,所以每条记录只需要包含很少的数据模式标识符。这使得每个记录更小。对于像Kafka这样的I/O绑定系统,这意味着生产者和消费者的总吞吐量更高。

  • Avro并行转换器(序列化器和反序列化器)用于Kafka生产者和消费者。Kafka消费者应用程序可以使用Avro Serdes来反序列化更改事件记录。

要将Apicurio Registry与Debezium一起使用,请开云体育官方注册网址将Apicurio Registry转换器及其依赖项添加到用于运行Debezium连接器的Kafka Connect容器映像中。

Apicurio Registry项目还提供了一个JSON转换器。这个转换器结合了较少冗长消息和人类可读JSON的优点。消息本身不包含模式信息,而只包含模式ID。

要使用Apicurio Registry提供的转换器,您需要提供apicurio.registry.url

Apicurio注册中心部署概述

要部署使用Avro序列化开云体育官方注册网址的Debezium连接器,您必须完成三个主要任务:

  1. 部署一个Apicurio API和模式注册表实例。

  2. 安装Avro转换器安装包到一个插件目录中。如果您正在使用开云体育官方注册网址连接容器映像,则不需要安装该软件包。有关更多信息,请参见使用Debezium容器部署Apicurio Registry开云体育官方注册网址

  3. 配置Debezium连接开云体育官方注册网址器实例以使用Avro序列化,通过设置如下配置属性:

    key.converter = io.apicurio.registry.utils.converter。AvroConverter key.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 key.converter.apicurio.registry。auto-register = true key.converter.apicurio.registry。寻找最新= true value.converter = io.apicurio.registry.utils.converter。AvroConverter value.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 value.converter.apicurio.registry。auto-register = true value.converter.apicurio.registry。寻找最新= true schema.name.adjustment.mode = avro

在内部,Kafka Connect总是使用JSON键/值转换器来存储配置和偏移量。

使用Debezium容器部署Apicurio Registry开云体育官方注册网址

在您的环境中,您可能希望使用提供的Debezium容器映像来部署使用Avro序列化的Debezium连接器。开云体育官方注册网址按照这里的步骤来做。在这个过程中,您在Debezium Kafka Connect容器镜像上启用Apicurio转换器,并配置Debe开云体育官方注册网址zium连接器使用Avro转换器。

先决条件
  • 您已经安装了Docker,并且拥有创建和管理容器的足够权限。

  • 您下载了希望与Avro序列化一起部署的开云体育官方注册网址Debezium连接器插件。

过程
  1. 部署Apicurio Registry的一个实例。

    下面的例子使用了一个非生产的、内存中的Apicurio注册表实例:

    apicurio/apicurio-registry-mem:2.4.1.Final . docker运行它——rm——name apicurio \ -p 8080:8080
  2. 运行Kafka 开云体育官方注册网址Connect的Debezium容器镜像,通过启用Apicurio via来配置它以提供Avro转换器ENABLE_APICURIO_CONVERTERS = true环境变量:

    docker运行它——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link apicurio:apicurio \ -e ENABLE_APICURIO_CONVERTERS=true \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.apicurio.registry.utils.converter。AvroConverter \ -e VALUE_CONVERTER=io.apicurio.registry.utils.converter。AvroConverter \ -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \ -p 8083:8083 debezium/connect:2.2

融合模式注册表

还有另一种选择模式注册表实现由Confluent提供。

Confluent Schema Registry部署概述

有关安装独立Confluent架构注册表的信息,请参阅Confluent平台部署文档

作为一种选择,你可以安装将独立的Confluent架构注册表作为容器。

使用Debezium容器部署Confluent模式注册表开云体育官方注册网址

从Debezium 2.0.0开云体育官方注册网址开始,Debezium容器中不包括Confluent Schema Registry支持。要为Debezium容器启用Confluent架构注册表,请将以下Confluent Avr开云体育官方注册网址o转换器JAR文件安装到Connect插件目录中:

  • kafka-connect-avro-converter

  • kafka-connect-avro-data

  • kafka-avro-serializer

  • kafka-schema-serializer

  • kafka-schema-registry-client

  • common-config

  • common-utils

您可以从网站下载上述文件Confluent Maven存储库

配置略有不同。

  1. 在Debeziu开云体育官方注册网址m连接器配置中,指定以下属性:

    key.converter = io.confluent.connect.avro。AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
  2. 部署Confluent模式注册表的一个实例:

    -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \ -p 8181:8181 confluentinc/cp-schema-registry . docker运行-rm——name schema-registry \——link zookeeper \ -e schema_registry_connection_url =zookeeper:2181
  3. 运行一个配置为使用Avro的Kafka Connect映像:

    docker运行-it——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io. conflut .connect.avro。AvroConverter \ -e VALUE_CONVERTER=io. fluent.connect. AvroConverter。AvroConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 debezium/connect:2.2
  4. 控件中读取新的Avro消息的控制台消费者db.myschema.mytable主题并解码为JSON:

    Docker运行:it——rm——name avro-consumer \——link zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ debezium/connect:2.开云体育官方注册网址2 \ /kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——property print。\——property schema.registry. key=true \——formatter io.confluent.kafka.formatter.AvroMessageFormatter \——url=http://schema-registry:8081 \——topic db.myschema.mytable

命名

正如阿芙罗所说文档,名称必须遵守以下规则:

  • 开始(A-Za-z_)

  • 随后只包含(A-Za-z0-9_)字符

开云体育官方注册网址Debezium使用列的名称作为相应Avro字段的基础。如果列名也不遵守Avro命名规则,这可能会导致序列化过程中的问题。每个Deb开云体育官方注册网址ezium连接器提供一个配置属性,field.name.adjustment.mode你可以设置为avro如果有不遵守Avro名称规则的列。设置field.name.adjustment.modeavro允许序列化不一致的字段,而不必实际修改您的模式。

获取更多信息

这篇文章来自Debeziu开云体育官方注册网址m博客的文章描述了序列化器、转换器和其他组件的概念,并讨论了使用Avro的优点。自那篇文章发表以来,Kafka Connect转换器的一些细节发生了轻微的变化。

有关使用Avro作为Debezium更改数据事件的消息格式的完整示例,请参见开云体育官方注册网址MySQL和Avro消息格式