创建新的主题/管道

当你在使用Kafka Connect分布式时,你可能已经意识到,一旦你开始使用Kafka Connect,就已经有一些内部的Kafka Connect相关主题为你创建:

$ kafka-topic .sh——bootstrap-server $HOSTNAME:9092——list connect_configs connect_offset connect_status

这是由Kafka Connect自动完成的,它有一个合理的、自定义的默认主题配置,可以满足这些内部主题的需求。

当你启动一个Debezium连接器开云体育官方注册网址时,被捕获事件的主题是由Kafka代理根据代理中的默认(可能是自定义的)配置创建的Auto.create.topics.enable = true在代理配置中启用:

Auto.create.topics.enable = true default.replication.factor = 1 num.partitions = 1压缩。Type = producer log.cleanup.policy = delete log.retention.ms = 604800000 ## 7天

但通常,当你在生产环境中使用Debezium和K开云体育官方注册网址afka时,你可能会选择禁用Kafka的主题自动创建功能Auto.create.topics.enable = false,或者希望连接器主题的配置与默认值不同。在这种情况下,您必须预先为Debezium捕获的数据源创建主题。开云体育官方注册网址
但也有好消息!从Kafka Connect 2.6.0版本开始,这可以自动完成kip - 158实现了Kafka Connect的自定义主题创建功能。

卡夫卡连接

Kafka Connect,因为Kafka 2.6.0支持主题创建:

topic. create .enable = true

如果不希望允许连接器自动创建主题,可以将此值设置为在Kafka连接配置(connect-distributed.properties文件或通过环境变量CONNECT_TOPIC_CREATION_ENABLE当使用开云体育官方注册网址Debezium的Kafka Connect容器映像).

更新连接器配置

Kafka连接主题创建工作与组。总有一个默认的组,当没有定义与主题匹配的其他组时使用。

每个组都可以指定一组主题配置属性,以及配置应该应用于的主题名称的正则表达式列表。

你可以指定所有主题级配置参数自定义如何创建组中匹配的主题。

让我们看看如何扩展这个Postgres配置来创建Kafka Connect主题:

{"name": "inventory-connector", "config": {"connector.class": "io. d开云体育官方注册网址ebezum .connector.postgresql. postgresconnector ", "任务。Max“:1,”数据开云体育电动老虎机库。主机名":"postgres", "数据库。开云体育电动老虎机端口“:5432,”数据库。开云体育电动老虎机用户":"postgres", "数据库开云体育电动老虎机。密码":"postgres", "databas开云体育电动老虎机e. "Dbname ": "postgres", "开云体育电动老虎机database.server.name": "dbserver1", "schema.include. name"。List ": "inventory"}}

默认配置

所有主题都不匹配topic.creation团体将申请默认的组配置。
作为默认值复制。因子= 3Partitions = 10时,主题应重点紧凑清理。Policy = "compact",并且所有的消息都应该用LZ4压缩到硬盘上压缩。Type = "lz4"
所以我们为默认组配置:

{"name": "inventory-connector", "config": {"connector.class": "io. d开云体育官方注册网址ebezum .connector.postgresql. postgresconnector ", "任务。Max“:1,”数据开云体育电动老虎机库。主机名":"postgres", "数据库。开云体育电动老虎机端口“:5432,”数据库。开云体育电动老虎机用户":"postgres", "数据库开云体育电动老虎机。密码":"postgres", "databas开云体育电动老虎机e. "Dbname ": "postgres", "开云体育电动老虎机database.server.name": "dbserver1", "schema.include. name"。List ": "inventory", "topic. create .default.replication.factor": 3, "topic. create .default.partitions": 10, "topic. create .default.cleanup.policy": "compact", "topic. create .default.compression.type": "lz4"}}

Productlog配置

在数据库中开云体育电动老虎机库存Schema是表的开头产品作为表名。
默认情况下,完全限定的表名被捕获到与Debezium同名的主题,例如表开云体育官方注册网址产品库存模式的dbserver1是否捕获到主题dbserver1.inventory.products

我们希望所有到表名开头的主题的消息产品存储在一个主题中,保留时间为3个月/ 90天清理。政策”:“删除”而且保留。Ms = 7776000000复制。因子= 1分区= 20,并且只使用生产者使用的压缩格式压缩。类型”:“生产商”
你可以省略与集群默认值匹配的属性,但要注意,一旦你在Kafka代理上更改了默认配置,结果的主题配置可能会有所不同!

首先我们需要注册一个productlog组使用topic.creation.groups财产。
然后,我们可以定义应该在该组中包括哪些主题名称,并指定组的配置,就像我们对默认的组:

{"name": "inventory-connector", "config": {"connector.class": "io. d开云体育官方注册网址ebezum .connector.postgresql. postgresconnector ", "任务。Max“:1,”数据开云体育电动老虎机库。主机名":"postgres", "数据库。开云体育电动老虎机端口“:5432,”数据库。开云体育电动老虎机用户":"postgres", "数据库开云体育电动老虎机。密码":"postgres", "databas开云体育电动老虎机e. "Dbname ": "postgres", "开云体育电动老虎机database.server.name": "dbserver1", "schema.include. name"。List ": "inventory", "topic. create .default.replication.factor": 3, "topic. create .default.partitions": 10, "topic. create .default.cleanup.policy": "compact", "topic. create .default.compression.type": "lz4", "topic.creation. replication.factor": 3, "topic. create .default.partitions": 10, "组”:“productlog”,(1)“topic.creation.productlog。包括”:“dbserver1 \ \ .inventory \ \ . product . *”,(2)“topic.creation.productlog.replication。因子“:1,”topic.creation.productlog。分区“:20,”topic.creation.productlog.cleanup。政策”:“删除”,“topic.creation.productlog。保留。Ms ": 7776000000, "topic.creation.productlog.compression.type": "producer"}}
表1。用于自定义自动创建主题的连接器配置
描述

1

topic.creation.groups定义以逗号分隔的其他组名列表。这里我们只定义productlog组。

2

topic.creation.productlog.include字段保存一个以逗号分隔的正则表达式列表,该列表与主题名称相匹配productlog应该应用组配置。的productlog组匹配以。开头的所有主题dbserver1.inventory.product

探索结果

当我们现在启动连接器并使用kafka-topics.sh要查看主题是如何创建的,我们可以看到所有的都按照定义工作:

## dbserver1.inventory。产品` topic has the config from the `productlog` group: $ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.products Topic: dbserver1.inventory.products PartitionCount: 20 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=delete,retention.ms=7776000000,segment.bytes=1073741824 ## the `dbserver1.inventory.orders` topic has the config from the `default` group: $ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.orders Topic: dbserver1.inventory.orders PartitionCount: 10 ReplicationFactor: 3 Configs: compression.type=lz4,cleanup.policy=compact,segment.bytes=1073741824,delete.retention.ms=2592000000

结论

在很多情况下,特别是在生产环境中,我们通常不希望Kafka代理端启用主题自动创建,或者我们需要一个不同于默认主题配置的配置。
在Kafka 2.6之前,这只能在预先手动创建主题或通过一些自定义设置过程(可能在部署期间)时实现。

由于Kafka 2.6 Kafka Connect带有内置的连接器主题创建功能,本文将展示如何在Debezium中使用它。开云体育官方注册网址

你可以找到一个例子在这里在GitHub开云体育官方注册网址上的Debezium示例库中。

雷内·肯纳

René是红帽公司的软件工程师。在此之前,他曾在trivago担任软件架构师和工程师,并在cocentric担任顾问。现在他是Debezium团队的一员开云体育官方注册网址。他住在德国Mönchengladbach。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题

Baidu
map