如今,为分析、报告或机器学习需求构建数据湖是一种常见的做法。

在这篇博文中,我们将描述一种构建数据湖的简单方法。该解决方案使用基于Debezium的实时数据管道,支持ACID事务、SQL更新,并且具有高度可伸缩性。开云体育官方注册网址它不需要Apache Kafka或Apache Spark应用程序来构建数据提要,从而降低了整个解决方案的复杂性。

让我们从对数据湖概念的简短描述开始数据湖通常是数据的中央存储,包括源系统数据、传感器数据、社会数据等的原始副本。您可以按原样存储数据,而不必首先处理数据,然后运行不同类型的分析。

开云体育官方注册网址Debezium服务器冰山

由于操作数据通常驻留在关系数据库或NoSQL数据存储中,问题是如何将数据传播到数据湖中。开云体育电动老虎机这就是开云体育官方注册网址Debezium服务器冰山基于Debezium和Apache Iceberg,它开云体育官方注册网址允许您处理来自源数据库的实时数据更改事件,并将它们上传到Iceberg支持的任何对象存储。开云体育电动老虎机让我们仔细看看这两个项目。

开云体育官方注册网址是一个用于变更数据捕获的开源分布式平台。开云体育官方注册网址Debezium从数据库的事务日志中提取更改事件,并通过事开云体育电动老虎机件流平台将其交付给消费者,使用不同的格式,如JSON、Apache Avro、谷歌Protocol Buffers等。大多数时候,Debezium与Apa开云体育官方注册网址che Kafka和Kafka Connect一起使用。但是通过Debe开云体育官方注册网址zium服务器,其他消息基础设施(如Kinesis、谷歌Pub/Sub、Pulsar)的用户也可以受益于Debezium的变更数据捕获功能。这里你可以看到目前支持的目的地

Apache的冰山是一个“大型分析数据集的开放表格格式”。冰山为包括Spark、Trino、PrestoDB、Flink和Hive在内的计算引擎添加了表格,使用高性能的表格格式,就像SQL表一样。”它支持ACID插入以及行级删除和更新。它提供了一个Java API来管理表元数据,比如模式和分区规范,以及存储表数据的数据文件。

阿帕奇冰山有一个概念数据和删除文件.数据文件是Iceberg在后台用来保存实际数据的文件。删除文件是对现有数据文件中删除的行进行编码的不可变文件。这就是Iceberg如何在不重写文件的情况下删除/替换不可变数据文件中的单个行。在Debezium服务器冰山的开云体育官方注册网址情况下,这些是不可变的Apache拼花文件,这种格式被设计为“与基于行文件(如CSV或TSV文件)相比,高效且高性能的平柱状数据存储格式”。

Apache冰山用户

开云体育官方注册网址Debezium服务器提供了一个SPI实现新的接收器适配器,这是用于创建Apache Iceberg消费者的扩展点。

图1。架构概述:Debezium服务器和Apache开云体育官方注册网址冰山

冰山使用者将CDC更改事件转换为冰山数据文件,并使用冰山Java API将它们提交到目标表。它将每个Debezium源开云体育官方注册网址主题映射到目标冰山表。

当没有找到给定的冰山目标表时,使用者使用变更事件模式创建它。此外,事件模式用于将更改事件本身映射到等效的Iceberg记录。正因为如此,开云体育官方注册网址debezium.format.value.schemas.enable必须设置配置选项。一旦Debeziu开云体育官方注册网址m更改事件被记录到冰山记录中,模式就会从数据中删除。

在较高的级别上,更改事件处理如下。对于每批接收到的事件:

  • 事件按目标冰山表分组;每个组包含来自单个源表的更改事件列表,共享相同的数据模式

  • 对于每个目的地,事件都转换为冰山记录

  • 冰山记录被保存为冰山数据和删除文件(仅当使用者以upsert模式运行时才创建删除文件)

  • 文件被提交到目标冰山表(即上传到目标存储)

  • 已处理的更改事件标记为已使用Debezium处理开云体育官方注册网址

下面是一个使用Debezium Server和Iceberg适配器的完整配置示例:开云体育官方注册网址

开云体育官方注册网址debezium.sink。类型=冰山#运行附加模式debezu .sink.冰山。开云体育官方注册网址upsert = fals开云体育官方注册网址e debezium.sink.iceberg。upsert-keep-deletes = tru开云体育官方注册网址e debezium.sink.iceberg。表前缀= debezium开云体育官方注册网址cdc_ debezium.sink.iceberg。table-namespace 开云体育官方注册网址= debeziumevents debezium.sink.iceberg.fs.defaultFS = s3a: / / S3_BUCKET);开云体育官方注册网址debezium.sink.iceberg。仓库= s3a: / / S3_BUCKET / iceberg_warehouse d开云体育官方注册网址ebezium.sink.iceberg。类型= hadoop d开云体育官方注册网址ebezium.sink.iceberg。目录名称= mycatalog debeziu开云体育官方注册网址m.sink.iceberg.catalog-impl = org.apache.iceberg.hadoop。HadoopCatalog #启用事件模式debezium.format.开云体育官方注册网址value.schemas。启用= true deb开云体育官方注册网址ezium.format。Value =json #复杂嵌套数据类型不支持,做事件扁平化。打开消息! debezium.transforms=unwrap debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db debezium.transforms.unwrap.delete.handling.mode=rewrite debezium.transforms.unwrap.drop.tombstones=true

上传和追加模式

默认情况下,冰山使用者以upsert模式运行(开云体育官方注册网址debezium.sink.iceberg.upsert设置为真正的).这意味着当源表中的行被更新时,目标行将被新的更新版本替换。当一行从源中删除时,它也会从目标中删除。使用upsert模式时,目标数据与源数据保持一致。upsert模式使用Iceberg相等删除特性,并使用Debezium更改数据事件的键(源自源表的主键)创建删除文件。开云体育官方注册网址为了避免重复数据,每批都执行重复数据删除,只保留记录的最后一个版本。例如,在单个事件批处理中,相同的记录可能出现两次:一次是在插入时,另一次是在更新时。使用upsert模式,记录的最后一个提取版本总是存储在Iceberg中。

注意,当源表没有定义主键,并且也没有其他方式可用的键信息(例如,在Debezium中定义的唯一键或自定义消息键)时,使用者使用开云体育官方注册网址附加此表的模式(见下文)。

使用Upsert模式保存已删除的记录

对于某些用例,将已删除的记录作为软删除保存是有用的。可以通过设置开云体育官方注册网址debezium.sink.iceberg.upsert-keep-deletes选项真正的.此设置将保留目标冰山表中已删除记录的最新版本。将其设置为false将从目标表中删除已删除的记录。

Append模式

这是最直接的操作模式,通过设置启用开云体育官方注册网址debezium.sink.iceberg.upsert.当使用带有追加模式的D开云体育官方注册网址ebezium Server Iceberg时,所有接收到的记录都被追加到目标表中。没有重复数据删除或删除记录。使用追加模式,可以分析记录的整个更改历史。

也可以使用实时事件并执行数据压缩然后用单独的压缩工作。Iceberg支持压缩数据和元数据文件以提高性能。

优化批大小

开云体育官方注册网址Debezium实时提取并交付数据库事件,开云体育电动老虎机这可能会导致对Iceberg中的表过于频繁的提交,从而生成过多的小文件。这对于批处理来说并不是最优的,特别是当接近实时的数据提要足够多时。为了避免这个问题,可以增加每次提交的批处理大小。

启用MaxBatchSizeWait模式下,Iceberg消费者使用Debezium度量来优化批处理开云体育官方注册网址大小。它周期性地检索Debezium内部事件队列的当前大小,并等待直到达到开云体育官方注册网址max.batch.size.在等待期间,Debezium事件被收集到内存开云体育官方注册网址中(在Debezium的内部队列中)。这样,每次提交(处理的事件集)处理更多的记录和一致的批处理大小。最大等待和检查间隔是通过开云体育官方注册网址debezium.sink.batch.batch-size-wait.max-wait-ms而且开云体育官方注册网址debezium.sink.batch.batch-size-wait.wait-interval-ms属性。这些设置应该与Debezium的设置一起配置开云体育官方注册网址开云体育官方注册网址debezium.source.max.queue.size而且开云体育官方注册网址debezium.source.max.batch.size属性。

下面是所有相关设置的示例:

开云体育官方注册网址debezium.sink.batch。batch-size-wait = MaxBatchSizeWai开云体育官方注册网址t debezium.sink.batch.batch-size-wait。max-wait-ms = 6000开云体育官方注册网址0 debezium.sink.batch.batch-size-wait。wait-interval-ms = 1000开云体育官方注册网址0 debezium.sink.batch.metrics.snapshot-mbean = debezium。postgres:type=connector-metrics,context=snapshot,server=testc debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc # increase max.batch.size to receive large number of events per batch debezium.source.max.batch.size=50000 debezium.source.max.queue.size=400000

创建额外的数据湖层

至此,数据湖的原始层已经加载,包括重复数据删除和接近实时的管道特性。在顶部构建策划层(有时称为分析层或数据仓库层)变得非常直接和简单。在分析层,准备原始数据以满足分析需求;通常原始数据会被重新组织、清理、版本化(参见下面的示例)、聚合,并且可能会应用业务逻辑。通过可伸缩的处理引擎使用SQL是进行这种数据转换的最常用方法。

例如,有人可以很容易地使用火花SQL(或PrestoDB, Trino, Flink等)加载一个慢慢改变维度,最常用的数据仓库表类型:

合并dwh。消费者t使用——要插入的新数据选择Customer_id, name, effective - date, to_date(9999-12-31yyyy-MM-dd作为end_date开云体育官方注册网址debezium.consumers联盟所有——更新现有记录。关闭end_date选择T.customer_id, t.name, t.实效日期,s.实效日期作为end_date开云体育官方注册网址debezium.consumerss内心的加入dwh。消费者tS.customer_id = t.customer_idt.current =真正的)S.customer_id = t.customer_ids. effecve_date = t. effecve_date——关闭最近的记录/版本。匹配然后更新t.current =, t.end_date = s.end_date——插入新版本和新数据匹配然后插入(customer_id, name, current, effecve_date, end_date)(s.name s.customer_id真正的, s. effecve_date, s.end_date);

额外的数据湖层可能需要定期使用新数据进行更新。最简单的方法是使用SQL更新或删除语句。这些SQL操作也是冰山支持

插入prod.db.table选择…;删除prod.db.table在哪里ts > =2020-05-01就是而且ts <2020-06-01就是删除prod.db.orders作为t1在哪里存在选择order_idprod.db.returned_orders在哪里t1。Order_id = Order_id;更新prod.db.all_eventssession_time =0,忽略=真正的在哪里Session_time < (选择最小值(session_time)prod.db.good_events));

总结和贡献

基于Debeziu开云体育官方注册网址m和Apache Iceberg,开云体育官方注册网址Debezium服务器冰山为您的数据湖设置低延迟的数据摄取管道非常简单。该项目完全开源,使用Apache 2.0许可证。开云体育官方注册网址Debezium Server Iceberg仍然是一个年轻的项目,还有很多需要改进的地方。请随时测试它,给予反馈,开放功能请求或发送拉请求。您可以看到更多的示例,并开始使用Iceberg和Spark进行试验这个项目

伊斯梅尔•西姆西可

Ismail是一名高级数据工程师,在数据分析领域工作超过10年。他对实时数据和机器学习应用感兴趣。他住在德国慕尼黑。


关于Debe开云体育官方注册网址zium

开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源Apache许可证,版本2.0

参与

我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题

Baidu
map