自定义转换器

该功能目前处于孵化状态,即准确的语义,配置选项等可能会根据我们收到的反馈在未来的版本中改变。请让我们知道,如果你遇到任何问题,而使用这个扩展。

数据类型转换

Debezium更改事件记录中的开云体育官方注册网址每个字段表示源表或数据集合中的一个字段或列。当连接器向Kafka发出更改事件记录时,它将源文件中每个字段的数据类型转换为Kafka Connect模式类型。列值同样被转换为匹配目标字段的模式类型。对于每个连接器,默认映射指定连接器如何转换每种数据类型。这些默认映射在每个连接器的数据类型文档中进行了描述。

虽然默认映射通常就足够了,但对于某些应用程序,您可能希望应用替代映射。例如,如果默认映射使用自UNIX纪元以来的毫秒格式导出列,那么您可能需要一个自定义映射,但是您的下游应用程序只能将列值作为格式化的字符串使用。您可以通过开发和部署自定义转换器来定制数据类型映射。您可以配置自定义转换器,使其作用于某种类型的所有列,或者您可以缩小它们的作用域,使它们只应用于特定的表列。转换器函数拦截匹配指定条件的任何列的数据类型转换请求,然后执行指定的转换。转换器忽略不匹配指定条件的列。

自定义转换器是实现Debezium服务提供程序接口(SPI)的Java类。开云体育官方注册网址属性启用和配置自定义转换器转换器属性。的转换器属性指定连接器可用的转换器,并且可以包括进一步修改转换行为的子属性。

启动连接器后,在连接器配置中启用的转换器将被实例化并添加到注册中心。注册中心将每个转换器与要处理的列或字段关联起来。每当Debeziu开云体育官方注册网址m处理一个新的更改事件时,它都会调用已配置的转换器来转换它所注册的列或字段。

实现自定义转换器

下面的示例展示了实现该接口的Java类的转换器实现io.开云体育官方注册网址debezium.spi.converter.CustomConverter

公共接口CustomConverter {@FunctionalInterface接口转换器{(1)对象转换(对象输入);ConverterRegistration {(2)void寄存器(S fieldSchema, Converter);(3)} void配置(属性道具);void converterFor(F字段,ConverterRegistration注册);(4)
1 将数据从一种类型转换为另一种类型的函数。
2 用于注册转换器的回调。
3. 为当前字段注册给定的模式和转换器。对于同一个字段不应该调用多次。
4 注册自定义值和模式转换器,以便与特定字段一起使用。

自定义转换器方法

的实现CustomConverter接口必须包含以下方法:

配置()

将连接器配置中指定的属性传递给转换器实例。的配置方法在初始化连接器时运行。您可以使用具有多个连接器的转换器,并根据连接器的属性设置修改其行为。
配置方法接受以下参数:

道具

包含要传递给转换器实例的属性。每个属性指定转换特定类型列的值的格式。

converterFor ()

注册转换器以处理数据源中的特定列或字段。开云体育官方注册网址Debezium调用converterFor ()方法来提示转换器调用登记为了转换。的converterFor方法为每个列运行一次。
该方法接受以下参数:

传递关于所处理的字段或列的元数据的对象。列元数据可以包括列或字段的名称、表或集合的名称、数据类型、大小等等。

登记

类型的对象io.开云体育官方注册网址debezium.spi.converter.CustomConverter.ConverterRegistration它提供了目标模式定义和转换列数据的代码。转换器调用登记当源列与转换器应处理的类型匹配时,则指定。调用注册方法为模式中的每一列定义转换器。模式是使用Kafka Connect来表示的SchemaBuilderAPI。将来,将添加一个独立的模式定义API。

开云体育官方注册网址Debezium自定义转换器示例

下面的例子实现了一个简单的转换器,它执行以下操作:

  • 运行配置属性的值配置转换器schema.name属性在连接器配置中指定的。转换器配置特定于每个实例。

  • 运行converterFor方法,该方法将转换器注册为处理数据类型设置为的源列中的值国际标准图书编号

    • 确定目标字符串属性指定的值schema.name财产。

    • 将源列中的ISBN数据转换为字符串值。

例1。一个简单的自定义转换器
公共静态类IsbnConverter实现CustomConverter{私有SchemaBuilder isbnSchema;@覆盖公共无效配置(属性道具){isbnSchema = SchemaBuilder.string().name(props. getproperty ("schema.name"));} @覆盖公共无效converterFor(RelationalColumn列,ConverterRegistration注册){if ("isbn".equals(column. typename())){注册。寄存器(isbnSchema, x -> x.toString());}}}

开云体育官方注册网址Debezium和Kafka连接API模块依赖关系

自定义转换器Java项目编译依赖于Debezium API和Kafka Connect API库模块。开云体育官方注册网址这些编译依赖项必须包含在项目的编译依赖项中pom.xml,示例如下:

 io.开云体育官方注册网址debezium debezium-api ${version.debezium} .debezium(1)  org.apache。kafka connect-api ${version.kafka}(2)< / >的依赖
1 $ {version开云体育官方注册网址.debezium}表示Debezium连接器的版本。开云体育官方注册网址
2 $ {version.kafka}表示Apache Kafka在您的环境中的版本。

配置和使用转换器

自定义转换器作用于源表中的特定列或列类型,以指定如何将源表中的数据类型转换为Kafka Connect模式类型。要使用带有连接器的自定义转换器,您需要将转换器JAR文件部署在连接器文件旁边,然后配置连接器以使用转换器。

部署自定义转换器

先决条件
  • 您有一个自定义转换器Java程序。

过程
  • 要使用带有Debezium连接器的自定义转换器,请将Java项目导开云体育官方注册网址出到JAR文件,并将该文件复制到包含您想要使用它的每个Debezium连接器的JAR文件的目录中。

    例如,在典型部署中,Debezium连接器文件存储在Kafka Connect目录的开云体育官方注册网址子目录中(卡夫卡/连接),每个连接器JAR都在它自己的子目录(卡夫卡/ / debezium开云体育官方注册网址-connector-db2连接卡夫卡/ / debezium开云体育官方注册网址-connector-mysql连接,等等)。要使用带有连接器的转换器,请将转换器JAR文件添加到连接器的子目录中。

要使用带有多个连接器的转换器,必须在每个连接器子目录中放置转换器JAR文件的副本。

配置连接器以使用自定义转换器

要使连接器能够使用自定义转换器,可以向连接器配置中添加指定转换器名称和类的属性。如果转换器需要进一步的信息来定制特定数据类型的格式,您还可以定义其他配置选项来提供该信息。

过程
  • 通过向连接器配置中添加以下强制属性,为连接器实例启用转换器:

    转换器:< converterSymbolicName >(1)< converterSymbolicName >.type:< fullyQualifiedConverterClassName >(2)
    1 转换器属性是必选的,它枚举要与连接器一起使用的转换器实例的符号名称的逗号分隔列表。为此属性列出的值用作您为转换器指定的其他属性名称的前缀。
    2 < converterSymbolicName >.type属性为必填项,并指定实现转换器的类的名称。例如,之前的自定义转换器示例,您将添加以下属性到连接器配置:
    转换器:isbn isbn。类型:io.deb开云体育官方注册网址ezium.test.IsbnConverter
  • 若要将其他属性与自定义转换器关联起来,请在属性名称前加上转换器的符号名称,后面再加一个点().类的值所指定的符号名称是一个标签转换器财产。例如,为前面的添加一个属性国际标准图书编号转换器来指定schema.name传递给配置方法,在转换器代码中添加以下属性:

    isbn.schema.name: io.开云体育官方注册网址debezium.postgresql.type.Isbn