这是Apache脉冲星PMC成员和提交者翟佳的客座文章。

开云体育官方注册网址是一个用于变更数据捕获(CDC)的开源项目。它是建立在Apache Kafka Connect支持MySQL、MongoDB、Postgr开云体育电动老虎机eSQL、Oracle、SQL Server等多种数据库。Apache脉冲星包括一套内置连接器基于Pulsar IO框架,是Apache Kafka Connect的对应部分。

从2.3.0版本开始,Pulsar IO支持开云体育官方注册网址Debezium源连接器因此,您可以利用Debezium将数据库中的更改流到Apache Pul开云体育官方注册网址sar中。开云体育电动老虎机本教程将引导您使用Pulsar IO为MySQL设置Debezium连接器。开云体育官方注册网址

教程的步骤

本教程类似于开云体育官方注册网址Debezium教程,只是事件流的存储从Kafka改为Pulsar。主要包括六个步骤:

  1. 启动MySQL服务器;

  2. 启动独立脉冲星服务;

  3. 在Pulsar IO开云体育官方注册网址中启动Debezium连接器。Pulsar IO读取MySQL开云体育电动老虎机服务器中存在的数据库更改;

  4. 订阅Pulsar主题来监控MySQL的变化;

  5. 在MySQL服务器中进行更改,并立即验证更改是否记录在Pulsar主题中;

  6. 清理。

步骤1:启动MySQL服务器

启动一个包含数据库示例的MySQL服务器,Debezium将从中捕获更改。开云体育电动老虎机开云体育官方注册网址打开一个新终端,启动一个运行MySQL数据库服务器的新容器,该容器预先配置了一个名为inventory的数据库:开云体育电动老虎机

docker运行——rm——name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debe开云体育官方注册网址zium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

屏幕回显如下:

2019-03-25T14:12:41.178670Z 0[注]mysqld: ready for connections。socket: /var/run/mysqld/mysqld. logsock' port: 3306 MySQL社区服务器

步骤2:启动独立的Pulsar服务

以单机模式本地启动Pulsar服务。Pulsar 2.3.0引入了在Puls开云体育官方注册网址ar IO中运行Debezium连接器的支持。下载2.3.0版本的脉冲星双星而且pulsar-io-kafka-connect-adaptor-2.3.0。2.3.0版本的Nar.在Pulsar中,所有Pulsar IO连接器都被包装为单独的NAR文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz $ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar $ tar ZXF apache-脉冲星-2.3.0-bin.tar.gz $ CD apache-脉冲星-2.3.0 $ mkdir连接器$ cp ../脉冲星-io-kafka-connect-适配器-2.3.0。Nar连接器$ bin/脉冲星独立
启动脉冲星独立]

步骤3:在Pulsar IO中启动D开云体育官方注册网址ebezium MySQL连接器

在Pulsar IO开云体育官方注册网址中以本地运行模式在另一个终端选项卡中启动Debezium MySQL连接器。“debe开云体育官方注册网址zium-mysql-source-config。Yaml”文件包含了所有的配置,主要参数都列在“configs”节点下。yaml文件包含“task.class”参数。配置文件还包括MySQL相关参数(如服务器、端口、用户、密码)和用于“历史”和“偏移量”存储的Pulsar主题的两个名称。

$ bin/ pulse -admin source localrun——sourceConfigFile 开云体育官方注册网址debezum -mysql-source-config.yaml

“debezium-mysql-sour开云体育官方注册网址ce-config. conf”文件中的内容。Yaml的文件如下所示。

tenant: "test" namespace: "test-namespace" name: "开云体育官方注册网址 debezu -kafka-source" topicName: "kafka-connect-topic" archive: "connectors/ pulse -io-kafka-connect- adapter -2.3.0. "nar" parallelism: 1 configs: ## sourceTask task.class: "io.开云体育官方注册网址debezium.connector.mysql.MySqlConnectorTask" ##配置mysql, docker镜像:debezium/example-mysql:0.8数据库。开云体育电动老虎机主机名:“localhost”数据库。开云体育电动老虎机端口:“3306”数据库。开云体育电动老虎机用户:“deb开云体育官方注册网址开云体育电动老虎机ezium”数据库。密码:“dbz”database开云体育电动老虎机.server.id:“184054”database.server.name:“dbserver1”数据库。白名单:“库存”数据库。开云体育电动老虎机database.history.pulsar.topic: 开云体育官方注册网址"hist开云体育电动老虎机ory-topic" database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG键。转换器:“org.apache.kafka.connect.json。JsonConverter”价值。转换器:“org.apache.kafka.connect.json。JsonConverter" ## PULSAR_SERVICE_URL_CONFIG脉冲星。service.url: "pulsar://127.0.0.1:6650"

表是在前面提到的MySQL服务器中自动创建的。所以D开云体育官方注册网址ebezium连接器从MySQL binlog文件开始读取历史记录。在输出中,您将发现连接器已经在47条记录中被触发和处理。

连接器启动过程记录

有关如何管理连接器的更多信息,请参见脉冲星IO文件

Debezium捕获和读取的记录将自动发布到Pulsar主题。开云体育官方注册网址当你启动一个新的终端时,你会用下面的命令在Pulsar中找到当前的主题:

$ bin/ pulse -admin主题列表public/default
列出脉冲星主题

对于每个已更改的表,更改数据存储在单独的Pulsar主题中。除了数据库表相开云体育电动老虎机关的主题外,另外两个主题“history-topic”和“offset-topic”用于存储历史和偏移量相关的数据。

持久性:/ /公共/违约/ history-topic持久:/ /公共/ / offset-topic违约

步骤4:订阅Pulsar主题来监视MySQL的变化

持久性:/ /公共/ / dbserver1.inventory.products违约主题为例。使用CLI命令使用该主题,并在“products”表更改时监视更改。

$ bin/ pulse -client consume -s "sub-products" public/default/dbserver1.inventory。产品-n 0

回显如下:

…22:17:41.201 [pular -client-io-1-1] INFO org.apache. pular .client.impl. consumerimpl - [public/default/dbserver1.inventory. xml][sub-products]订阅cnx上的topic [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650] 22:17:41.223 [pular -client-io-1-1] INFO org.apache. pular .client.impl. consumerimpl - [public/default/dbserver1.inventory. conf . INFO]订阅主题在localhost/127.0.0.1:6650——消费者:0

类中存储表更改时,还可以使用偏移量主题来监视偏移量更改持久性:/ /公共/ / dbserver1.inventory.products违约脉冲星的话题。

$ bin/ pulse -client consume -s "sub-offset" offset-topic -n 0

步骤5:在MySQL服务器中进行更改,并验证更改是否立即记录在Pulsar主题中

启动MySQL CLI docker连接器,可以对MySQL服务器中的“products”表进行更改。

$docker run -它——rm——name mysqlterm——link mysql——rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -P"$ MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

执行该命令后,进入MySQL命令行,可以修改“products”表中这两项的名称。

Mysql >使用库存;Mysql >显示表;mysql> SELECT * FROM product;mysql> UPDATE product SET name='1111111111' WHERE id=101;mysql> UPDATE product SET name='1111111111' WHERE id=107
mysql更新

在消费产品主题的终端中,您会发现添加了两个更改。

表主题存储mysql更新

在使用偏移量主题的终端中,您发现添加了两个偏移量。

偏移主题得到更新

在本地运行连接器的终端中,您发现又处理了两条记录。

表主题获取更多记录

第六步:清理。

使用“Ctrl + C”关闭终端。使用“docker ps”和“docker kill”停止MySQL相关容器。

mysql> quit $ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 84d66c2f591d 开云体育官方注册网址debezium/example-mysql:0.8 "docker-entrypoint. shmysql $ docker kill 84d66c2f591d .0:3306->3306/tcp, 33060/tcp

如果要删除Pulsar数据,请删除Pulsar二进制目录中的data目录。

$ pwd /Users/jia/ws/releases/apache- pulse -2.3.0 $ rm -rf数据

结论

Pulsar IO框架允许运行Debezium连接器进行数据变化捕获,将不同数据库中开云体育官方注册网址的数据变化流到Apache Pulsar中。开云体育电动老虎机在本教程中,您学习了如何捕获MySQL数据库中的数据更改并将其传播到Pulsar。开云体育电动老虎机我们正在不断改进对使用Apache Pulsar运行Debezium连接器的支持,开云体育官方注册网址在Pulsar 2.4.0发布后,它将更容易使用。

贾翟

Jia是StreamNative的核心软件工程师,也是Apache BookKeeper和Apache Pulsar的PMC成员,并不断为这两个项目做出贡献。他住在中国北京。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题

Baidu
map