自定义转换器

这个功能目前正在酝酿状态,即精确的语义,配置选项等可能会改变在未来修订,根据我们得到的反馈。请让我们知道如果你遇到任何问题时使用这个扩展。

数据类型转换

每个字段在Debezium更改事开云体育官方注册网址件记录代表一个源表中的字段或列或数据收集。当连接器发出更改事件记录,卡夫卡,它将每个字段的数据类型转换源卡夫卡连接模式类型。列值同样转换为匹配目标字段的模式类型。对于每一个连接器,一个默认的映射指定连接器如何转换每个数据类型。这些默认映射文档中描述数据类型为每个连接器。

虽然默认映射通常是足够的,对于某些应用程序可能需要应用另一种映射。例如,您可能需要一个自定义映射如果默认映射出口一列使用UNIX纪元以来的毫秒的格式,但是下游应用程序只能使用格式化的字符串的列值。你定制类型映射的数据开发和部署自定义转换器。配置自定义转换器来作用于某种类型的所有列,或者你可以缩小范围,这样他们只适用于一个特定的表列。转换函数截取数据类型转换请求任何列匹配指定的标准,然后执行指定的转换。转换器忽略列不匹配指定的标准。

自定义转换器的Java类实现Debezium服务提供程序接口(SPI)。开云体育官方注册网址您启用和配置自定义转换器通过设置转换器房地产在连接器配置。的转换器属性指定连接器可用的转换器,和可以包括sub-properties进一步修改转换行为。

在你开始一个连接器,连接器配置中启用的转换器是实例化并添加到注册表中。注册中心将每一个转换器与列或字段的过程。每当Debeziu开云体育官方注册网址m处理新更改事件,它调用配置转换器来转换或字段的列的登记。

实现自定义转换器

下面的例子显示了一个Java类,它实现了接口的转换器实现io.开云体育官方注册网址debezium.spi.converter.CustomConverter:

公共接口CustomConverter < S、F扩展ConvertedField > {@FunctionalInterface接口转换器{(1)对象转换(对象输入);}公共接口ConverterRegistration <年代> {(2)无效注册(S fieldSchema转换器转换);(3)}无效配置(属性道具);空白converterFor (F, ConverterRegistration <年代>登记);(4)}
1 一个函数将数据从一种类型转换为另一个。
2 回调注册一个转换器。
3 寄存器给定的模式和转换为当前字段。不应该不止一次调用相同的字段。
4 注册自定义值和模式转换器使用一个特定的领域。

自定义转换器方法

的实现CustomConverter接口必须包括以下方法:

配置()

通过属性中指定连接器配置转换器实例。的配置当连接器运行方法初始化。您可以使用一个转换器与多个连接器和修改其行为根据连接器的属性设置。
配置方法接受以下参数:

道具

通过转换器实例包含属性。每个属性指定的格式转换为一个特定类型的列的值。

converterFor ()

寄存器转换器处理特定的数据源中的列或字段。开云体育官方注册网址Debezium调用converterFor ()提示方法要调用的转换器登记的转换。的converterFor每一列方法运行一次。
该方法接受以下参数:

一个对象,通过元数据的字段或列处理。列元数据可以包括列或字段的名称,表或集合的名称,数据类型,大小,等等。

登记

一个类型的对象io.开云体育官方注册网址debezium.spi.converter.CustomConverter.ConverterRegistration提供目标模式定义和转换的代码列数据。转换器调用登记参数当源列匹配类型转换器应过程。调用注册每一列的方法来定义转换器模式。使用卡夫卡连接模式表示SchemaBuilderAPI。在将来,一个独立的模式定义API将被添加。

开云体育官方注册网址Debezium自定义转换器的例子

下面的示例实现了一个简单的转换器,执行以下操作:

  • 运行配置方法,该配置转换器基于的价值schema.name属性中指定连接器配置。转换器配置特定于每个实例。

  • 运行converterFor方法,该寄存器转换器处理源列中的值的数据类型设置为国际标准图书编号

    • 确定了目标字符串模式基于指定的值schema.name财产。

    • 将ISBN源列中的数据字符串值。

例1。一个简单的自定义转换器
公共静态类IsbnConverter实现CustomConverter < SchemaBuilder RelationalColumn >{私人SchemaBuilder isbnSchema;@Override公共空间配置(属性道具){isbnSchema = SchemaBuilder.string () . name (props.getProperty (schema.name "));}@Override公共空converterFor (RelationalColumn列,ConverterRegistration < SchemaBuilder >注册){如果(isbn“.equals (column.typeName())){登记。注册(isbnSchema x - > x.toString ());}}}

开云体育官方注册网址Debezium和卡夫卡连接API模块依赖关系

一个自定义转换器Java项目编译依赖Debezium API和卡夫卡连接API库模块。开云体育官方注册网址这些编译依赖必须包含在您的项目中pom.xml,如以下示例所示:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-api < / artifactId > <版本> $ {version.debezium} > < /版本(1)< /依赖> <依赖> < groupId >表示。卡夫卡< / groupId > < artifactId > connect-api < / artifactId > <版本> $ {version.kafka} > < /版本(2)< / >的依赖
1 $ {version开云体育官方注册网址.debezium}代表Debezium连接器的版本。开云体育官方注册网址
2 $ {version.kafka}代表了版本的Apache卡夫卡在您的环境中。

配置和使用转换器

自定义转换器作用于特定的源表中的列或列类型来指定如何在源数据类型转换为卡夫卡连接模式类型。使用一个自定义转换器一个连接器,您部署JAR文件转换器与连接器文件,然后配置连接器使用转换器。

部署自定义转换器

先决条件
  • 你有一个自定义转换器的Java程序。

过程
  • 使用一个自定义转换器Debezium连接器,Java项目导出到一个开云体育官方注册网址JAR文件,并将文件复制到目录,包含每个Debezium连接器的JAR文件,你想使用它。

    例如,在一个典型的部署,Debezium连接器文件存储在子目录的卡夫卡连接目录(开云体育官方注册网址卡夫卡/连接),每个连接器JAR在它自己的子目录(卡夫卡/ / debezium开云体育官方注册网址-connector-db2连接,卡夫卡/ / debezium开云体育官方注册网址-connector-mysql连接等等)。与连接器使用转换器,转换器JAR文件添加到连接器的子目录。

使用一个转换器与多个连接器,您必须将转换器在每个连接器JAR文件的副本子目录。

配置一个连接器使用自定义转换器

让一个连接器使用自定义转换器,将属性添加到连接器配置指定转换器名称和类。如果转换器需要更多信息来定制特定的数据类型的格式,您还可以定义其他coniguration选项来提供这些信息。

过程
  • 使一个转换器连接器实例通过添加以下强制性属性连接器配置:

    转换器:< converterSymbolicName >(1)< converterSymbolicName >.type:< fullyQualifiedConverterClassName >(2)
    1 转换器属性是强制性的,列举了一个以逗号分隔的符号名称的转换器实例和连接器一起使用。这个属性的值列在其他属性的名称作为前缀,您指定的转换器。
    2 < converterSymbolicName >.type属性是强制性的,指定实现converter类的名称。例如,早些时候自定义转换器的例子,你会添加以下属性连接器配置:
    转换器:isbn isbn。类型:io.deb开云体育官方注册网址ezium.test.IsbnConverter
  • 把其他属性和自定义转换器,前缀符号名称的属性名的转换器,其次是一个点()。象征性的名称是一个标签,您指定的值转换器财产。例如,为前添加一个属性国际标准图书编号转换器指定schema.name通过对配置方法转换代码,添加以下属性:

    isbn.schema.name: io.开云体育官方注册网址debezium.postgresql.type.Isbn