自定义转换器
数据类型转换
Debe开云体育官方注册网址zium连接器将数据库列类型开云体育电动老虎机映射到相应的Kafka Connect模式类型,并相应地转换列值。每个连接器都记录了特定的列类型映射,并在大多数情况下表示合理的默认行为。由于下游系统需求,应用程序仍然可能需要对特定列类型或特定列进行特定处理。例如,您可能希望将时态列值导出为格式化的字符串,而不是从epoch开始的毫秒。
为此,Debezium提供了一个扩开云体育官方注册网址展点,允许用户根据自己的业务需求注入自己的转换器。转换器是用Java编写的,通过连接器属性启用和配置。
在连接器启动期间,所有配置的转换器都被实例化并放置在注册表中。在构建连接器-内部模式表示时,将为每个表/集合的每个列/字段调用每个转换器,并且它可以注册自己来负责给定列或字段的转换。
每当Debezium处理一个新的更改时,就会调用转换器对已注册的列或字段执行开云体育官方注册网址实际的转换。
实现转换器
转换器实现是实现接口的Java类io.开云体育官方注册网址debezium.spi.converter.CustomConverter
:
公共接口CustomConverter {@FunctionalInterface接口转换器{对象转换(对象输入);}公共接口ConverterRegistration{无效寄存器(S fieldSchema, Converter Converter);} void配置(属性道具);void converterFor(F字段,ConverterRegistration注册);}
该方法配置()
用于在转换器实例化后将转换器配置选项传递给转换器,因此它可以为每个特定实例修改其运行时行为。该方法converterFor ()
是由Debezium调用和转开云体育官方注册网址换器需要调用登记
如对该转换承担责任。注册提供了目标模式定义和实际转换代码。模式目前是使用Kafka Connect来表示的SchemaBuilder
API。将来,将添加一个独立的模式定义API。类传递关于列或字段的元数据场
参数。它们包含表或集合名称、列或字段名称、类型名称等信息。
下面的例子实现了一个简单的转换器:
接受一个名为
schema.name
为类型的每一列注册自己
国际标准图书编号
与目标
字符串
类型命名的模式schema.name
参数将ISBN数据转换为的转换代码
字符串
公共静态类IsbnConverter实现CustomConverter{私有SchemaBuilder isbnSchema;@覆盖公共无效配置(属性道具){isbnSchema = SchemaBuilder.string().name(props. getproperty ("schema.name"));} @覆盖公共无效converterFor(RelationalColumn列,ConverterRegistration注册){if ("isbn".equals(column. typename())){注册。寄存器(isbnSchema, x -> x.toString());}}}
类的依赖项,以编译代码开云体育官方注册网址debezium-api
而且connect-api
模块:
io.开云体育官方注册网址debezium debezium-api ${version.debezium} org.apache。kafka connect-api ${版本。卡夫卡}< /版本> < / >的依赖
在哪里$ {version开云体育官方注册网址.debezium}
而且$ {version.kafka}
分别是Debezium和Apache K开云体育官方注册网址afka的版本。
配置和使用转换器
转换器开发完成后,必须将其与Debezium连接器JAR并排部署在JAR文件中。开云体育官方注册网址要为给定的连接器实例启用转换器,必须提供如下连接器选项:
isbn isbn.type=io.debezium.te开云体育官方注册网址st.IsbnConverter isbn.schema.name=io.debezium.postgresql.type.Isbn
的选项转换器
是必选项,并枚举要使用的转换器实例的逗号分隔的符号名称。符号名称用作进一步配置选项的前缀。
isbn.type
(一般<前缀> .type
)是强制的,是实现转换器的类的名称。
isbn.schema.name
是传递给转换器的转换器参数吗配置
方法schema.name
.