自定义Kafka连接自动主题创建

Kafka提供了两种自动创建主题的机制。您可以为Kafka代理启用自动创建主题,并且从Kafka 2.6.0开始,您还可以启用Kafka Connect来创建主题。Kafka代理使用auto.create.topics.enable属性控制自动创建主题。在Kafka Connect中topic.creation.enable属性指定Kafka Connect是否允许创建主题。在这两种情况下,属性的默认设置都允许自动创建主题。

当启用自动主题创建时,如果Debezium源连接器为一个不存在目标主题的表发出更改事件记录,则开云体育官方注册网址当事件记录被摄取到Kafka时,主题将在运行时创建。

在代理和Kafka Connect中自动创建主题的区别

代理创建的主题仅限于共享一个默认配置。代理不能将唯一配置应用于不同的主题或主题集。相比之下,Kafka Connect可以在创建主题、设置复制因子、分区数量和其他在Debezium连接器配置中指定的主题特定设置时应用任何配置。开云体育官方注册网址连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组关联。

代理配置和Kafka Connect配置是相互独立的。Kafka Connect可以创建主题,不管你是否在代理上禁用了主题创建。如果你在代理和Kafka Connect中都启用自动创建主题,那么连接配置优先,只有当Kafka Connect配置中的设置都不适用时,代理才会创建主题。

禁用Kafka代理的自动主题创建

默认情况下,Kafka代理配置允许代理在运行时创建主题(如果主题不存在)。代理创建的主题不能配置自定义属性。如果你使用的Kafka版本低于2.6.0,并且你想要创建带有特定配置的主题,你必须在代理上禁用自动创建主题,然后显式地创建主题,可以手动创建,也可以通过自定义部署过程创建。

过程
  • 在代理配置中,设置值auto.create.topics.enable

设置Kafka Connect

Kafka Connect中的自动主题创建由topic.creation.enable财产。该属性的默认值为真正的,启用自动创建主题,示例如下:

topic. create .enable = true

的设置topic.creation.enable属性应用于Connect集群中的所有worker。

Kafka Connect自动创建主题需要你定义Kafka Connect在创建主题时应用的配置属性。通过定义主题组,然后指定要应用于每个组的属性,可以在Debezium连接器配置中指定主题配置属性。开云体育官方注册网址连接器配置定义了一个默认主题创建组,以及一个或多个自定义主题创建组(可选)。自定义主题创建组使用主题名称模式列表来指定组设置应用于的主题。

有关Kafka Connect如何将主题匹配到主题创建组的详细信息,请参见主题创建组.有关如何将配置属性分配给组的详细信息,请参见主题创建组配置属性

默认情况下,Kafka Connect创建的主题是基于模式命名的server.schema.table,例如,dbserver.myschema.inventory

如果你不想让Kafka Connect自动创建主题,设置的值topic.creation.enable在Kafka连接配置(connect-distributed.properties文件或通过环境变量CONNECT_TOPIC_CREATION_ENABLE当使用开云体育官方注册网址Debezium的Kafka Connect容器映像).

Kafka连接自动主题创建需要replication.factor而且分区属性默认的主题创建组。对于组来说,从Kafka代理的默认值中获取所需属性的值是有效的。

配置

对于Kafka Connect自动创建主题,它需要源连接器在创建主题时应用的配置属性的信息。在每个Debezium连接器的配置中定义控制主题创建的属性。开云体育官方注册网址当Kafka Connect为连接器发出的事件记录创建主题时,产生的主题从适用的组中获取它们的配置。该配置仅适用于该连接器发出的事件记录。

主题创建组

主题属性集与主题创建组相关联。最低限度,您必须定义一个默认的主题创建组,并指定其配置属性。除此之外,您还可以选择定义一个或多个自定义主题创建组,并为每个组指定惟一属性。

创建自定义主题创建组时,可以根据主题名称模式为每个组定义成员主题。您可以指定命名模式,以描述从每个组中包含或排除的主题。的包括而且排除属性包含以逗号分隔的正则表达式列表,这些正则表达式定义了主题名称模式。例如,如果您希望一个组包含以字符串开头的所有主题dbserver1.inventory,设置其值topic.creation.inventory.include财产dbserver1 \ \ .inventory \ \ *

如果同时指定包括而且排除属性时,排除规则优先,并覆盖包含规则。

主题创建组配置属性

默认的主题创建组和每个自定义组都与一组惟一的配置属性相关联。可以将组配置为包含Kafka主题级配置属性.例如,可以指定旧主题段的清理策略保留时间,或主题压缩类型对于主题组。您必须至少定义一组最小属性来描述要创建的主题的配置。

如果没有注册自定义组,或者包括任何注册组的模式都不匹配任何要创建的主题的名称,那么Kafka Connect使用默认的分组以创建主题。

看到配置Debezium主题开云体育官方注册网址在Debezi开云体育官方注册网址um安装指南的通用主题配置注意事项。

默认组配置

在使用Kafka Connect自动创建主题之前,你必须创建一个默认的主题创建组,并为它定义一个配置。缺省主题创建组的配置应用于名称不匹配的任何主题包括自定义主题创建组的列表模式。

过程
  • 属性的属性topic.creation.default将它们添加到连接器配置JSON中,如下例所示:

    {…“topic.creation.default.replication.factor”:3,(1)“topic.creation.default.partitions”:10(2)“topic.creation.default.cleanup.policy”:“紧凑”,(3):“topic.creation.default.compression.type lz4”(4)...}

    你可以包括任何Kafka主题级配置属性的配置中默认的组。

表1。的连接器配置默认的主题创建组
描述

1

topic.creation.default.replication.factor为默认组创建的主题定义复制因子。
replication.factor对于默认的组,但可选自定义组。自定义组将退回到默认的组的值。使用-1使用Kafka代理的默认值。

2

topic.creation.default.partitions定义由默认组创建的主题的分区数。
分区对于默认的组,但可选自定义组。自定义组将退回到默认的组的值。使用-1使用Kafka代理的默认值。

3.

topic.creation.default.cleanup.policy映射到cleanup.policy的属性主题级配置参数并定义日志保留策略。

4

topic.creation.default.compression.type映射到compression.type的属性主题级配置参数并定义如何在硬盘上压缩消息。

自定义组返回到默认的只根据需要进行分组设置replication.factor而且分区属性。如果自定义主题组的配置未定义其他属性,则在默认的组未应用。

自定义组配置

您可以定义多个自定义主题组,每个主题组都有自己的配置。

过程
  • 要定义自定义主题组,请添加topic.creation。< group_name >其中包括属性转换为连接器JSON,并在组名之后列出自定义组的属性。

    的示例配置库存而且applicationlogs自定义主题创建组:

    {…(1)“topic.creation.inventory。包括": "dbserver1\\.inventory\\.*",(2)“topic.creation.inventory。分区": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000,(3)“topic.creation.applicationlogs。包括": "dbserver1\\.logs\\.applog-.*",(4)“topic.creation.applicationlogs。排除": "dbserver1\\.logs\\.applog-old-.*",(5)“topic.creation.applicationlogs.replication。因子“:1,”topic.creation.applicationlogs。分区": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4", ... }
表2。自定义连接器配置库存而且applicationlogs主题创建组
描述

1

属性的配置库存组。
replication.factor而且分区属性是自定义组的可选属性。属性的值。如果未设置值,则自定义组返回为默认的组。将值设置为-1使用为Kafka代理设置的值。

2

topic.creation.inventory.include定义正则表达式以匹配所有以开头的主题dbserver1.inventory。.属性定义的配置库存组仅应用于名称与指定正则表达式匹配的主题。

3.

属性的配置applicationlogs组。
replication.factor而且分区属性是自定义组的可选属性。属性的值。如果未设置值,则自定义组返回为默认的组。将值设置为-1使用为Kafka代理设置的值。

4

topic.creation.applicationlogs.include定义正则表达式以匹配所有以开头的主题dbserver1.logs.applog -.属性定义的配置applicationlogs组仅应用于名称与指定正则表达式匹配的主题。因为一个排除属性也为该组定义了包括正则表达式可能会受到进一步的限制排除财产。

5

topic.creation.applicationlogs.exclude定义正则表达式以匹配所有以开头的主题dbserver1.logs.applog-old -.属性定义的配置applicationlogs组仅应用于名称为匹配给定的正则表达式。因为一个包括属性也为该组定义,则applicationlogs组仅应用于名称与指定的匹配的主题包括正则表达式而且匹配指定的排除正则表达式。

注册自定义组

为任何自定义主题创建组指定配置后,请注册这些组。

过程
  • 控件注册自定义组topic.creation.groups属性转换为连接器JSON,并指定以逗号分隔的组列表。

    下面的示例注册自定义主题创建组库存而且applicationlogs

    {…“topic.creation。组":"目录,应用程序日志",…}
完成配置

属性的配置,下面的示例显示已完成的配置默认的对象的配置库存和一个applicationlogs自定义主题创建组:

示例:默认主题创建组和两个自定义组的配置
{…"topic. create .default.replication.factor": 3, "topic. create .default.partitions": 10, "topic. create .default.cleanup.policy": "compact", "topic. create .default.compression.type": "lz4"组:"inventory,applicationlogs", "topic.creation.inventory. log . "包括”:“dbserver1 \ \ .inventory \ \ *”,“topic.creation.inventory。分区": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000, "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", "topic.creation.applicationlogs.replication.factor": 1, "topic.creation.applicationlogs.partitions": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4" }

额外的资源

有关主题自动创建的更多信息,您可以查看这些资源: