Avro序列化

一个开云体育官方注册网址Debezium连接器在Kafka Connect框架中工作,通过生成一个更改事件记录来捕获数据库中的每个行级更改。开云体育电动老虎机对于每个更改事件记录,Debezium连接器完成以下操作:开云体育官方注册网址

  1. 应用配置的转换。

  2. 方法将记录键和值序列化为二进制形式Kafka连接转换器

  3. 将记录写入正确的卡夫卡主题。

您可以为每个单独的Debezium连接器实例指定转换器。开云体育官方注册网址Kafka Connect提供了一个JSON转换器,可以将记录键和值序列化到JSON文档中。默认的行为是JSON转换器包含记录的消息模式,这使得每个记录都非常详细。的开云体育官方注册网址Debezium教程显示包括有效负载和模式时记录的样子。如果希望记录用JSON序列化,请考虑将以下连接器配置属性设置为

  • key.converter.schemas.enable

  • value.converter.schemas.enable

将这些属性设置为从每个记录中排除详细模式信息。

或者,您也可以使用序列化记录键和值Apache Avro.Avro二进制格式紧凑高效。Avro模式可以确保每个记录具有正确的结构。Avro的模式进化机制允许模式进化。这对于Debezium连接器来说是必不可少的开云体育官方注册网址,它动态生成每个记录的模式,以匹配所更改的数据库表的结构。开云体育电动老虎机随着时间的推移,写入同一个Kafka主题的更改事件记录可能具有相同模式的不同版本。Avro序列化使变更事件记录的消费者更容易适应变更的记录模式。

要使用Apache Avro序列化,必须部署一个模式注册中心来管理Avro消息模式及其版本。可用的选项包括Apicurio API和模式注册表以及Confluent模式注册表。这两种都在这里描述。

Apicurio API和模式注册表

关于Apicurio API和模式注册表

Apicurio注册表开源项目提供了几个与Avro一起工作的组件:

  • 可以在Debezium连接器配置中指定的Avro转换器。开云体育官方注册网址这个转换器将Kafka Connect模式映射到Avro模式。转换器然后使用Avro模式将记录键和值序列化为Avro的紧凑二进制形式。

  • 一个API和模式注册表,用于跟踪:

    • Kafka主题中使用的Avro模式。

    • Avro转换器发送生成的Avro模式。

    因为Avro模式存储在这个注册中心中,所以每条记录只需要包含一小部分模式标识符.这使得每条记录更小。对于像Kafka这样的I/O绑定系统,这意味着生产者和消费者的总吞吐量更高。

  • Avro并行转换器(序列化器和反序列化器)用于Kafka生产者和消费者。Kafka消费者应用程序可以使用Avro Serdes来反序列化变更事件记录。

要在Debezium中使用Apicurio Registry,请将A开云体育官方注册网址picurio Registry转换器及其依赖项添加到Kafka Connect容器映像中,用于运行Debezium连接器。

Apicurio Registry项目还提供了一个JSON转换器。该转换器将更少详细消息的优点与人类可读的JSON结合在一起。消息本身不包含模式信息,而只包含模式ID。

要使用Apicurio Registry提供的转换器,您需要提供apicurio.registry.url

Apicurio Registry部署概述

要部署使用Avro序列化开云体育官方注册网址的Debezium连接器,必须完成三个主要任务:

  1. 部署一个Apicurio API和模式注册表实例。

  2. 安装Avro转换器安装包到一个插件目录。如果您正在使用开云体育官方注册网址Debezium连接容器映像,则不需要安装软件包。有关更多信息,请参见使用Debezium容器部署Apicurio Registry开云体育官方注册网址

  3. 通过设置配置属性,配置D开云体育官方注册网址ebezium连接器实例使用Avro序列化,如下所示:

    key.converter = io.apicurio.registry.utils.converter。AvroConverter key.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 key.converter.apicurio.registry。auto-register = true key.converter.apicurio.registry。寻找最新= true value.converter = io.apicurio.registry.utils.converter。AvroConverter value.converter.apicurio.registry。url = http://apicurio: 8080 / api /注册/ v2 value.converter.apicurio.registry。auto-register = true value.converter.apicurio.registry。寻找最新= true schema.name.adjustment.mode = avro

在内部,Kafka Connect总是使用JSON键/值转换器来存储配置和偏移量。

使用Debezium容器部署Apicurio Registry开云体育官方注册网址

在您的环境中,您可能希望使用提供的Debezium容器映像来部署使用Avro序列化的Debezium连接器。开云体育官方注册网址按照这里的步骤来做。在此过程中,您将在Debezium Kafka Connect容器映像上启用Apicurio转换器,并配置Debe开云体育官方注册网址zium连接器以使用Avro转换器。

先决条件
  • 您已经安装了Docker,并有足够的权限创建和管理容器。

  • 您下载了希望与Avro序列化一起部署的开云体育官方注册网址Debezium连接器插件。

过程
  1. 部署Apicurio Registry的一个实例。

    下面的例子使用了一个非生产的、内存中的Apicurio Registry实例:

    docker运行-it——rm——name apicurio \ -p 8080:8080 apicurio/apicurio-registry-mem:2.4.1 final
  2. 运行Kafka 开云体育官方注册网址Connect的Debezium容器映像,通过启用Apicurio via来配置它以提供Avro转换器ENABLE_APICURIO_CONVERTERS = true环境变量:

    docker run -it——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link apicurio:apicurio \ -e enable_apicurio:apicurio \ -e ENABLE_APICURIO_CONVERTERS=true \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offset \ -e KEY_CONVERTER=io.apicuri .registry.utils.converter。AvroConverter \ -e VALUE_CONVERTER=io.apicurio.registry.utils.converter。AvroConverter \ -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \ -p 8083:8083 debezium/connect:2.1

合并模式注册表

还有另一种选择模式注册表Confluent提供的实现。

合流模式注册中心部署概述

有关安装独立的Confluent架构注册中心的信息,请参见Confluent平台部署文档

作为一种选择,你可以安装将独立的合流模式注册中心作为容器。

使用Debezium容器部署合流模式注册中心开云体育官方注册网址

从Debezium 2.0.0开云体育官方注册网址开始,Debezium容器中不包括对合流模式注册表的支持。要为Debezium容器启用Confluent Schema Registry,请将以下Co开云体育官方注册网址nfluent Avro转换器JAR文件安装到Connect插件目录中:

  • kafka-connect-avro-converter

  • kafka-connect-avro-data

  • kafka-avro-serializer

  • kafka-schema-serializer

  • kafka-schema-registry-client

  • common-config

  • common-utils

以上文件可从Confluent Maven存储库

构型略有不同。

  1. 在Debeziu开云体育官方注册网址m连接器配置中,指定以下属性:

    key.converter = io.confluent.connect.avro。AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
  2. 部署合流架构注册中心的实例:

    docker运行-it——rm——name schema-registry \——link zookeeper \ -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \ -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \ -p 8181:8181 confluentinc/cp-schema-registry
  3. 运行一个Kafka Connect镜像,配置为使用Avro:

    docker run -it——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offset \ -e KEY_CONVERTER=io. conflut .connect.avro。AvroConverter \ -e VALUE_CONVERTER=io. confluence .connect.avro。AvroConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 debezium/connect:2.1
  4. 控件中读取新的Avro消息的控制台使用者db.myschema.mytable主题并解码为JSON:

    Docker运行-it——rm——name avro-consumer \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ debezium/开云体育官方注册网址connect:2.1 \ /kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——属性打印。io.confluent.kafka.formatter.AvroMessageFormatter \——property schema.registry. key=true \——formatter。url=http://schema-registry:8081 \——topic db.myschema.mytable

命名

如Avro所述文档,名称必须遵循以下规则:

  • 开始(A-Za-z_)

  • 随后只包含(A-Za-z0-9_)字符

开云体育官方注册网址Debezium使用列名作为相应Avro字段的基础。如果列名不遵守Avro命名规则,则在序列化期间可能会导致问题。每个Deb开云体育官方注册网址ezium连接器都提供一个配置属性,sanitize.field.names你可以设置为真正的如果列的名称不遵守Avro规则。设置sanitize.field.names真正的允许序列化不符合规范的字段,而不必实际修改您的模式。

获取更多信息

这篇文章文章描述了序列化器开云体育官方注册网址、转换器和其他组件的概念,并讨论了使用Avro的优点。自从那篇文章写完之后,一些Kafka Connect转换器的细节有了轻微的变化。

有关使用Avro作为Debezium更改数据事件的消息格式的完整示例,请参见开云体育官方注册网址MySQL和Avro消息格式