本教程最初由QuestDB,特邀撰稿人,Yitaek黄,向我们展示了如何通过Debezium和Kafka Connect使用变更数据捕获将数据流导入QuestDB。开云体育官方注册网址

现代数据架构在很大程度上已经偏离了ETL(提取-转换-加载)范例到英语教学(Extract-Load-Transform),在应用转换(例如聚合、连接)进行进一步分析之前,将原始数据首先加载到数据湖中。传统的ETL管道很难维护,并且相对不适应不断变化的业务需求。随着新的云技术承诺更便宜的存储和更好的可伸缩性,数据管道可以从预先构建的提取和批量上传转移到更流的架构。

变更数据捕获(CDC)非常适合这种范式转换,在这种转换中,对来自一个源的数据的更改可以流到其他目的地。顾名思义,CDC跟踪数据(通常是数据库)的变化,并提供插件来处理这些变化。开云体育电动老虎机对于事件驱动的体系结构,CDC作为服务边界(例如,发件箱模式).在复杂的微服务环境中,CDC通过将负担减轻到CDC系统,帮助简化数据交付逻辑。

为了说明这一点,让我们以一个参考架构为例,将股票更新从PostgreSQL流到QuestDB。一个简单的Java Spring应用程序根据股票代码轮询股票价格,并将当前价格更新到PostgreSQL数据库。开云体育电动老虎机然后Debezium检测到更新并将其提供给Kafka主题。开云体育官方注册网址最后,Kafka Connect QuestDB连接器监听该主题并将更改流到QuestDB中进行分析。

设计概述

以这种方式构造数据管道可以简化应用程序。Java Spring应用程序只需要获取最新的股票数据并提交给PostgreSQL。由于PostgreSQL是一个优秀的OLTP(事务性)数据库,应用程序可以依赖ACID遵从性来确保只有提交的开云体育电动老虎机数据才能被下游服务看到。应用程序开发人员不需要担心复杂的重试逻辑或不同步的数据集。从数据库的角度来看开云体育电动老虎机,PostgreSQL可以优化到它最擅长的地方——事务性查询。Kafka可以用来可靠地向其他端点提供数据,QuestDB可以用来存储历史数据,以运行分析查询和可视化。

废话不多说,让我们来看看这个例子:

先决条件

  • Git

  • Docker引擎:20.10+

设置

要在本地运行示例,首先克隆QuestDG Kafka连接器回收

$ git克隆https://github.com/questdb/kafka-questdb-connector.git

然后,导航到stocks示例,构建并运行Docker合成文件:

$ CD kafka-questdb-connector/kafka-questdb-connector-samples/stocks/ docker compose build $ docker compose up

在Linux或较旧版本的Docker中组成子命令可能不可用。你可以试着执行docker-compose而不是码头工人组成.如果docker-compose在您的发行版中不可用,您可以吗安装它手动。

这将为Java Spring App/Kafka Connector for QuestDB构建Dockerfile,并下拉PostgreSQL(预配置了Debezium)、Kafka/Zookeeper、QuestDB和Grafana容器。开云体育官方注册网址Kafka和Kafka Connect需要一些初始化。通过检查连接容器等待日志停止。

启动Debezium开云体育官方注册网址连接器

此时,Java应用程序正在不断更新PostgreSQL中的股票表,但是连接还没有建立。通过执行以下命令创建D开云体育官方注册网址ebezium连接器(即PostgreSQL→Debezium→Kafka):

curl -X POST -H "Content-Type: application/json" -d '{"name":"开云体育官方注册网址debezium_source","config":{"task开云体育电动老虎机s.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors

启动QuestDB Kafka Connect接收器

通过创建Kafka Connect端(即Kafka→QuestDB接收器)完成管道:

curl -X POST -H "Content-Type: application/json" -d '{"name":"questdb-connect","config":{"topics":"dbserver1.public. conf "股票","表格":"股票","connector.class":"io.questdb.kafka.QuestDBSinkConnector","tasks.max":"1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","host":"questdb", "transforms":"unwrap", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "include.key": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}' localhost:8083/connectors

最终结果

现在所有写入PostgreSQL表的更新也将反映在QuestDB中。要验证,请导航到http://localhost:19000并从股票表中选择:

选择股票;

你也可以运行聚合来进行更复杂的分析:

选择时间戳的象征,avg(价格),最小值(价格),马克斯(价格)股票在哪里符号=IBM样本通过1m对齐日历;

最后,您可以与Grafana仪表板进行交互以实现可视化http://localhost:3000/d/stocks/stocks?orgId=1&refresh=5s&viewPanel=2

可视化是由Debezium捕获的变化组成的蜡烛图;开云体育官方注册网址每个蜡烛在给定的时间间隔内显示开盘价、收盘价、最高价和最低价。时间间隔可以通过选择左上角的“interval”选项来更改:

格拉夫纳蜡烛图

深潜水

示例应用程序已经启动并运行,现在让我们更深入地研究中的每个组件股票的例子。

我们将查看以下文件:

├──Kafka - QuestDB -connector/ Kafka - QuestDB -connector-samples/stocks/│├──Dockerfile-App | |——包装我们的Java App的Dockerfile |├──Dockerfile- connect | |——组合Debezium容器的Dockerfile | |——image The with Qu开云体育官方注册网址estDB Kafka connector│├──src/main/resources/schema。│├──src/main/java/com/questdb/kafka/connector/samples/StocksApplication.java | |——java Spring应用程序,定期更新PostgreSQL | |的库存表…

制作人(Java App)

生成器是一个简单的Java Spring Boot应用程序。它有两个组件:

  1. schema.sql文件。这个文件用于在PostgreSQL中创建股票表,并用初始数据填充它。它由Spring Boot App拾取并在启动时执行。

    [source,sql] ---- CREATE TABLE IF NOT EXISTS stock (id serial primary key, symbol varchar(10) unique, price float8, last_update timestamp);INSERT INTO stock (symbol, price, last_update) VALUES ('AAPL', 500.0, now())INSERT INTO stock (symbol, price, last_update) VALUES ('IBM', 50.0, now()) ON冲突不做任何操作;INSERT INTO stock (symbol, price, last_update) VALUES ('MSFT', 100.0, now()) ON冲突不做任何操作;INSERT INTO stock (symbol, price, last_update) VALUES ('GOOG', 1000.0, now())对冲突不做任何操作;INSERT INTO stock (symbol, price, last_update) VALUES ('FB', 200.0, now())INSERT INTO stock (symbol, price, last_update) VALUES ('AMZN', 1000.0, now())对冲突不做任何操作;INSERT INTO stock (symbol, price, last_update) VALUES ('TSLA', 500.0, now()) ON冲突不做任何操作;INSERT INTO stock (symbol, price, last_update) VALUES ('NFLX', 500.0, now()) ON冲突不做任何事;INSERT INTO stock (symbol, price, last_update) VALUES ('TWTR', 50.0, now()) ON冲突不做任何操作; INSERT INTO stock (symbol, price, last_update) VALUES ('SNAP', 10.0, now()) ON CONFLICT DO NOTHING; ----
    ' ON CONFLICT DO NOTHING '子句用于在应用程序重启时避免表中的重复条目。
  2. Java代码用随机值更新价格和时间戳。更新并不是完全随机的,应用程序使用一个非常简单的算法来生成更新,这与股票价格的波动非常相似。在实际场景中,应用程序将从外部来源获取价格。

生成器被打包到一个最小的Dockerfile中,Dockerfile-App,并链接到PostgreSQL:

FROM maven:3.8-jdk-11-slim AS builder COPY ./pom.xml /opt/stocks/pom.xml COPY ./src ./opt/stocks/src WORKDIR /opt/stocks RUN mvn clean install - dskiptest FROM azul/zulu-openjdk:11-latest COPY——FROM =builder /opt/stocks/target/kafka-samples-stocks-*.jar /stocks.jar CMD ["java", "-jar", "/stocks.jar"]

Kafka Connect, 开云体育官方注册网址Debezium和QuestDB Kafka连接器

在深入研究Kafka Connect、Debezium和QuestDB Kaf开云体育官方注册网址ka连接器配置之前,我们先来看看它们之间的关系。

Kafka Connect是一个用于构建连接器的框架,用于在Kafka和其他系统之间移动数据。它支持两类连接器:

  1. 源连接器——从源系统读取数据并将其写入Kafka

  2. 接收器连接器——从Kafka读取数据并将其写入接收器系统

开云体育官方注册网址Debezium是Kafka Connect的Source连接器,可以监视和捕获数据库中的行级更改。开云体育电动老虎机这是什么意思?无论何时在数据库中插入、更新或删除一行,Debezium都会捕获更改并将其作为事件写入Kafka。开云体育官方注册网址开云体育电动老虎机

在技术层面上,Debezium是一个运行在K开云体育官方注册网址afka Connect框架中的Kafka Connect连接器。这反映在开云体育官方注册网址Debezium容器图像它将Kafka Connect与预装的Debezium连接器打包在一起。开云体育官方注册网址

QuestDB Kafka连接器也是Kafka Connect连接器。它是一个Sink连接器,从Kafka读取数据并将其写入QuestDB。我们将QuestDB Kafka连接器添加到Debezium容器镜像中,我们得到了一开云体育官方注册网址个同时安装了Debezium和QuestDB Kafka连接器的Kafka连接镜像!

这是我们用来构建映像的Dockerfile:

从ubuntu:最新AS builder WORKDIR /opt运行apt-get update && apt-get install -y curl wget unzip jq RUN curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest | jq -r '.assets[]|select(。Content_type == "application/zip")|。browser_download_url'|wget -qi - RUN unzip kafka-questdb-connector-*-bin.zip FROM 开云体育官方注册网址debezium/connect:1.9.6。最终拷贝——from=builder /opt/kafka-questdb-connector/*.jar /kafka/connect/questdb-connector/

Dockerfile下载最新版本的QuestDB Kafka连接器,解压缩并复制到Debezium容器镜像中。开云体育官方注册网址生成的图像同时安装了Debezium和QuestDB Ka开云体育官方注册网址fka连接器:

dockerfile - connect添加了QuestDB Kafka Connector层

整个Kafka连接器由一个源连接器和一个Sink连接器完成:

源和接收器连接器如何与Kafka集群和数据库一起工作开云体育电动老虎机

开云体育官方注册网址Debezium连接器

我们已经知道Debezium是一个Kafk开云体育官方注册网址a Connect连接器,它可以监视和捕获数据库中的行级更改。开云体育电动老虎机我们还有一个安装了Debezium和QuestDB Kafka连接器的Docker映开云体育官方注册网址像。但是,此时两个连接器都没有运行。我们需要配置并启动它们。这是通过CURL命令完成的,该命令向Kafka Connect REST API发送POST请求。

curl -X POST -H "Content-Type: application/json" -d '{"name":"开云体育官方注册网址debezium_source","config":{"task开云体育电动老虎机s.max":1,"database.hostname":"postgres","database.port":5432,"database.user":"postgres","database.password":"postgres","connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.dbname":"postgres","database.server.name":"dbserver1"}} ' localhost:8083/connectors

请求体包含了Debezium连接器的配置,让我们把它分解一下:开云体育官方注册网址

的名字开云体育官方注册网址debezium_source配置: {tasks.max1开云体育电动老虎机database.hostnamepostgres开云体育电动老虎机database.port5432开云体育电动老虎机database.userpostgres开云体育电动老虎机database.passwordpostgresconnector.classio.开云体育官方注册网址debezium.connector.postgresql.PostgresConnector开云体育电动老虎机database.dbnamepostgres开云体育电动老虎机database.server.namedbserver1}}

它监听PostgreSQL数据库中的变化,并将上述配置发布到Kafka。开云体育电动老虎机主题名称默认为<服务器名>。< schema >。<表>。在我们的例子中,它是dbserver1.public.stock.为什么?因为数据库服务器的名称是开云体育电动老虎机dbserver1时,模式为公共我们只有一张桌子股票

所以当我们发送请求后,Debezium会开始监听开云体育官方注册网址股票表并将它们发布到dbserver1.public.stock的话题。

Kafka连接器

至此,我们有了一个PostgreSQL表股票随机的股票价格和卡夫卡的话题dbserver1.public.stock其中包含了更改。下一步是配置QuestDB Kafka连接器从dbserver1.public.stock主题,并将数据写入QuestDB。

的配置,让我们更深入地了解启动Kafka连接接收器

{"name": "questdb-connect", "config": {"topics": "dbserver1.public. "Stock ", "table": " Stock ", "connector.class": "io.questdb.kafka. "QuestDBSinkConnector”、“任务。Max ": "1", "键。转换器”:“org.apache.kafka.connect.storage。StringConverter”、“价值。转换器”:“org.apache.kafka.connect.json。JsonConverter", "host": "questdb", "transforms": "unwrap", "transforms.unwrap "。type": "io.开云体育官方注册网址 debezu .transforms. extractnewrecordstate ", "include。Key ": "false", "symbols": "symbol", "timestamp.field.name": "last_update"}}

这里需要注意的重要事项是:

  • 表格而且主题: QuestDB Kafka连接器将用这个名称创建一个QuestDB表股票并从dbserver1.public.stock主题。

  • 宿主: QuestDB Kafka连接器将连接到运行在questdb宿主这是QuestDB容器的名称。

  • connector.class: QuestDB Kafka连接器类名。这告诉Kafka Connect使用QuestDB Kafka连接器。

  • value.converter: Debe开云体育官方注册网址zium连接器生成JSON格式的数据。这就是为什么我们需要配置QuestDB连接器来使用JSON转换器来读取数据:org.apache.kafka.connect.json.JsonConverter

  • 符号:股票代码转换为QuestDB符号类型,用于低基数的字符串值(例如,枚举)。

  • timestamp.field.name:由于QuestDB对时间戳和基于时间戳的分区有很好的支持,我们可以指定指定的时间戳列。

  • 转换:展开字段的使用io.开云体育官方注册网址debezium.transforms.ExtractNewRecordState类型来提取新数据,而不是Debezium发出的元数据。开云体育官方注册网址换句话说,这是一个过滤器payload.after关于Kafka主题的部分Deb开云体育官方注册网址ezium数据。看到它的文档欲知详情。

ExtractNewRecordState转换可能是配置中最不直观的部分。简而言之,对于PostgreSQL表中的每一个变化,Debezium都会向Kafka主题发出一个JSON消息,如下所示:开云体育官方注册网址

模式这个JSON键包含Debezium消息模式。开云体育官方注册网址这与这个样本不太相关。为简洁省略。有效载荷: {之前: {id8象征reed hastings价格1544.3357414199545last_update1666172978269856}},: {版本1.9.6.Final连接器postgresql的名字dbserver1ts_ms1666172978272快照dbpostgres序列\”87397208\”\”87397208\”模式公共表格股票txId402087lsn87397208xmin},人事处uts_ms1666172978637事务

如果你被这条信息的规模所淹没,不要害怕。大多数字段都是元数据,它们与本示例无关。看到开云体育官方注册网址Debezium文档浏览详情。重要的一点是,我们不能将整个JSON消息推送到QuestDB中,也不希望QuestDB中有所有元数据。我们需要提取payload.after消息的一部分,然后才将其推到QuestDB。这正是ExtractNewRecordStatetransform does:它将大消息转换为只包含信息的小消息payload.after信息的一部分。因此,消息看起来是这样的:

id8象征reed hastings价格1544.3357414199545last_update1666172978269856

这是我们可以推送到QuestDB的消息。QuestDB Kafka连接器将读取此消息并将其写入QuestDB表。如果QuestDB表不存在,QuestDB Kafka连接器也会创建它。QuestDB表将具有与JSON消息相同的模式——其中每个JSON字段将是QuestDB表中的一列。

QuestDB和Grafana

一旦数据被写入QuestDB表,我们就可以更容易地处理时间序列数据。由于QuestDB与PostgreSQL连线协议兼容,我们可以在Grafana上使用PostgreSQL数据源来可视化数据。预配置的仪表板正在使用以下查询:

选择__time (时间戳),最小值(价格)作为低,马克斯(价格)作为高,第一(价格)作为打开,最后(价格)作为关闭股票在哪里__timeFilter (时间戳而且符号=美元的符号样本通过间隔排列日历;

我们已经创建了一个系统,它可以持续跟踪并在PostgreSQL表中存储多个股票的最新价格。然后,这些价格通过Debezium作为事件馈送给Kafka, Debezium捕捉每个价格变化。开云体育官方注册网址QuestDB Kafka连接器从Kafka读取这些事件,并将每个变化作为一个新行存储在QuestDB中,允许我们保留一个全面的股票价格历史。然后,可以使用Grafana等工具对这段历史进行分析和可视化,如蜡烛图所示。

下一个步骤

这个示例项目是一个基本参考体系结构,用于将数据从关系数据库流到优化的时间序列数据库。开云体育电动老虎机对于使用PostgreSQL的现有项目,Debezium可以配置为开始将数据流式传输到QuestD开云体育官方注册网址B,并利用时间序列查询和分区。对于同时开云体育电动老虎机存储原始历史数据的数据库,采用Debezium可能需要对架构进行一些更改。开云体育官方注册网址然而,这是有益的,因为这是一个提高性能和在事务性数据库和分析性、时间序列数据库之间建立服务边界的机会。开云体育电动老虎机

这个参考架构也可以扩展到配置Kafka Connect,也可以流到其他数据仓库进行长期存储。在检查数据之后,还可以配置QuestDB以降低数据的采样,以便进行更长期的存储分离分区以节省空间

给这个样例应用程序一试,加入QuestDB Slack社区如果你有任何问题。

Yitaek黄

Yitaek是NYDIG的软件工程师,也是QuestDB的客座撰稿人。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题