自定义Kafka连接自动主题创建
Kafka提供了两种自动创建主题的机制。您可以为Kafka代理启用自动创建主题,并且从Kafka 2.6.0开始,您还可以启用Kafka Connect来创建主题。Kafka代理使用auto.create.topics.enable
属性控制自动创建主题。在Kafka Connect中topic.creation.enable
属性指定Kafka Connect是否允许创建主题。在这两种情况下,属性的默认设置都允许自动创建主题。
当启用自动主题创建时,如果Debezium源连接器为一个不存在目标主题的表发出更改事件记录,则开云体育官方注册网址当事件记录被摄取到Kafka时,主题将在运行时创建。
代理创建的主题仅限于共享一个默认配置。代理不能将唯一配置应用于不同的主题或主题集。相比之下,Kafka Connect可以在创建主题、设置复制因子、分区数量和其他在Debezium连接器配置中指定的主题特定设置时应用任何配置。开云体育官方注册网址连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组关联。
代理配置和Kafka Connect配置是相互独立的。Kafka Connect可以创建主题,不管你是否在代理上禁用了主题创建。如果你在代理和Kafka Connect中都启用自动创建主题,那么连接配置优先,只有当Kafka Connect配置中的设置都不适用时,代理才会创建主题。
禁用Kafka代理的自动主题创建
默认情况下,Kafka代理配置允许代理在运行时创建主题(如果主题不存在)。代理创建的主题不能配置自定义属性。如果你使用的Kafka版本低于2.6.0,并且你想要创建带有特定配置的主题,你必须在代理上禁用自动创建主题,然后显式地创建主题,可以手动创建,也可以通过自定义部署过程创建。
在代理配置中,设置值
auto.create.topics.enable
来假
.
设置Kafka Connect
Kafka Connect中的自动主题创建由topic.creation.enable
财产。该属性的默认值为真正的
,启用自动创建主题,示例如下:
topic. create .enable = true
的设置topic.creation.enable
属性应用于Connect集群中的所有worker。
Kafka Connect自动创建主题需要你定义Kafka Connect在创建主题时应用的配置属性。通过定义主题组,然后指定要应用于每个组的属性,可以在Debezium连接器配置中指定主题配置属性。开云体育官方注册网址连接器配置定义了一个默认主题创建组,以及一个或多个自定义主题创建组(可选)。自定义主题创建组使用主题名称模式列表来指定组设置应用于的主题。
默认情况下,Kafka Connect创建的主题是基于模式命名的server.schema.table
,例如,dbserver.myschema.inventory
.
如果你不想让Kafka Connect自动创建主题,设置的值 |
Kafka连接自动主题创建需要 |
配置
对于Kafka Connect自动创建主题,它需要源连接器在创建主题时应用的配置属性的信息。在每个Debezium连接器的配置中定义控制主题创建的属性。开云体育官方注册网址当Kafka Connect为连接器发出的事件记录创建主题时,产生的主题从适用的组中获取它们的配置。该配置仅适用于该连接器发出的事件记录。
主题创建组
主题属性集与主题创建组相关联。最低限度,您必须定义一个默认的
主题创建组,并指定其配置属性。除此之外,您还可以选择定义一个或多个自定义主题创建组,并为每个组指定惟一属性。
创建自定义主题创建组时,可以根据主题名称模式为每个组定义成员主题。您可以指定命名模式,以描述从每个组中包含或排除的主题。的包括
而且排除
属性包含以逗号分隔的正则表达式列表,这些正则表达式定义了主题名称模式。例如,如果您希望一个组包含以字符串开头的所有主题dbserver1.inventory
,设置其值topic.creation.inventory.include
财产dbserver1 \ \ .inventory \ \ *
.
如果同时指定 |
主题创建组配置属性
的默认的
主题创建组和每个自定义组都与一组惟一的配置属性相关联。可以将组配置为包含Kafka主题级配置属性.例如,可以指定旧主题段的清理策略,保留时间,或主题压缩类型对于主题组。您必须至少定义一组最小属性来描述要创建的主题的配置。
如果没有注册自定义组,或者包括
任何注册组的模式都不匹配任何要创建的主题的名称,那么Kafka Connect使用默认的
分组以创建主题。
看到配置Debezium主题开云体育官方注册网址在Debezi开云体育官方注册网址um安装指南的通用主题配置注意事项。
默认组配置
在使用Kafka Connect自动创建主题之前,你必须创建一个默认的主题创建组,并为它定义一个配置。缺省主题创建组的配置应用于名称不匹配的任何主题包括
自定义主题创建组的列表模式。
属性的属性
topic.creation.default
将它们添加到连接器配置JSON中,如下例所示:{…“topic.creation.default.replication.factor”:3,(1)“topic.creation.default.partitions”:10(2)“topic.creation.default.cleanup.policy”:“紧凑”,(3):“topic.creation.default.compression.type lz4”(4)...}
你可以包括任何Kafka主题级配置属性的配置中
默认的
组。
项 | 描述 |
---|---|
1 |
|
2 |
|
3. |
|
4 |
|
自定义组返回到 |
自定义组配置
您可以定义多个自定义主题组,每个主题组都有自己的配置。
要定义自定义主题组,请添加
topic.creation。< group_name >其中包括
属性转换为连接器JSON,并在组名之后列出自定义组的属性。的示例配置
库存
而且applicationlogs
自定义主题创建组:{…(1)“topic.creation.inventory。包括": "dbserver1\\.inventory\\.*",(2)“topic.creation.inventory。分区": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000,(3)“topic.creation.applicationlogs。包括": "dbserver1\\.logs\\.applog-.*",(4)“topic.creation.applicationlogs。排除": "dbserver1\\.logs\\.applog-old-.*",(5)“topic.creation.applicationlogs.replication。因子“:1,”topic.creation.applicationlogs。分区": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4", ... }
项 | 描述 |
---|---|
1 |
属性的配置 |
2 |
|
3. |
属性的配置 |
4 |
|
5 |
|
注册自定义组
为任何自定义主题创建组指定配置后,请注册这些组。
控件注册自定义组
topic.creation.groups
属性转换为连接器JSON,并指定以逗号分隔的组列表。下面的示例注册自定义主题创建组
库存
而且applicationlogs
:{…“topic.creation。组":"目录,应用程序日志",…}
属性的配置,下面的示例显示已完成的配置默认的
对象的配置库存
和一个applicationlogs
自定义主题创建组:
{…"topic. create .default.replication.factor": 3, "topic. create .default.partitions": 10, "topic. create .default.cleanup.policy": "compact", "topic. create .default.compression.type": "lz4"组:"inventory,applicationlogs", "topic.creation.inventory. log . "包括”:“dbserver1 \ \ .inventory \ \ *”,“topic.creation.inventory。分区": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000, "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", "topic.creation.applicationlogs.replication.factor": 1, "topic.creation.applicationlogs.partitions": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4" }
额外的资源
有关主题自动创建的更多信息,您可以查看这些资源:
开云体育官方注册网址Debezium博客:自动创建Debezium更改开云体育官方注册网址数据主题
Kafka关于在Kafka Connect中添加主题自动创建的改进建议:KIP-158 Kafka Connect应该允许源连接器为新主题设置特定主题的设置