您正在查看过时的Debezium版本的文档。开云体育官方注册网址
如果您想查看本页最新的稳定版本,请前往在这里

自定义主题自动创建

开云体育官方注册网址Debezium自动创建内部偏移量、连接器状态、配置存储和历史主题的主题。被捕获表的目标主题将由Kafka代理使用默认配置自动创建auto.create.topics.enable设置为真正的
当在代理上禁用主题创建时,例如在生产环境中,或者当主题需要不同的配置时,这些主题必须预先创建,要么在自定义部署过程中自动创建,要么手动创建,直到Kafka Connect 2.6。

自从Kafka 2.6.0以来,Kafka Connect支持自定义主题自动创建。

设置Kafka Connect

Kafka Connect,因为Kafka 2.6.0支持主题创建:

topic. create .enable = true

如果不希望允许连接器自动创建主题,可以将此值设置为在Kafka连接配置(connect-distributed.properties文件或通过环境变量CONNECT_TOPIC_CREATION_ENABLE当使用开云体育官方注册网址Debezium的Kafka Connect容器映像).

配置

主题自动创建基于组。每个自定义组都有一个包括和一个排除属性,这些正则表达式以逗号分隔,与应包含或排除的主题名称匹配。

你可以同时指定两者,包括而且排除参数,但请注意,排除规则优先并覆盖主题的任何包含规则。

您不必指定任何自定义组。当没有已注册的自定义组或已注册的组时包括模式与要创建的主题不匹配,则将使用默认配置。

你可以指定所有主题级配置参数自定义创建主题的方式。

看到配置Debezium主题开云体育官方注册网址章节中关于通用主题配置注意事项开云体育官方注册网址的介绍。

默认组配置

默认配置可以像这样在连接器配置JSON中传递:

{…“topic.creation.default.replication.factor”:3,(1)“topic.creation.default.partitions”:10(2)“topic.creation.default.cleanup.policy”:“紧凑”,(3):“topic.creation.default.compression.type lz4”(4)...}
表1。的连接器配置默认的主题创建组
描述

1

topic.creation.default.replication.factor为默认组创建的主题定义复制因子。
replication.factor对于默认的组,但可选自定义组。自定义组将回退到默认的组的值。使用1使用Kafka代理的默认值。

2

topic.creation.default.partitions定义由默认组创建的主题的分区数。
分区对于默认的组,但可选自定义组。自定义组将回退到默认的组的值。使用1使用Kafka代理的默认值。

3.

topic.creation.default.cleanup.policy映射到cleanup.policy的属性主题级配置参数并定义日志保留策略。

4

topic.creation.default.compression.type映射到compression.type的属性主题级配置参数并定义如何在硬盘上压缩消息。

如你所见,你可以使用每一个主题级配置参数作为财产。

请注意,replication.factor而且分区属性是默认的组和可选的自定义组。自定义组将回退到默认的组的值。使用1使用Kafka代理的默认值。
此回退不适用于其他配置参数。

自定义组配置

可以指定多个组。类似于默认的根据组名将属性分组。在连接器JSON中就像这样:

{…(1)“topic.creation.inventory。包括": "dbserver1\\.inventory\\.*",(2)“topic.creation.inventory。分区": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000,(3)“topic.creation.applicationlogs。包括": "dbserver1\\.logs\\.applog-.*",(4)“topic.creation.applicationlogs。排除": "dbserver1\\.logs\\.applog-old-.*",(5)“topic.creation.applicationlogs.replication。因子“:1,”topic.creation.applicationlogs。分区": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4", ... }
表2。自定义连接器配置库存而且applicationlogs主题创建组
描述

1

属性的配置库存组。
请注意,replication.factor而且分区属性是自定义组的可选属性。自定义组将回退到默认的组的值。使用1使用Kafka代理的默认值。

2

topic.creation.inventory.include定义正则表达式以匹配所有以开头的主题dbserver1.inventory。.属性定义的配置库存组仅在主题名称与给定正则表达式匹配时应用。

3.

然后定义的配置applicationlogs组。
请注意,replication.factor而且分区属性是自定义组的可选属性。自定义组将回退到默认的组的值。使用1使用Kafka代理的默认值。

4

topic.creation.applicationlogs.include定义正则表达式以匹配所有以开头的主题dbserver1.logs.applog -.属性定义的配置applicationlogs组仅在主题名称与给定正则表达式匹配时应用。因为还有排除位置上定义的属性< 5 >所有与此匹配的主题包括正则表达式可能会受到进一步的限制exlude财产。

5

topic.creation.applicationlogs.exclude定义正则表达式以匹配所有以开头的主题dbserver1.logs.applog-old -.属性定义的配置applicationlogs组将仅在主题名称应用时应用匹配给定的正则表达式。因为还有包括属性applicationlogs组将仅应用于名称与包括正则表达式/秒而且匹配排除正则表达式/ s。

注册自定义组

最后,我们需要注册两个已定义的自定义组库存而且applicationlogstopic.creation.groups属性:

{…“topic.creation。组":"目录,应用程序日志",…}

一个完整的连接器JSON配置如下所示:

{…"topic. create .default.replication.factor": 3, "topic. create .default.partitions": 10, "topic. create .default.cleanup.policy": "compact", "topic. create .default.compression.type": "lz4"组:"inventory,applicationlogs", "topic.creation.inventory. log . "包括”:“dbserver1 \ \ .inventory \ \ *”,“topic.creation.inventory。replication.factor": 3, "topic.creation.inventory.partitions": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000, "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", "topic.creation.applicationlogs.replication.factor": 1, "topic.creation.applicationlogs.partitions": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4" }

额外的资源

有关主题自动创建的更多信息,您可以查看这些资源: