由Debezium提供的数据库中的更改事件流(用开发人员的话开云体育电动老虎机说)是强类型的。开云体育官方注册网址这意味着事件使用者应该了解事件中传递的数据类型。传递消息类型数据的问题可以通过多种方式解决:
消息结构被带外传递给消费者,消费者能够处理存储在其中的数据
该消息包含元数据模式)内嵌在讯息内
该消息包含对包含相关元数据的注册中心的引用
第一种情况的一个例子是众所周知的Apache KafkaJsonConverter
.它可以在两种模式下操作——带模式和不带模式。当配置为在没有模式的情况下工作时,它会生成一个纯JSON消息,其中消费者要么需要事先知道每个字段的类型,要么需要执行启发式规则来“猜测”并将值映射到数据类型。虽然这种方法相当灵活,但在更高级的情况下可能会失败,例如编码为字符串的时态或其他语义类型。此外,与类型相关的约束通常会丢失。
这里有一个这样的消息的例子:
{"之前":零,"后": {"id":1001,"first_name":"莎莉","last_name":"托马斯。","电子邮件":"sally.thomas@acme.com"},"源": {"版本":"1.1.0.Final","连接器":"mysql","的名字":"dbserver1","ts_ms":0,"快照":"真正的","db":"库存","表格":"客户","server_id":0,"gtid":零,"文件":"mysql-bin.000003","pos":154,"行":0,"线程":零,"查询":零},"人事处":"c","ts_ms":1586331101491,"事务":零}
请注意,除了JSON的基本类型系统之外,没有任何类型信息。例如,消费者不能从事件本身得出结论,它的长度为数字id
字段。
第二种情况的一个例子是JsonConverter
.通过它的方法schemas.enable
选项,JSON消息将由两部分组成-模式
而且有效载荷
.的有效载荷
部分与前面的情况完全相同;的模式
部分包含消息、其字段、字段类型和相关类型约束的描述。这使使用者能够以类型安全的方式处理消息。这种方法的缺点是消息的大小显著增加,因为模式是一个相当大的对象。由于模式往往很少更改(您多久更改一次数据库表列的定义?),将模式添加到每个事件会带来很大的开销。开云体育电动老虎机
下面带有模式的消息示例清楚地表明,模式本身可能比有效负载大得多,而且使用起来并不十分经济:
{"模式": {"类型":"结构体","字段": [{"类型":"结构体","字段": [{"类型":"int32","可选":假,"场":"id"}, {"类型":"字符串","可选":假,"场":"first_name"}, {"类型":"字符串","可选":假,"场":"last_name"}, {"类型":"字符串","可选":假,"场":"电子邮件"}),"可选":真正的,"的名字":"dbserver1.inventory.customers.Value","场":"之前"}, {"类型":"结构体","字段": [{"类型":"int32","可选":假,"场":"id"}, {"类型":"字符串","可选":假,"场":"first_name"}, {"类型":"字符串","可选":假,"场":"last_name"}, {"类型":"字符串","可选":假,"场":"电子邮件"}),"可选":真正的,"的名字":"dbserver1.inventory.customers.Value","场":"后"}, {"类型":"结构体","字段": [{"类型":"字符串","可选":假,"场":"版本"}, {"类型":"字符串","可选":假,"场":"连接器"}, {"类型":"字符串","可选":假,"场":"的名字"}, {"类型":"int64","可选":假,"场":"ts_ms"}, {"类型":"字符串","可选":真正的,"的名字":"io.开云体育官方注册网址debezium.data.Enum","版本":1,"参数": {"允许":"真的,最后,假的"},"默认的":"假","场":"快照"}, {"类型":"字符串","可选":假,"场":"db"}, {"类型":"字符串","可选":真正的,"场":"表格"}, {"类型":"int64","可选":假,"场":"server_id"}, {"类型":"字符串","可选":真正的,"场":"gtid"}, {"类型":"字符串","可选":假,"场":"文件"}, {"类型":"int64","可选":假,"场":"pos"}, {"类型":"int32","可选":假,"场":"行"}, {"类型":"int64","可选":真正的,"场":"线程"}, {"类型":"字符串","可选":真正的,"场":"查询"}),"可选":假,"的名字":"io.开云体育官方注册网址debezium.connector.mysql.Source","场":"源"}, {"类型":"字符串","可选":假,"场":"人事处"}, {"类型":"int64","可选":真正的,"场":"ts_ms"}, {"类型":"结构体","字段": [{"类型":"字符串","可选":假,"场":"id"}, {"类型":"int64","可选":假,"场":"total_order"}, {"类型":"int64","可选":假,"场":"data_collection_order"}),"可选":真正的,"场":"事务"}),"可选":假,"的名字":"dbserver1.inventory.customers.Envelope"},"有效载荷": {"之前":零,"后": {"id":1001,"first_name":"莎莉","last_name":"托马斯。","电子邮件":"sally.thomas@acme.com"},"源": {"版本":"1.1.0.Final","连接器":"mysql","的名字":"dbserver1","ts_ms":0,"快照":"真正的","db":"库存","表格":"客户","server_id":0,"gtid":零,"文件":"mysql-bin.000003","pos":154,"行":0,"线程":零,"查询":零},"人事处":"c","ts_ms":1586331101491,"事务":零}}
注册表
然后是第三种方法,它结合了前两种方法的优点,同时以引入一个新组件(注册表)为代价消除了它们的缺点,该组件用于存储和版本消息模式。
有多种模式注册中心实现可用;下面我们将重点讨论Apicurio注册表,这是一个开源(Apache许可证2.0)API和模式注册表。该项目不仅提供了注册表本身,还提供了客户端库,并以序列化器和转换器的形式与Apache Kafka和Kafka Connect紧密集成。
Apicurio允许Debeziu开云体育官方注册网址m和消费者交换模式存储在注册中心中的消息,并且仅在消息本身中传递对模式的引用。随着捕获的源表结构和消息模式的演变,注册中心也会创建模式的新版本,因此不仅可以使用当前的模式,还可以使用历史的模式。
Apicurio提供多种开箱即用的序列化格式:
支持外部化模式的JSON
每个序列化器和反序列化器都知道如何自动与Apicurio API交互,因此作为实现细节,消费者与它是隔离的。唯一需要的信息是注册中心的位置。
Apicurio还为来自IBM和Confluent的模式注册中心提供API兼容层。这是一个非常有用的功能,因为它允许使用第三方工具,如kafkacat,即使他们不知道Apicurio的原生API。
JSON转换器
在Debezi开云体育官方注册网址um示例存储库中,有一个码头工人组成它将Apicurio注册表与标准Debezium教程示例设置并排部署。开云体育官方注册网址
图1。部署拓扑
要遵循这个示例,您需要克隆Debezium开云体育官方注册网址示例库.
自Debez开云体育官方注册网址ium 1.2以来开云体育官方注册网址Debezium容器图像与Apicurio转换器支持发货。 类型可以启用Apicurio转换器 |
$ cd tutorial $ export DEBZIUM_VERSION=1.1 #启动部署$ docker-compose -f docker-compose-mysql-apicuriocurl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" \ http://localhost:8083/connectors/ -d @register-mysql-apicurio-converter-json。$ docker run——rm——tty \——network tutorial_default debezium/ toolbash \ -c 'kafkacat -b kafka:9开云体育官方注册网址092 -c -o beginning -q -t dbserver1.inventory. json #读取第一条消息的内容。客户-c 1 | jq。”
生成的消息应该如下所示:
{"schemaId":48,"有效载荷": {"之前":零,"后": {"id":1001,"first_name":"莎莉","last_name":"托马斯。","电子邮件":"sally.thomas@acme.com"},"源": {"版本":"1.1.0.Final","连接器":"mysql","的名字":"dbserver1","ts_ms":0,"快照":"真正的","db":"库存","表格":"客户","server_id":0,"gtid":零,"文件":"mysql-bin.000003","pos":154,"行":0,"线程":零,"查询":零},"人事处":"c","ts_ms":1586334283147,"事务":零}}
JSON消息包含完整的有效负载,同时包含对带id的模式的引用48
.可以从注册表中查询模式id
或者使用由Debezium文档定义的模式符号名。开云体育官方注册网址在本例中,这两个命令都有
$ docker run——rm——tty \——network tutorial_default \ deb开云体育官方注册网址ezium/工装bash -c 'http http://apicurio:8080/ids/64 | jq .'
得到相同的模式描述:
{"类型":"结构体","字段": [{"类型":"结构体","字段": [{"类型":"int32","可选":假,"场":"id"}, {"类型":"字符串","可选":假,"场":"first_name"}, {"类型":"字符串","可选":假,"场":"last_name"}, {"类型":"字符串","可选":假,"场":"电子邮件"}),"可选":真正的,"的名字":"dbserver1.inventory.customers.Value","场":"之前"},...),"可选":假,"的名字":"dbserver1.inventory.customers.Envelope"}
这与我们之前在“带模式的JSON”示例中看到的相同。
连接器注册请求与前一个请求有几行不同:
..."key.converter":"io.apicurio.registry.utils.converter.ExtJsonConverter",(1)"key.converter.apicurio.registry.url":"http://apicurio:8080",(2)"key.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",(3)"value.converter":"io.apicurio.registry.utils.converter.ExtJsonConverter",(1)"value.converter.apicurio.registry.url":"http://apicurio:8080",(2)"value.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy"(3)...
1 | Apicurio JSON转换器同时用作键和值转换器 |
2 | Apicurio注册表端点 |
3. | 该设置确保可以自动注册模式id,这是Debezium部署中的典型设置开云体育官方注册网址 |
Avro转换器
到目前为止,我们只演示了将消息序列化为JSON格式。虽然在注册表中使用JSON格式有很多优点,比如易于人类阅读,但它仍然不是很节省空间。
要真正只传输数据而没有任何重大开销,使用二进制格式序列化(如Avro格式)是很有用的。在本例中,我们将只打包数据,而不打包任何字段名和其他仪式,并且消息将再次包含对存储在注册中心中的模式的引用。
让我们看看Apicurio的Avro转换器可以多么容易地使用Avro序列化。
#删除之前的部署$ docker-compose -f docker-compose-mysql-apicurio启动部署$ docker-compose -f docker-compose-mysql-apicurio。启动连接器curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" \ http://localhost:8083/connectors/ \ -d @register-mysql-apicurio-converter-avro.json
我们可以使用模式名查询注册表:
$ docker run——rm——tty \——network tutorial_default \ deb开云体育官方注册网址ezium/tool \ bash -c 'http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .'
生成的模式描述与前面的略有不同,因为它具有Avro风格:
{"类型":"记录","的名字":"信封","名称空间":"dbserver1.inventory.customers","字段": [{"的名字":"之前","类型":【"零", {"类型":"记录","的名字":"价值","字段": [{"的名字":"id","类型":"int"}, {"的名字":"first_name","类型":"字符串"}, {"的名字":"last_name","类型":"字符串"}, {"的名字":"电子邮件","类型":"字符串"}),"connect.name":"dbserver1.inventory.customers.Value"}),"默认的":零}, {"的名字":"后","类型":【"零","价值"),"默认的":零},...),"connect.name":"dbserver1.inventory.customers.Envelope"}
连接器注册请求在几行中也与标准请求不同:
..."key.converter":"io.apicurio.registry.utils.converter.AvroConverter",(1)"key.converter.apicurio.registry.url":"http://apicurio:8080",(2)"key.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",(3)"key.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",(3)"key.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",(4)"value.converter":"io.apicurio.registry.utils.converter.AvroConverter",(1)"value.converter.apicurio.registry.url":"http://apicurio:8080",(2)"value.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer",(3)"value.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",(3)"value.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",(4)...
1 | Apicurio Avro转换器同时用作键和值转换器 |
2 | Apicurio注册表端点 |
3. | 规定转换器应使用哪种序列化器和反序列化器 |
4 | 该设置确保可以自动注册模式id,这是Debezium部署中的典型设置开云体育官方注册网址 |
为了演示接收端消息的使用,我们可以使用Kafka连接Elasticsearch连接器.接收器配置将再次使用转换器配置进行扩展,接收器连接器可以使用启用了avro的主题,而不需要任何其他更改。
{"的名字":"elastic-sink","配置": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","主题":"客户","connection.url":"http://elastic:9200","转换":"打开、关键","transforms.unwrap.type":"io.开云体育官方注册网址debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"假","transforms.key.type":"org.apache.kafka.connect.transforms.ExtractField美元关键","transforms.key.field":"id","key.ignore":"假","type.name":"客户","behavior.on.null.values":"删除","key.converter":"io.apicurio.registry.utils.converter.AvroConverter","key.converter.apicurio.registry.url":"http://apicurio:8080","key.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer","key.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer","key.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy","value.converter":"io.apicurio.registry.utils.converter.AvroConverter","value.converter.apicurio.registry.url":"http://apicurio:8080","value.converter.apicurio.registry.converter.serializer":"io.apicurio.registry.utils.serde.AvroKafkaSerializer","value.converter.apicurio.registry.converter.deserializer":"io.apicurio.registry.utils.serde.AvroKafkaDeserializer","value.converter.apicurio.registry.global-id":"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",}}
结论
在本文中,我们讨论了消息/模式关联的多种方法。Apicurio注册表是作为模式存储和版本控制的解决方案提出的,我们已经演示了Apicurio如何与Debezium连接器集成,以有效地将带有模式的消息传递给消费者。开云体育官方注册网址
中可以找到一个将Debezium连接器与Apicurio注册表一起使用的完整示例开云体育官方注册网址教程在GitHub上的Debezi开云体育官方注册网址um示例库的项目。
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源下Apache许可证,版本2.0.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium,在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题.