开云体育官方注册网址Debezium引擎

开云体育官方注册网址Debezium连接器通常由部署到卡夫卡连接服务,并配置一个或多个连接器上游数据库和监控生产数据更改事件的所有更改,他们看到的上游数据库。开云体育电动老虎机这些数据变化事件写入卡夫卡,他们可以独立使用的许多不同的应用程序。卡夫卡连接提供了出色的容错性和可伸缩性,因为它运行作为一个分布式服务并确保所有注册和配置连接器总是运行。例如,即使一个卡夫卡连接的端点在集群下降,其余卡夫卡连接端点将重启任何以前的连接器目前已被终止端点上运行,最大限度地减少停机时间和消除行政活动。

不是每个应用程序都需要这种程度的容错性和可靠性,他们可能不希望依赖外部卡夫卡经纪人和卡夫卡连接服务的集群。相反,有些应用程序会更喜欢嵌入开云体育官方注册网址Debezium连接器直接在应用程序空间。他们还想要相同的数据改变事件,但希望连接器将直接发送到应用程序而不是坚持他们在卡夫卡。

开云体育官方注册网址debezium-api模块定义了一个小的API,允许应用程序很容易配置和运行Debezium连接器使用Debezium引擎。开云体育官方注册网址

依赖关系

使用Debez开云体育官方注册网址ium引擎模块,添加开云体育官方注册网址debezium-api模块应用程序的依赖项。有一个开箱即用的API实现开云体育官方注册网址debezium-embedded应该添加模块的依赖关系。对于Maven,这需要在您的应用程序中添加以下的POM:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-api < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖> <依赖> < groupId > io.debezium < / groupId > < artifactId > debezium-embedded < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

在哪里$ {version开云体育官方注册网址.debezium}Debezium你使用的版本或Maven属性的值包开云体育官方注册网址含Debezium版本字符串。

同样地,添加依赖关系的每个Debezium连接器应用程序将使用。开云体育官方注册网址例如,下面可以添加到您的应用程序的Maven POM文件,所以您的应用程序可以使用MySQL连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mysql < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

或MongoDB连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mongodb < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

本文的剩余部分描述了在应用程序中嵌入MySQL连接器。其他连接器中使用类似的方式,除了连接器特有配置,主题和事件。

在代码中

您的应用程序需要建立嵌入式引擎为每个您想要运行连接器实例。的io.开云体育官方注册网址debezium.engine.DebeziumEngine < R >类是一个易于使用的包装任何Debezium连接器,连接器完全管理的生命周期。开云体育官方注册网址您将创建开云体育官方注册网址DebeziumEngine实例利用其构建器API,提供以下东西:

  • 你想接收消息的格式,例如JSON, Avro或作为卡夫卡连接SourceRecord(见输出消息格式)

  • 配置属性(可能从一个属性文件加载),定义了环境对发动机和连接器

  • 这种方法将呼吁每个数据更改事件产生的连接器

这里有一个例子的代码配置并运行嵌入式MySQL连接器:

/ /定义Debezium引擎的配置与MySQL连接器…开云体育官方注册网址最终属性道具= config.asProperties ();道具。setProperty(“名字”,“引擎”);props.setProperty(“抵消。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”);props.setProperty (“offset.storage.file。文件名”、“/ tmp / offsets.dat”);props.setProperty (“offset.flush.interval.ms”、“60000”);/ *开始连接器属性* / props.setProperty(“数据库。开云体育电动老虎机主机名”、“localhost”);props.setProperty(“开云体育电动老虎机数据库。港”,“3306”); props.setProperty("database.user", "mysqluser"); props.setProperty("database.password", "mysqlpw"); props.setProperty("database.server.id", "85744"); props.setProperty("topic.prefix", "my-app-connector"); props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory"); props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat"); // Create the engine with this configuration ... try (DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(record -> { System.out.println(record); }).build() ) { // Run the engine asynchronously ... ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // Do something else or wait for a signal or an event } // Engine is stopped when the main code is finished

让我们更详细地看看这段代码,首先我们在这里重复的前几行:

/ /定义Debezium引擎的配置与MySQL连接器…开云体育官方注册网址最终属性道具= config.asProperties ();道具。setProperty(“名字”,“引擎”);props.setProperty(“连接器。类”、“io.debez开云体育官方注册网址ium.connector.mysql.MySqlConnector”);props.setProperty(“抵消。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”);props.setProperty (“offset.storage.file。文件名”、“/ tmp / offsets.dat”);props.setProperty (“offset.flush.interval.ms ", 60000);

这将创建一个新标准属性对象设置几个字段所需的发动机不管使用的是哪一种连接器。首先是引擎的名称将在源记录由使用连接器和其内部状态,所以在应用程序中使用一些有意义的事情。的connector.class字段定义了类的名称,扩展了卡夫卡连接org.apache.kafka.connect.source.SourceConnector抽象类;在这个示例中,我们指定Debezium开云体育官方注册网址MySqlConnector类。

当卡夫卡连接的连接器,它读取信息从源和定期记录“抵消”定义多少的信息处理。连接器应该重新启动,它将使用最后一个记录抵消知道源信息应该阅读简历。由于连接器不知道或关心如何偏移量存储,它是由发动机提供一种方式来存储和恢复这些偏移量。接下来的几个字段的配置指定我们的引擎应该使用FileOffsetBackingStore类存储的补偿/ /存储/ offset.dat /路径本地文件系统上的文件(可以任意命名这个文件和存储任何地方)。此外,尽管连接器产生的偏移量与每一个原始记录》,记录发动机定期刷新补偿支持存储(在我们的例子中,每分钟一次)。这些字段为您的应用程序可以根据需要定制。

接下来的几行定义字段特定连接器(per-connector文档记录),在我们的示例中是MySqlConnector连接器:

/ *开始连接器属性* / props.setProperty(“数据库。开云体育电动老虎机主机名”、“localhost”) props.setProperty(“数据库。开云体育电动老虎机港”,“3306”)props.setProperty(“数据库。开云体育电动老虎机用户”、“mysqluser”) props.setProperty(“数据库开云体育电动老虎机。密码”、“mysqlpw”) props.setProperty (“databa开云体育电动老虎机se.server。id”,“85744”) props.setProperty(“主题。前缀”、“my-app-connector”) props.setProperty props.setProperty (“schema.history.i开云体育官方注册网址nternal”、“io.debezium.storage.file.history.FileSchemaHistory”) (“schema.history.internal.file.filename”、“/道路/ /存储/ schemahistory.dat”)

这里,我们设置的主机名称和端口号的MySQL数据库服务器正在运行,我们定义用户名和密码将用于连接到MySQL数据库。开云体育电动老虎机注意,MySQL用户名和密码应该对应一个MySQL数据库MySQL用户被授予以下权限:开云体育电动老虎机

  • 选择

  • 重新加载

  • 显示数据库开云体育电动老虎机

  • 复制的奴隶

  • 复制客户端

前三个权限需要阅读时的一个一致的快照数据库。开云体育电动老虎机最后两个特权允许数据库读取服务器的binlog通常用于MySQL复制开云体育电动老虎机。

的配置还包括一个数字标识符server.id。由于MySQL的binlog MySQL复制机制的一部分,以阅读binlogMySqlConnector实例必须加入MySQL服务器组,这意味着这个服务器ID必须独特的内所有进程的MySQL服务器组1和2之间的任何整数321。在我们的代码,我们将它设置为一个相当大的但是有些随机值我们将为我们的应用程序只使用。

的配置还指定了一个逻辑名MySQL服务器。连接器包括这个逻辑名称主题领域内的每个源记录它产生,使应用程序能够辨别这些记录的起源。我们的示例使用一个服务器名称的“产品”,大概是因为数据库包含产品信息。开云体育电动老虎机当然,你可以命名任何有意义的应用程序。

MySqlConnector类运行时,它读取MySQL服务器的binlog,包括所有数据更改和模式变化主办的数据库服务器。开云体育电动老虎机因为所有数据更改结构的拥有表的模式变化时被记录,连接器需要跟踪所有的模式变化,以便它可以正确解码更改事件。连接器记录模式信息,这样,连接器应该重启和恢复阅读上次记录的偏移量,它知道什么数据库模式看起来像抵消。开云体育电动老虎机连接器如何记录中定义的数据库模式历史是我们最后的两个字段配置开云体育电动老虎机,即我们的连接器应该使用吗FileSchemaHistory类来存储数据库模式的变化历史开云体育电动老虎机/ /存储/ schemahistory.dat /路径本地文件系统上的文件(这个文件可以在任何地方叫什么和存储)。

最后,建立使用不可变的配置build ()方法。(顺便说一下,而不是构建以编程的方式,我们可以配置从一个属性文件使用的一个Configuration.read (…)方法。)

现在我们已经配置,我们可以创建我们的引擎。这里是相关的代码:

/ /创建的引擎配置…试(Deb开云体育官方注册网址eziumEngine < ChangeEvent <字符串,字符串> >引擎= DebeziumEngine.create (Json.class)型(道具).notifying(记录- > {System.out.println(记录);}).build ()) {}

所有更改事件将被传递到处理程序方法,它必须匹配的签名java.util.function.Consumer < R >功能界面,< R >当调用指定必须匹配的类型格式吗create ()。注意,应用程序的处理函数不应该抛出任何异常;如果是这样,引擎将日志抛出的任何异常的方法,将继续操作接下来的原始记录,但您的应用程序不会有另一个机会来处理特定的源记录导致异常,意味着您的应用程序可能会与数据库不一致。开云体育电动老虎机

在这一点上,我们有一个现有的开云体育官方注册网址DebeziumEngine对象配置和准备好运行,但它不会做任何事情。的开云体育官方注册网址DebeziumEngine被设计成异步执行的吗遗嘱执行人ExecutorService:

/ /异步运行引擎…ExecutorService执行人= Executors.newSingleThreadExecutor ();executor.execute(引擎);/ /做其他事情或等待一个信号或一个事件

您的应用程序可以阻止发动机安全、优雅地通过调用它close ()方法:

/ /稍后…engine.close ();

或作为引擎支持Closeable接口将被称为时自动试一试块了。

引擎的连接器将停止阅读信息从源系统,提出你所有剩余的更改事件处理函数,并刷新最新offets抵消存储。只有这一切完成之后将发动机的run ()方法返回。如果应用程序需要等待引擎完全停止在退出前,你可以这样做ExcecutorService关闭awaitTermination方法:

尝试{executor.shutdown ();而(!遗嘱执行人。awaitTermination(5, TimeUnit.SECONDS)) { logger.info("Waiting another 5 seconds for the embedded engine to shut down"); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }

或者你可以注册CompletionCallback在创建开云体育官方注册网址DebeziumEngine作为一个回调时要通知引擎终止。

回想一下,当JVM关闭,只等待守护进程线程。因此,如果您的应用程序退出时,一定要等待完成发动机或者运行引擎在一个守护线程。

您的应用程序应该适当停止发动机,确保优雅和完全关闭,每个源记录发送到应用程序的一个时间。例如,不依赖关闭ExecutorService,因为这中断正在运行的线程。虽然开云体育官方注册网址DebeziumEngine确实会终止它的线程被中断时,引擎不会终止干净,当重新启动您的应用程序可能会看到一些相同的源记录,处理前就关闭。

输出消息格式

开云体育官方注册网址DebeziumEngine # create ()可以接受多个不同参数影响消费者在收到消息的格式。允许的值是:

  • Connect.class——输出值变化事件包装卡夫卡的连接SourceRecord

  • Json.class——输出值是一对键和值编码JSON字符串

  • JsonByteArray.class——输出值是一对键和值的格式JSON和编码为utf - 8字节数组

  • Avro.class——输出值是一对键和值编码为Avro序列化记录(见Avro序列化更多细节)

  • CloudEvents.class——输出值是一对键和值编码云事件消息

当调用指定的标题格式也可以开云体育官方注册网址DebeziumEngine # create ()。允许的值是:

  • Json.class——头值进行编码JSON字符串

  • JsonByteArray.class——头值都格式化JSON和编码为utf - 8字节数组

在内部,引擎使用适当的卡夫卡连接转换器实现转换的委托。转换器可以使用发动机参数化属性来修改其行为。

的一个例子JSON输出格式是

最终属性道具= new属性();…props.setProperty (“converter.schemas。启用”、“假”);/ /不包括模式消息……最后Debe开云体育官方注册网址ziumEngine < ChangeEvent <字符串,字符串> >引擎= DebeziumEngine.create (Json.class)型(道具).notifying((记录,提交者)- > {(ChangeEvent <字符串,字符串> r:记录){system . out。println(“关键=”+ r.key () + " " value = ' " + r.value () + " ' ");committer.markProcessed (r);}……

在哪里ChangeEvent数据类型是键/值对。

消息转换

之前的消息交付给处理程序可以运行它们通过卡夫卡的管道连接简单的信息转换(SMT)。每个贴片可以通过信息不变,修改它或过滤出来。链式配置使用属性转换。属性包含一个以逗号分隔的逻辑名称应用的转换。属性变换。< logical_name > .type然后定义的名称为每个转换和实现类变换。< logical_name >。*传递到转换的配置选项。

配置的一个示例

最终属性道具= new属性();…道具。setProperty(“转换”、“过滤路由器”);/ / (1)props.setProperty (“transforms.router。类型”、“org.apache.kafka.connect.transforms.RegexRouter”);/ / (2)props.setProperty (“transforms.router。正则表达式"、" (. *)");/ / (3)props.setProperty (“transforms.router。替换”、“扶轮基金会1美元”);/ / (3)props.setProperty (“transforms.filter。类型”、“io.deb开云体育官方注册网址ezium.embedded.ExampleFilterTransform”); // (4)
  1. 定义了两个转换-过滤器路由器

  2. 的实现路由器转换org.apache.kafka.connect.transforms.RegexRouter

  3. 路由器转换——有两个配置选项正则表达式更换

  4. 的实现过滤器转换io.开云体育官方注册网址debezium.embedded.ExampleFilterTransform

消息转换谓词

谓词可以应用到转换的转换可选的。

配置的一个示例

最终属性道具= new属性();…道具。setProperty(“转换”、“过滤器”);/ /(1)道具。setProperty(“谓词”、“headerExists”);/ / (2)props.setProperty (“predicates.headerExists。类型”、“org.apache.kafka.connect.transforms.predicates.HasHeaderKey”);/ / (3)props.setProperty (“predicates.headerExists.name”、“header.name”);/ / (4)props.setProperty (“transforms.filter。type", "io.debezium.embedded.ExampleFilterTransform");// (5) props.setProperty("transforms.filter.predicate", "headerExists"); // (6) props.setProperty("transforms.filter.negate", "true"); // (7)
  1. 一个转换,定义过滤器

  2. 一个谓词定义-headerExists

  3. 的实现headerExists谓词是org.apache.kafka.connect.transforms.predicates.HasHeaderKey

  4. headerExists谓语有一个配置选项的名字

  5. 的实现过滤器转换io.开云体育官方注册网址debezium.embedded.ExampleFilterTransform

  6. 过滤器转换需要谓词headerExists

  7. 过滤器转换预计值的谓词是否定,使谓词确定标题并不存在

先进的消费记录

对于一些用例,例如当试图写记录批次或反对一个异步API,上述功能界面可能是一个挑战。在这些情况下,它可能是更容易使用io.开云体育官方注册网址debezium.engine.DebeziumEngine.ChangeConsumer < R >。接口。

这个接口有单一功能与以下签名:

/ * * *处理一批记录,调用{@link RecordCommitter # markProcessed(对象)}*为每一个记录和{@link RecordCommitter # markBatchFinished()}当这批处理完成。* @param记录要处理的记录* @param提交者提交者,指示系统,我们完成* /空白handleBatch (< R >记录列表,RecordCommitter < R >提交者)抛出InterruptedException;

在Javadoc中提到,RecordCommitter对象是呼吁每个记录,每批处理完成。的RecordCommitter接口是线程安全的,这允许灵活处理的记录。

您可以选择覆盖的补偿处理的记录。这是通过首先建立一个新的偏移量对象通过调用RecordCommitter # buildOffsets (),更新补偿补偿#集(字符串键,对象值),然后调用RecordCommitter # markProcessed (SourceRecord记录,抵消sourceOffsets),更新偏移量

使用ChangeConsumerAPI,您必须通过接口的一个实现通知API,如所示:

类MyChangeConsumer实现DebeziumEngine。开云体育官方注册网址ChangeConsumer> { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { ... } } // Create the engine with this configuration ... DebeziumEngine> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) .using(props) .notifying(new MyChangeConsumer()) .build();

如果使用JSON格式(一个等效为其他格式)工作的代码看起来像:

类JsonChangeConsumer实现DebeziumEngine。开云体育官方注册网址ChangeConsumer> { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { ... } } // Create the engine with this configuration ... DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(new MyChangeConsumer()) .build();

发动机性能

以下配置属性要求除非一个默认值是可用的(为了文本格式替换Java类的包名<…>)。

财产

默认的

描述

的名字

独特的连接器实例的名称。

connector.class

连接器的Java类的名称,例如<…> .MySqlConnectorMySQL连接器。

offset.storage

<…> .FileOffsetBackingStore

Java类的名称负责持久性连接器的补偿。它必须实现<…> .OffsetBackingStore接口。

offset.storage.file.filename

”“

路径文件偏移量的存储。时需要offset.storage设置为<…> .FileOffsetBackingStore

offset.storage.topic

”“

卡夫卡的名称主题偏移量在哪里存储。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.storage.partitions

”“

在创建使用的分区数量抵消存储主题。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.storage.replication.factor

”“

复制因子创建抵消存储主题时使用。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.commit.policy

<…> .PeriodicCommitOffsetPolicy

提交政策的Java类的名称。它定义当补偿提交必须基于触发事件处理的数量和时间自上次提交。这个类必须实现的接口<…> .OffsetCommitPolicy。默认是一个周期性的堪萨斯州策略基于时间间隔。

offset.flush.interval.ms

60000年

尝试提交补偿间隔。默认是1分钟。

offset.flush.timeout.ms

5000年

最大等待的毫秒数记录冲洗和分区抵消数据提交,以抵消存储之前取消和恢复过程偏移量数据在未来的尝试。默认是5秒。

errors.max.retries

1

在连接错误重试失败的最大数目(1 =没有限制,0 =禁用,> 0 = num重试)。

errors.retry.delay.initial.ms

300年

初始延迟(女士)重试当遇到连接错误。这个值将会在每一次重试但不会超过时翻了一倍errors.retry.delay.max.ms

errors.retry.delay.max.ms

10000年

马克斯重试之间的延迟时间(女士)当遇到连接错误。

开云体育电动老虎机数据库模式历史属性

一些连接器还需要额外的组属性,配置数据库模式历史:开云体育电动老虎机

  • MySQL

  • SQL Server

  • 甲骨文

  • Db2

没有适当的配置数据库模式历史连接器将拒绝开始。开云体育电动老虎机缺省配置预计卡夫卡集群可用。对于其他的部署,基于文件的数据库模式存储实现的历史。开云体育电动老虎机

财产 默认的 描述

schema.history.internal

<…> .KafkaSchemaHistory

Java类的名称负责持久性数据库模式的历史。开云体育电动老虎机
它必须实现<…> .SchemaHistory接口。

schema.history.internal.file.filename

”“

通往一个数据库模式历史存储的文件。开云体育电动老虎机
时需要schema.history.internal设置为<…> .FileSchemaHistory

schema.history.internal.kafka.topic

”“

卡夫卡主题数据库模式历史存储的地方。开云体育电动老虎机
时需要schema.history.internal设置为<…> .KafkaSchemaHistory

schema.history.internal.kafka.bootstrap.servers

”“

最初的卡夫卡集群服务器连接的列表。集群提供了主题存储数据库模式的历史。开云体育电动老虎机
时需要schema.history.internal设置为<…> .KafkaSchemaHistory

处理失败

当引擎执行时,它的连接器是积极记录源抵消内部每个源记录,和引擎定期冲洗这些补偿持久存储。当应用程序和发动机正常关闭或崩溃,当他们重新启动引擎和它的连接器将恢复阅读源信息从最后一个记录的偏移量

所以,当嵌入式引擎运行时应用程序失败?净效应是,应用程序可能会获得一些源记录重新启动后,它已经在崩溃前处理。多少取决于发动机的频率将抵消其存储(通过吗offset.flush.interval.ms属性)和多少源记录特定连接器返回一批。最好的情况是抵消每次都刷新(例如,offset.flush.interval.ms设置为0),但即使这样嵌入式引擎将仍然只冲洗后补偿来源记录收到的每一批连接器。

例如,MySQL连接器使用max.batch.size指定源记录的最大数量,可以出现在一个批处理。即使有offset.flush.interval.ms设置为0,当应用程序启动时撞击后可以看到吗n重复,n是批量的大小。如果offset.flush.interval.ms属性设置较高,则应用程序可能会看到n * m重复,n的最大大小批量和吗批次的数量可能会积累在一个抵消冲洗间隔。(显然可以配置嵌入式连接器使用批处理,总是冲偏移,导致应用程序从不接受任何重复记录来源。然而,这大大增加了开销,减少连接器的吞吐量)。

底线是使用嵌入式连接器时,应用程序将接收每个源记录完全一旦在正常操作(包括优雅的关机后重启),但需要宽容接收复制事件后立即崩溃或不当关闭后重新启动。如果应用程序需要更严格的只有一次行为,那么他们应该使用完整Debezium平台,可以提供只有一次担保(甚至崩溃后重启)。开云体育官方注册网址