Avro序列化
开云体育官方注册网址Debezium连接器与Kafka Connect框架一起使用,以捕获数据库中的更改并生成更改事件。开云体育电动老虎机Kafka Connect工作者然后将连接器配置的转换应用到连接器生成的每个消息上,使用工作者的将每个消息键和值序列化为二进制形式转换器,最后将每个消息写入正确的Kafka主题。
转换器在Kafka Connect工作配置中指定,并且相同的转换器用于部署到该工作集群的所有连接器。Kafka Connect附带一个JSON转换器将消息键和值序列化到JSON文档中。JSON转换器可以配置为使用(key.converter.schemas.enable
而且value.converter.schemas.enable
)的属性。我们的教程显示包括有效负载和模式时消息的样子,但模式使消息非常冗长。如果希望使用JSON序列化消息,可以考虑将这些属性设置为假
排除详细模式信息。
另一种选择是使用序列化消息键和值Apache Avro.Avro二进制格式非常紧凑和高效,Avro模式可以确保消息具有正确的结构。Avro的模式演化机制使模式随时间演化成为可能,这对于动态生成消息模式以匹配数据库表结构的Debezium连接器至关重要。开云体育官方注册网址开云体育电动老虎机随着时间的推移,由Debezium连接器捕获并由Kafka Connect写入主题开云体育官方注册网址的更改事件可能具有相同模式的不同版本,而Avro序列化使消费者更容易适应不断变化的模式。
汇合的提供几个组件与Avro合作:
一个Avro转换器,可以在Kafka Connect worker中使用,将Kafka Connect模式映射到Avro模式,然后使用这些Avro模式将消息键和值序列化为非常紧凑的Avro二进制形式。
一个模式注册表,用于跟踪Kafka主题中使用的所有Avro模式,以及Avro转换器将生成的Avro模式发送到哪里。由于Avro模式存储在这个注册中心中,所以每条消息只需要包含一小部分模式标识符.这使得每条消息更小,对于像Kafka这样的I/O绑定系统,这意味着生产者和消费者的总吞吐量更高。
Avro并行转换器(序列化器和反序列化器)用于Kafka生产者和消费者。您编写的任何Kafka消费者应用程序都可以使用Avro serdes来反序列化更改事件。
这些Confluent组件是开源的,你可以将它们安装到任何Kafka发行版中,并与Kafka Connect一起使用。但是,Confluent还提供了一个Confluent开源平台它包括标准Kafka发行版以及这些和其他Confluent开源组件,包括几个源和接收器连接器。一些Kafka Connect的Docker映像也包含Avro转换器。这包括最近的开云体育官方注册网址Debezium Docker图像其中包括Debezium连接器、K开云体育官方注册网址afka Connect和Avro转换器。
技术信息
一个想要使用Avro序列化的系统需要完成两个步骤:
部署一个模式注册表实例
使用这些属性配置Apache Connect实例
key.converter = io.confluent.connect.avro。AvroConverter key.converter.schema.registry.url=http://localhost:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081
注意:除了设置键/值转换器,它是强烈推荐将内部键/值转换器设置为使用JSON转换器,以便更容易分析存储的配置和偏移量。如果你仍然喜欢使用Avro转换器,现在是不可能的,由于一个已知问题.
internal.key.converter = org.apache.kafka.connect.json。JsonConverter internal.value.converter = org.apache.kafka.connect.json.JsonConverter
开云体育官方注册网址Debezium Docker Images
看到MySQL和Avro消息格式教程示例快速入门MySQL。
部署架构注册表实例:
docker运行-it——rm——name schema-registry \——link zookeeper \ -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \ -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \ -p 8181:8181 confluentinc/cp-schema-registry
运行一个Kafka Connect镜像,配置为使用Avro:
docker run -it——rm——name connect \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offset \ -e KEY_CONVERTER=io. conflut .connect.avro。AvroConverter \ -e VALUE_CONVERTER=io. confluence .connect.avro。AvroConverter \ -e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 debezium/connect:1.0
控件中读取新的Avro消息的控制台使用者db.myschema.mytable
主题并解码为JSON:
Docker运行-it——rm——name avro-consumer \——link zookeeper:zookeeper \——link kafka:kafka \——link mysql:mysql \——link schema-registry:schema-registry \ debezium/开云体育官方注册网址connect:1.0 \ /kafka/bin/kafka-console-consumer.sh \——bootstrap-server kafka:9092 \——属性打印。io.confluent.kafka.formatter.AvroMessageFormatter \——property schema.registry. key=true \——formatter。url=http://schema-registry:8081 \——topic db.myschema.mytable
命名
如Avro所述文档,名称必须遵循以下规则:
开始
(A-Za-z_)
随后只包含
(A-Za-z0-9_)
字符
开云体育官方注册网址Debezium使用列名作为Avro字段的基础。如果列名不遵守上面的Avro命名规则,则在序列化期间可能会出现问题。开云体育官方注册网址Debezium提供了一个配置选项,sanitize.field.names
可以设置为真正的
如果您有不符合上述规则集的列,则允许序列化这些字段,而不必实际修改您的模式。