在这篇博文中,我们将创建一个简单的流数据管道来连续捕捉MySQL数据库中的变化,并将其近乎实时地复制到PostgreSQL数据库中。开云体育电动老虎机我们将展示如何在不编写任何代码的情况下做到这一点,而是通过使用和配置Kafka Connect、Debezium MySQL源连接器、Confluent JDBC接收器连接器和一些单个消息转换(smt)来实现。开云体育官方注册网址
这种通过Kafka复制数据的方法本身确实很有用,但是当我们可以将近乎实时的数据变更流与其他流、连接器和流处理应用程序结合在一起时,这种方法会变得更加有利。最近的一次Confluent博客文章系列显示了类似的流数据管道,但使用不同的连接器和smt。Kafka Connect的伟大之处在于,你可以混合和匹配连接器来在多个系统之间移动数据。
我们还将演示附带发布的一个新功能开云体育官方注册网址Debezium 0.6.0:用于的单个消息转换事件扁平化.
拓扑结构
该场景的拓扑图如下图所示:
图1:一个通用拓扑
为了简化设置,我们将只使用一个Kafka Connect实例,该实例将包含所有连接器。例如,这个实例将作为事件生产者和事件消费者:
图2:简化的拓扑
配置
我们会用到这个组成用于快速部署演示。部署由以下Docker镜像组成:
Debe开云体育官方注册网址zium MySQL连接器被设计为专门捕获数据库更改,并提供关于这些事件的尽可能多的信息,而不仅仅是每行的新状态。开云体育电动老虎机同时,Confluent JDBC Sink Connector被设计为根据消息的结构简单地将每个消息转换为数据库插入/更新。开云体育电动老虎机因此,这两个连接器具有不同的消息结构,但它们也使用不同的主题命名约定和表示已删除记录的行为。
在使用未设计为协同工作的连接器时,这种结构和行为上的不匹配很常见。但这是我们很容易处理的问题,我们将在接下来的几节中讨论如何处理。
事件格式
开云体育官方注册网址Debezium以一种复杂的格式发出事件,其中包含关于捕获的数据更改的所有信息:操作类型、源元数据、连接器处理事件的时间戳,以及更改前后行的状态。开云体育官方注册网址Debezium称这种结构为an“信封”:
{"人事处":"u","源": {...},"ts_ms":"...","之前": {"field1":"oldvalue1","field2":"oldvalue2"},"后": {"field1":"newvalue1","field2":"newvalue2"}}
许多其他的Kafka Connect源连接器没有这样的奢侈,知道这么多的变化,而是使用一个更简单的模型,其中每个消息直接表示行的后状态。这也是许多接收器连接器所期望的,Confluent JDBC接收器连接器也不例外:
{"field1":"newvalue1","field2":"newvalue2"}
虽然我们认为Debezium CDC连接器提供尽可能多的细节实际上是一件很棒的事情,但我们也让您开云体育官方注册网址可以轻松地转换Debezium的CDC连接器“信封”格式到“行”许多其他连接器所期望的格式。开云体育官方注册网址Debezium以一种形式在这两种格式之间提供了桥梁单个消息转换.的ExtractNewRecordState
转换自动提取新的行记录,从而有效地趋于平缓将复杂的记录转换为可由其他连接器使用的简单记录。
您可以在源连接器上使用此SMT来转换消息之前它被写入Kafka,或者你也可以存储源连接器的rich文件“信封”在Kafka中使用消息的SMT形式,并在接收器连接器上使用这个SMT来转换消息后它是从Kafka读取的,在它被传递给接收器连接器之前。这两种方法都可以工作,只是取决于您是否发现邮件的信封形式对其他目的有用。
在我们的例子中,我们使用以下配置属性在接收器连接器上应用SMT:
"transforms": "unwrap", "transforms.unwrap。"类型”:“io.deb开云体育官方注册网址ezium.transforms.ExtractNewRecordState”,
删除记录
当Debezium开云体育官方注册网址连接器检测到一行被删除时,它会创建两个事件消息删除事件和墓碑上消息。的删除信息中已删除行状态的信封之前
字段和后
即场零
.的墓碑上消息包含与删除消息,但整个消息值为零
, Kafka的日志压缩利用了这一点,知道它可以删除任何具有相同键的早期消息。许多接收器连接器(包括Confluent的JDBC接收器连接器)并不期待这些消息,如果它们看到任何一种消息,它们都会失败。的ExtractNewRecordState
SMT将在默认情况下过滤掉两者删除而且墓碑上但是,如果您正在使用SMT并希望保留其中一种或两种消息,则可以更改此设置。
主题命名
最后但并非最不重要的是,主题的命名存在差异。开云体育官方注册网址Debezium对表示它管理的每个表的目标主题使用完全限定命名。命名遵循这个模式<逻辑名称>。<数据库名称>。<开云体育电动老虎机表名称>
.Kafka Connect JDBC连接器使用简单的名称<表名称>
.
在更复杂的场景中,用户可以部署卡夫卡流在源路由和目标路由之间建立详细路由的框架。在我们的例子中,我们将使用股票RegexRouter
SMT,它将Debezium创建的记录路由到根据JDBC连接器模式命名的主题。开云体育官方注册网址同样,我们可以在源连接器或接收器连接器中使用这个SMT,但在本例中,我们将在源连接器中使用它,以便我们可以选择写入记录的Kafka主题的名称。
"transforms": "route", "transforms.route "。”:“org.apache.kafka.connect.transforms类型。RegexRouter”、“transforms.route。正则表达式 ": "([^.]+)\\.([^.]+)\\.([^.]+)", " transforms.route。替换”:“3美元”
例子
仔细研究一下,让我们来试试我们的例子!
首先,我们需要部署所有组件。
export 开云体育官方注册网址DEBEZIUM_VERSION=0.6 docker-compose up
当所有组件启动后,我们将注册JDBC Sink连接器写入PostgreSQL数据库:开云体育电动老虎机
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
使用这个注册请求:
{"的名字":"jdbc-sink","配置": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","主题":"客户","connection.url":"jdbc: postgresql: / / postgres: 5432 /库存吗?用户= postgresuser&password = postgrespw","转换":"打开",(1)"transforms.unwrap.type":"io.开云体育官方注册网址debezium.transforms.ExtractNewRecordState",(1)"auto.create":"真正的",(2)"insert.mode":"插入",(3.)"pk.fields":"id",(4)"pk.mode":"record_value"(4)}}
请求配置这些选项:
将Debezium复杂开云体育官方注册网址的格式分解为简单的格式
自动创建目标表
如果该行不存在,则插入该行或更新现有行
识别存储在Kafka的记录值字段中的主键
然后必须设置源连接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
使用这个注册请求:
{"的名字":"inventory-connector","配置": {"connector.class":"io.开云体育官方注册网址debezium.connector.mysql.MySqlConnector","tasks.max":"1","开云体育电动老虎机database.hostname":"mysql","开云体育电动老虎机database.port":"3306","开云体育电动老虎机database.user":"开云体育官方注册网址","开云体育电动老虎机database.password":"dbz","开云体育电动老虎机database.server.id":"184054","开云体育电动老虎机database.server.name":"dbserver1",(1)"开云体育电动老虎机database.whitelist":"库存",(2)"开云体育电动老虎机database.history.kafka.bootstrap.servers":"卡夫卡:9092","开云体育电动老虎机database.history.kafka.topic":"schema-changes.inventory","转换":"路线",(3.)"transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",(3.)"transforms.route.regex":"([^] +)\ \([^] +)。\ \([^] +)。",(3.)"transforms.route.replacement":"3美元"(3.)}}
请求配置这些选项:
数据库的逻辑名称开云体育电动老虎机
我们要监开云体育电动老虎机视的数据库
定义与主题名称匹配的正则表达式的SMT
<逻辑名称>。<数据库名称>。<开云体育电动老虎机表名称>
并提取它的第三部分作为最终的主题名称
让我们检查数据库是否同步。开云体育电动老虎机所有的行客户
表应该在源数据库(MySQL)和目标数据库(Postgres)中找到:开云体育电动老虎机
docker-compose exec mysql bash - c的mysql - u MYSQL_USER - p MYSQL_PASSWORD美元库存- e”从顾客选择* "' +------+------------+-----------+-----------------------+ | id | first_name | last_name |电子邮件 | +------+------------+-----------+-----------------------+ | 1001 sally.thomas@acme.com托马斯萨利| | | | | 1002年乔治贝利| | | gbailey@foobar.com | | 1003年爱德华·沃克| | | ed@walker.com | | 1004 | |安妮Kretchmar | annek@noanswer.org |+------+------------+-----------+-----------------------+ docker-compose exec postgres bash - c”psql - u POSTGRES_USER POSTGRES_DB美元- c“select *客户”last_name | | id first_name |电子邮件 -----------+------+------------+----------------------- 托马斯·萨利| 1001 | | sally.thomas@acme.com贝利| 1002 |乔治爱德华| | 1003 | | gbailey@foobar.com沃克ed@walker.com Kretchmar安妮| 1004 | | annek@noanswer.org
在连接器仍在运行的情况下,我们可以向MySQL数据库添加一个新行,然后检查它是否被复制到PostgreSQL数据库中:开云体育电动老虎机
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory' mysql> insert into customers values(默认为'John', 'Doe', 'john.doe@example.com');查询好了,1行影响(0.02秒)docker-compose exec postgres bash - c”psql - u POSTGRES_USER POSTGRES_DB美元- c“select *客户”last_name | | id first_name |电子邮件 -----------+------+------------+----------------------- ...Doe | 1005 | John | john.doe@example.com(5行)
总结
我们建立了一个简单的流数据管道,近乎实时地将数据从MySQL数据库复制到PostgreSQL数据库。开云体育电动老虎机我们使用Kafka Connect、Debezium MySQL源连接器、Confluent开云体育官方注册网址 JDBC接收器连接器和一些smt实现了这一点——所有这些都不需要编写任何代码。由于它是一个流系统,它将继续捕获对MySQL数据库所做的所有更改,并几乎实时地复制它们。开云体育电动老虎机
接下来是什么?
在以后的博客文章中,我们将用Elasticsearch作为事件的目标重现相同的场景。
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源下Apache许可证,版本2.0.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium,在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题.