最后更新于2018年11月21日(调整为新的KSQL Docker图像)

去年,我们看到了一个新的开源项目的开始Apache卡夫卡宇宙中,KSQL,这是一个流SQL引擎构建在上面卡夫卡流。在这篇文章中,我们将尝试用Debezium从MySQL数据库生成的数据更改事件进行KSQL查询。开云体育官方注册网址开云体育电动老虎机

作为数据来源,我们将使用数据库和设置从我们的开云体育电动老虎机教程。这项工作的结果应该与最近的类似帖子关于事件的聚合领域驱动聚合

实体关系图

首先,让我们看看数据库中的实体以及它们之间的关系。开云体育电动老虎机

图1:示例实体的实体图


上图显示了示例MySQL实例中库存数据库的完整ER图。开云体育电动老虎机我们将重点关注两个实体:

  • 客户-系统中的客户列表

  • 订单-系统中的订单列表

有一个1: n之间的关系客户而且订单,由购买者中的列。订单表的外键客户表格

配置

我们用aDocker撰写文件用于环境的部署。部署由以下Docker镜像组成:

例子

首先,我们需要启动Debezium和Kafka基础设施开云体育官方注册网址。要做到这一点,克隆开云体育官方注册网址debezium-examplesGitHub存储库,并使用提供的Compose文件启动所需的组件:

export 开云体育官方注册网址DEBEZIUM_VERSION=0.8 git克隆https://github.com/debezium/debezium-examples.git cd debezium_examples /ksql/ docker-compose up

接下来,我们必须注册一个Debezium MySQL连接器的实例来监听数据库中的变开云体育官方注册网址化:开云体育电动老虎机

curl -i - x POST - h "Accept:application/json" - h "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<- eof {"name": "inventory-connector", "config": {"connector.class": "i开云体育官方注册网址o. debezui .connector.mysql. mysqlconnector ", "tasks. debezui .connector.mysql. mysqlconnector ", "任务。Max ": "1", 开云体育电动老虎机"数据库。主机名:“mysql”,“数据库”。开云体育电动老虎机端口:“3306”,“数据库”。开云体育电动老虎机用户":"deb开云体育官方注册网址ez开云体育电动老虎机ium", "数据库"。密码":"dbz", "databas开云体育电动老虎机e.server. "id": "184055", "database.server.name": "dbserver", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false" } } EOF

现在我们应该让所有组件都启动并运行,初始数据更改事件已经流到Kafka主题中。有多个属性对我们的用例特别重要:

  • UnwrapFromEnvelope SMT使用。属性中直接映射字段部分更改记录到KSQL语句中。没有它,我们就需要EXTRACTJSONFIELD属性中提取的每个字段信息的一部分。

  • JSON转换器禁用模式。原因与上述相同。启用模式后,对于JSON,记录被封装在包含字段的JSON结构中模式(包含模式信息)和有效载荷(与实际数据本身)。我们还需要用到EXTRACTJSONFIELD去相关的领域。Avro转换器没有这样的问题,所以使用Avro时不需要设置此选项。

接下来,我们将启动KSQL命令shell。我们将在CLI中运行一个本地引擎。另外请注意——净参数。这保证了KSQL容器运行在与Debezium容器相同的网络中,并允许正确的DNS解析。开云体育官方注册网址

Docker-compose exec KSQL -cli KSQL http://ksql-server:8088

首先,我们将列出代理中存在的所有Kafka主题:

列出>的主题卡夫卡话题|注册| |分区分区副本  ------------------------------------------------------------------------------------ connect-status假| 5 | 1 | dbserver假| 1 | 1 | dbserver.inventory.addresses假| 1 | 1 | dbserver.inventory.customers假| 1 | 1 | dbserver.inventory.orders假| 1 | 1 | dbserver.inventory.products |假| 1 | 1 dbserver.inventory。Products_on_hand | false | 1 | 1 ksql__commands | true | 1 | 1 my_connect_configs | false | 1 | 1 my_connect_offset | false | 25 | 1 schema-changes。库存|假| 1 | 1

我们感兴趣的题目是dbserver.inventory.orders而且dbserver.inventory.customers

默认情况下,KSQL处理从最新的偏移量。我们希望处理主题中已经存在的事件,因此可以从最早的偏移量。

ksql> SET 'auto.offset。重置' = '最早';成功更改本地属性auto.offset。将'null'重置为'最早'

首先,我们需要从包含Debezium数据更改事件的主题创建流。开云体育官方注册网址一个在KSQL和Kafka流术语是一个无状态的无界传入数据集。

ksql> CREATE STREAM orders_from_开云体育官方注册网址debezium (order_number integer, order_date string, buyer integer, quantity integer, product_id integer) WITH (KAFKA_TOPIC='dbserver.inventory.orders',VALUE_FORMAT='json');消息---------------- Stream created ksql> ksql> CREATE Stream customers_from_debezi开云体育官方注册网址um (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='dbserver.inventory.customers',VALUE_FORMAT='json');消息----------------流已创建

分区

我们的部署每个主题只使用一个分区。在生产系统中,每个主题可能有多个分区,我们需要确保属于聚合对象的所有事件最终都在同一个分区中。在我们的例子中,自然划分是每个客户id。我们要重新划分orders_from_开云体育官方注册网址debezium根据购买者字段,该字段包含客户id。重新分区的数据被写入一个新的主题ORDERS_REPART

CREATE STREAM order WITH (KAFKA_TOPIC='ORDERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM orders_from_debeziu开云体育官方注册网址m PARTITION BY buyer;消息  ---------------------------- 流创建和运行ksql >主题列表;卡夫卡话题|注册| |分区分区副本  ------------------------------------------------------------------------------------ ...ORDERS_REPART | true | 1 | 1…

我们也会对客户进行同样的操作。这是必要的,有两个原因:

  • 当前键是一个结构体,它包含一个名为id使用客户id。这与重新分区的顺序主题不同,后者仅包含id值作为键,因此分区将不匹配。

  • 当我们稍后创建JOIN时,有一个限制要求键与表中的键字段具有相同的值。表字段包含一个普通值,但键包含一个结构体,因此它们不匹配。看到这个KSQL问题欲知详情。

CREATE STREAM customers_stream WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * FROM customers_from_debezi开云体育官方注册网址um;消息  ---------------------------- 流创建和运行ksql >主题列表;卡夫卡话题|注册| |分区分区副本  ------------------------------------------------------------------------------------ ...CUSTOMERS_REPART | true | 1 | 1…

为了验证记录是否有一个新键并因此被重新分配,我们可以发出一些语句来比较结果:

SELECT * FROM orders_from_debezi开云体育官方注册网址um LIMIT 1;1524034842810 | {"order_number":10001} | 10001 | 16816 | 1001 | 1 | 102达到分区限制。查询已终止的ksql> SELECT * FROM orders LIMIT 1;1524034842810 | 1001 | 10001 | 16816 | 1001 | 1 | 102分区达到LIMIT。查询终止

第二列包含ROWKEY这是信息的关键。

客户/订单加入

到目前为止,我们只是将流声明为无界无状态数据集。在我们的用例中订单是一个来去匆匆的事件。但客户是一个可以更新的实体,通常是系统状态的一部分。这样的质量在KSQL或Kafka流中表示为表。我们将根据主题创建一个客户表,其中包含重新分区的客户。

ksql> CREATE TABLE customers (id integer, first_name string, last_name string, email string) WITH (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',KEY='id');日志信息---------------表已创建

现在,我们已经具备了在客户与其订单之间建立连接的一切条件,并创建了一个查询,该查询将监视进入的订单,并将它们与相关的客户字段一起列出。

ksql> SELECT order_number,quantity,customers.first_name,customers. first_namelast_name FROM left orders. buyer =customers.id;10002 | 2 |乔治|贝利10004 | 1 |爱德华|沃克

让我们对数据库进行一些更改,这将导致Debezium发出相应的CD开云体育电动老虎机C事件:开云体育官方注册网址

docker-compose mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory' mysql> INSERT INTO orders VALUES(默认值,NOW(), 1003,5,101);mysql> UPDATE customers SET first_name='Annie' WHERE id=1004;查询OK, 1 row affected(0.02秒)Rows matched: 1 Changed: 1 Warnings: 0 mysql> UPDATE orders SET quantity=20 WHERE order_number=10004;查询确定,1行受影响(0.02秒)匹配的行:1已更改:1警告:0

您可能会注意到,只有订单表触发了已连接流中的更改。这是流/表连接的产物。如果任何输入流被修改,我们将需要一个流/流连接来触发更改。

因此,修改数据库后的选择的最终结果为开云体育电动老虎机

10001 | 1 |萨利| 20 |爱德华|沃克10005 |乔治|贝利10003 | 2 |贝利10004 | 1 |爱德华|沃克10004 | 20 |爱德华|沃克

总结

我们已经成功启动了一个KSQL实例。我们已经将KSQL流映射到Debezium填充的Debeziu开云体育官方注册网址m主题,并在它们之间进行了连接。我们还讨论了流应用程序中的修复问题。

如果你想用Avro编码和模式注册表来尝试这个例子,那么你可以使用我们的Avro例子。更多的细节和更高级的用法请参考KSQL语法参考

如果你需要帮助,有功能需求,或者想分享你对这个例子的经验,请在下面的评论中告诉我们。

雅罗西克Pechanec

Jiri是Red Hat的软件开发人员(前质量工程师)。他职业生涯的大部分时间都花在Java和系统集成项目和任务上。他住在捷克共和国布尔诺附近。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题

Baidu
map