我们祝愿Debezium社区在2018年一切顺利!开云体育官方注册网址

当我们正在开发0.7.2版本时,我们认为应该发布另一篇文章,描述基于Debezium的端到端数据流用例。开云体育官方注册网址我们已经看到了如何设置到下游数据库的变更数据流开云体育电动老虎机几周前.在这篇博客文章中,我们将遵循相同的方法将数据流传输到Elasticsearch服务器利用其出色的能力对我们的数据进行全文搜索。但是为了让事情变得更有趣,我们将把数据流传输到PostgreSQL数据库和Elasticsearch,因此我们将通过SQL查询语言和全文搜索优化对数据的访问。开云体育电动老虎机

拓扑结构

这是一个图表,展示了数据是如何流经我们的分布式系统的。首先,Debezium开云体育官方注册网址 MySQL连接器不断地从MySQL数据库中捕获更改,并将每个表的更改发送到单独的Kafka主题中。开云体育电动老虎机然后是ConfluentJDBC接收器连接器就是不断地读取这些主题并将事件写入PostgreSQL数据库。开云体育电动老虎机与此同时,汇合Elasticsearch连接器持续阅读这些相同的主题,并将事件写入Elasticsearch。


图1:一个通用拓扑


我们将把这些组件部署到几个不同的流程中。在这个例子中,我们将把所有三个连接器部署到一个Kafka Connect实例中,该实例将代表所有连接器写入和读取Kafka(在生产中,您可能需要保持连接器分离以获得更好的性能)。


图2:简化的拓扑

配置

我们会用到这个Docker撰写文件用于快速部署演示。部署由以下Docker镜像组成:

  • Apache管理员

  • Apache卡夫卡

  • 一个丰富Kafka Connect / 开云体育官方注册网址Debezium图像改动如下:

    • 放入PostgreSQL JDBC驱动程序卡夫卡/ libs /目录

    • 放入的Confluent JDBC连接器卡夫卡/ / kafka-connect-jdbc连接目录

  • 预填充MySQL,如在我们的教程

  • 空PostgreSQL

  • 空Elasticsearch

Debezium源连接器、JDBC和Elasticsearch连接器的消息格式是不一样开云体育官方注册网址的,因为它们是单独开发的,并且各自关注的目标略有不同。开云体育官方注册网址Debezium发出了一个更复杂的事件结构,因此它可以捕获所有可用的信息。特别是,更改事件包含已更改记录的旧状态和新状态。另一方面,两个接收器连接器都希望得到一个简单的消息,该消息仅表示要写入的记录状态。

开云体育官方注册网址Debezium的UnwrapFromEnvelope单个消息转换(SMT)将复杂的更改事件结构分解为两个接收器连接器所期望的相同的基于行的格式,并有效地充当消息翻译在前面提到的两种格式之间。

例子

让我们直接转向我们的示例,因为在这里可以看到变化。首先,我们需要部署所有组件:

export 开云体育官方注册网址DEBEZIUM_VERSION=0.7 docker-compose up

当所有组件启动时,我们将注册Elasticsearch Sink连接器写入Elasticsearch实例。我们希望在PostgreSQL和Elasticsearch中使用相同的键(主id):

curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/ \ -d @es-sink.json

我们正在使用这个注册请求:

{{的名字elastic-sink配置: {connector.classio.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max1主题客户connection.urlhttp://elastic:9200转换打开、关键transforms.unwrap.typeio.开云体育官方注册网址debezium.transforms.UnwrapFromEnvelope1transforms.key.typeorg.apache.kafka.connect.transforms.ExtractField美元关键2transforms.key.fieldid2key.ignore3.type.name客户4}}}

请求配置这些选项:

  1. 只从Debezium的更改数据消息中提取新行状态开云体育官方注册网址

  2. 提取id字段。结构体,则源和两个目的地使用相同的密钥。这是为了解决Elasticsearch连接器只支持数字类型和字符串作为键。如果我们不提取id由于密钥类型未知,连接器将过滤掉消息。

  3. 使用事件中的键,而不是生成一个合成的键

  4. 在Elasticsearch中,事件将在哪种类型下注册

接下来,我们将注册JDBC Sink连接器写入PostgreSQL数据库:开云体育电动老虎机

curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/ \ -d @jdbc-sink.json

最后,必须设置源连接器:

curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/ \ -d @source.json

让我们检查数据库和搜索服务器是否同步。开云体育电动老虎机所有的行客户表应该在源数据库(MySQL)以及目标数据库(Postgres)和Ela开云体育电动老虎机sticsearch中找到:

docker-compose exec mysql bash - c的mysql - u MYSQL_USER - p MYSQL_PASSWORD美元库存- e”从顾客选择* "' +------+------------+-----------+-----------------------+ | id | first_name | last_name |电子邮件  | +------+------------+-----------+-----------------------+ | 1001 sally.thomas@acme.com托马斯萨利| | | | | 1002年乔治贝利| | | gbailey@foobar.com | | 1003年爱德华·沃克| | | ed@walker.com | | 1004 | |安妮Kretchmar | annek@noanswer.org |+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"' last_name | id | first_name | email -----------+------+------------+----------------------- Thomas | 1001 | Sally | sally.thomas@acme.com Bailey | 1002 | George | gbailey@foobar.com Walker | 1003 | Edward | ed@walker.com Kretchmar | 1004 | Anne | annek@noanswer.org
curl http://localhost: 9200 /客户/ _search吗?漂亮的{“了”:42岁,“timed_out”:假的,“_shards”:{“总”:5,“成功”:5,“失败”:0},“打击”:{“总”:4,“max_score”:1.0,“支安打 " : [ { "_ 指数”:“客户”、“_type”:“客户”、“_id”:“1001”、“_score _source“:1.0:{" id ": 1001年,“first_name”:“莎莉”,“last_name”:“托马斯”、“电子邮件”:“sally.thomas@acme.com " } }, { "_ 指数”:“客户”、“_type”:“客户”、“_id”:“1004”、“_score _source“:1.0:{" id ": 1004年,“first_name”:“安妮”,“last_name”:“Kretchmar”、“电子邮件”:“annek@noanswer.org " } }, { "_ 指数”:“客户”、“_type”:“客户”、“_id”:“1002”、“_score _source“:1.0:{" id ": 1002年,“first_name”:“乔治”、“last_name”:“贝利”、“电子邮件”:“gbailey@foobar.com " } }, { "_ 指数”:“客户”、“_type”:“客户”、“_id”:“1003”、“_score _source“:1.0:{" id ": 1003年,“first_name”:“爱德华”、“last_name”:“沃克”、“电子邮件”:“ed@walker.com " } } ] } }

在连接器仍在运行的情况下,我们可以向MySQL数据库添加一个新行,然后检查它是否被复制到PostgreSQL数据库和Elasticsear开云体育电动老虎机ch中:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory' mysql> insert into customers values(默认为'John', 'Doe', 'john.doe@example.com');查询确定,影响1行(0.02秒)
docker-compose exec postgres bash - c”psql - u POSTGRES_USER POSTGRES_DB美元- c“select *客户”last_name | | id first_name |电子邮件  -----------+------+------------+----------------------- ...Doe | 1005 | John | john.doe@example.com(5行)
curl http://localhost: 9200 /客户/ _search吗?漂亮的…{“_index”:“顾客”,“_type”:“客户”、“_id”:“1005”,“_score”:1.0,“_source”:{" id ": 1005年,“first_name”:“约翰”,“last_name”:“母鹿”、“电子邮件”:“john.doe@example.com”}}…

总结

我们建立了一个复杂的流数据管道来同步一个MySQL数据库和另一个数据库以及一个Elasticsearch实例。开云体育电动老虎机我们设法在所有系统中保持相同的标识符,这允许我们在整个系统中关联记录。

几乎实时地将数据更改从主数据库传播到Elasticsearch等搜索引擎,可以实开云体育电动老虎机现许多有趣的用例。除了全文搜索的不同应用程序,例如,还可以考虑创建仪表板和各种可视化使用Kibana,以进一步了解数据。

如果你想尝试这个设置自己,只需克隆从我们的项目例子回购.如果你需要帮助,有功能需求,或者想分享你的经验,请在下面的评论中告诉我们。

雅罗西克Pechanec

Jiri是Red Hat的软件开发人员(前质量工程师)。他职业生涯的大部分时间都花在Java和系统集成项目和任务上。他住在捷克共和国布尔诺附近。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题