Avro序列化
De开云体育官方注册网址bezium连接器在Kafka Connect框架中工作,通过生成更改事件记录来捕获数据库中的每个行级更改。开云体育电动老虎机对于每个更改事件记录,Debezium连接器完成以下操作:开云体育官方注册网址
应用已配置的转换。
将记录键和值序列化为二进制形式Kafka连接转换器。
将记录写入正确的Kafka主题。
您可以为每个单独的Debezium连接器实例指定转换器。开云体育官方注册网址Kafka Connect提供了一个JSON转换器,将记录的键和值序列化为JSON文档。默认行为是JSON转换器包含记录的消息模式,这使得每个记录都非常冗长。的开云体育官方注册网址Debezium教程显示同时包含有效负载和模式时记录的样子。如果希望使用JSON序列化记录,请考虑将以下连接器配置属性设置为假
:
key.converter.schemas.enable
value.converter.schemas.enable
将这些属性设置为假
从每个记录中排除详细的架构信息。
或者,您可以通过使用序列化记录键和值Apache Avro。Avro二进制格式紧凑而高效。Avro模式可以确保每条记录都有正确的结构。Avro的模式进化机制使模式能够进化。这对于Debezium连接器是必不可少的,它开云体育官方注册网址动态生成每条记录的模式,以匹配已更改的数据库表的结构。开云体育电动老虎机随着时间的推移,写入相同Kafka主题的更改事件记录可能具有相同模式的不同版本。Avro序列化使得更改事件记录的使用者更容易适应不断变化的记录模式。
要使用Apache Avro序列化,必须部署一个模式注册表来管理Avro消息模式及其版本。可用的选项包括Apicurio API和模式注册表以及Confluent模式注册表。这两种方法在这里都有介绍。
Apicurio API和模式注册表
关于Apicurio API和Schema Registry
的Apicurio注册表开源项目提供了几个与Avro一起工作的组件:
可以在Debezium连接器配置中指定的Avro转换器。开云体育官方注册网址这个转换器将Kafka Connect模式映射到Avro模式。然后,转换器使用Avro模式将记录键和值序列化为Avro的紧凑二进制形式。
一个API和模式注册表,跟踪:
Kafka主题中使用的Avro模式。
Avro转换器发送生成的Avro模式。
因为Avro模式存储在这个注册表中,所以每条记录只需要包含很少的数据模式标识符。这使得每个记录更小。对于像Kafka这样的I/O绑定系统,这意味着生产者和消费者的总吞吐量更高。
Avro并行转换器(序列化器和反序列化器)用于Kafka生产者和消费者。Kafka消费者应用程序可以使用Avro Serdes来反序列化更改事件记录。
要将Apicurio Registry与Debezium一起使用,请开云体育官方注册网址将Apicurio Registry转换器及其依赖项添加到用于运行Debezium连接器的Kafka Connect容器映像中。
Apicurio Registry项目还提供了一个JSON转换器。这个转换器结合了较少冗长消息和人类可读JSON的优点。消息本身不包含模式信息,而只包含模式ID。 |
要使用Apicurio Registry提供的转换器,您需要提供 |
Apicurio注册中心部署概述
要部署使用Avro序列化开云体育官方注册网址的Debezium连接器,您必须完成三个主要任务:
部署一个Apicurio API和模式注册表实例。
安装Avro转换器安装包到一个插件目录中。如果您正在使用开云体育官方注册网址连接容器映像,则不需要安装该软件包。有关更多信息,请参见使用Debezium容器部署Apicurio Registry开云体育官方注册网址。
配置Debezium连接开云体育官方注册网址器实例以使用Avro序列化,通过设置如下配置属性:
key.converter = io.apicurio.registry.utils.converter。AvroConverter key.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 key.converter.apicurio.registry。auto-register = true key.converter.apicurio.registry。寻找最新= true value.converter = io.apicurio.registry.utils.converter。AvroConverter value.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 value.converter.apicurio.registry。auto-register = true value.converter.apicurio.registry。寻找最新= true schema.name.adjustment.mode = avro
在内部,Kafka Connect总是使用JSON键/值转换器来存储配置和偏移量。
使用Debezium容器部署Apicurio Registry开云体育官方注册网址
在您的环境中,您可能希望使用提供的Debezium容器映像来部署使用Avro序列化的Debezium连接器。开云体育官方注册网址按照这里的步骤来做。在这个过程中,您在Debezium Kafka Connect容器镜像上启用Apicurio转换器,并配置Debe开云体育官方注册网址zium连接器使用Avro转换器。
您已经安装了Docker,并且拥有创建和管理容器的足够权限。
您下载了希望与Avro序列化一起部署的开云体育官方注册网址Debezium连接器插件。
部署Apicurio Registry的一个实例。
下面的例子使用了一个非生产的、内存中的Apicurio注册表实例:
apicurio/apicurio-registry-mem:2.4.1.Final . docker运行它——rm——name apicurio \ -p 8080:8080
运行Kafka 开云体育官方注册网址Connect的Debezium容器镜像,通过启用Apicurio via来配置它以提供Avro转换器
ENABLE_APICURIO_CONVERTERS = true
环境变量:docker运行它——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link apicurio:apicurio \ -e ENABLE_APICURIO_CONVERTERS=true \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.apicurio.registry.utils.converter。AvroConverter \ -e VALUE_CONVERTER=io.apicurio.registry.utils.converter。AvroConverter \ -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \ -p 8083:8083 debezium/connect:2.2
融合模式注册表
还有另一种选择模式注册表实现由Confluent提供。
Confluent Schema Registry部署概述
有关安装独立Confluent架构注册表的信息,请参阅Confluent平台部署文档。
作为一种选择,你可以安装将独立的Confluent架构注册表作为容器。
使用Debezium容器部署Confluent模式注册表开云体育官方注册网址
从Debezium 2.0.0开云体育官方注册网址开始,Debezium容器中不包括Confluent Schema Registry支持。要为Debezium容器启用Confluent架构注册表,请将以下Confluent Avr开云体育官方注册网址o转换器JAR文件安装到Connect插件目录中:
kafka-connect-avro-converter
kafka-connect-avro-data
kafka-avro-serializer
kafka-schema-serializer
kafka-schema-registry-client
common-config
common-utils
您可以从网站下载上述文件Confluent Maven存储库。
配置略有不同。
在Debeziu开云体育官方注册网址m连接器配置中,指定以下属性:
key.converter = io.confluent.connect.avro。AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
部署Confluent模式注册表的一个实例:
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \ -p 8181:8181 confluentinc/cp-schema-registry . docker运行-rm——name schema-registry \——link zookeeper \ -e schema_registry_connection_url =zookeeper:2181
运行一个配置为使用Avro的Kafka Connect映像:
docker运行-it——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io. conflut .connect.avro。AvroConverter \ -e VALUE_CONVERTER=io. fluent.connect. AvroConverter。AvroConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 debezium/connect:2.2
控件中读取新的Avro消息的控制台消费者
db.myschema.mytable
主题并解码为JSON:Docker运行:it——rm——name avro-consumer \——link zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ debezium/connect:2.开云体育官方注册网址2 \ /kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——property print。\——property schema.registry. key=true \——formatter io.confluent.kafka.formatter.AvroMessageFormatter \——url=http://schema-registry:8081 \——topic db.myschema.mytable
命名
正如阿芙罗所说文档,名称必须遵守以下规则:
开始
(A-Za-z_)
随后只包含
(A-Za-z0-9_)
字符
开云体育官方注册网址Debezium使用列的名称作为相应Avro字段的基础。如果列名也不遵守Avro命名规则,这可能会导致序列化过程中的问题。每个Deb开云体育官方注册网址ezium连接器提供一个配置属性,field.name.adjustment.mode
你可以设置为avro
如果有不遵守Avro名称规则的列。设置field.name.adjustment.mode
来avro
允许序列化不一致的字段,而不必实际修改您的模式。
获取更多信息
这篇文章来自Debeziu开云体育官方注册网址m博客的文章描述了序列化器、转换器和其他组件的概念,并讨论了使用Avro的优点。自那篇文章发表以来,Kafka Connect转换器的一些细节发生了轻微的变化。
有关使用Avro作为Debezium更改数据事件的消息格式的完整示例,请参见开云体育官方注册网址MySQL和Avro消息格式。