开云体育官方注册网址Debezium引擎

开云体育官方注册网址Debezium连接器通常通过将它们部署到Kafka Connect服务,并配置一个或多个连接器来监视上游数据库,并为它们在上游数据库中看到的所有更改产生数据更改事件来操作。开云体育电动老虎机这些数据更改事件被写入Kafka,在那里它们可以被许多不同的应用程序独立使用。Kafka Connect提供了出色的容错性和可伸缩性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终在运行。例如,即使集群中的一个Kafka Connect端点宕机,其余的Kafka Connect端点将重新启动之前在现在终止的端点上运行的任何连接器,最大限度地减少停机时间并消除管理活动。

并不是每个应用程序都需要这种级别的容错和可靠性,它们可能不希望依赖于Kafka代理和Kafka Connect服务的外部集群。相反,有些应用程序更愿意这样做嵌入开云体育官方注册网址Debezium连接器直接在应用程序空间中。他们仍然想要相同的数据更改事件,但更喜欢让连接器直接将它们发送到应用程序,而不是将它们持久化在Kafka中。

开云体育官方注册网址debezium-api模块定义了一个小API,允许应用程序使用Debezium Engine轻松配置和运行Debezium连接器。开云体育官方注册网址

依赖关系

要使用Debe开云体育官方注册网址zium Engine模块,请添加开云体育官方注册网址debezium-api模块到应用程序的依赖项。中有这个API的一个开箱即用的实现开云体育官方注册网址debezium-embedded模块也应该添加到依赖项中。对于Maven,这需要将以下内容添加到应用程序的POM中:

 io.开云体育官方注册网址debezium debezium-api ${version.debezium}   io.debezium debezium-embedded ${version.debezium} 

在哪里$ {version开云体育官方注册网址.debezium}要么是您正在使用的Debezium版本,要么是Ma开云体育官方注册网址ven属性,其值包含Debezium版本字符串。

同样,为应用程序将使用的每个Debezium连接器添加依赖项。开云体育官方注册网址例如,可以将以下内容添加到应用程序的Maven POM文件中,以便应用程序可以使用MySQL连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mysql < / artifactId > <版本> $ {version.debezium} < /版本> < / >的依赖

或者对于MongoDB连接器:

 io.开云体育官方注册网址debezium  debezu -connector-mongodb ${version.debezium}  .debezium . io.debezium

本文档的其余部分描述在应用程序中嵌入MySQL连接器。除了特定于连接器的配置、主题和事件外,其他连接器也以类似的方式使用。

守则内

应用程序需要为要运行的每个连接器实例设置一个嵌入式引擎。的io.开云体育官方注册网址debezium.engine.DebeziumEngine < R >类可以作为任何Debezium连接器的易于使用的包装器,并完全管理连接器的生命周期。开云体育官方注册网址创建开云体育官方注册网址DebeziumEngine实例使用它的构建器API,提供以下内容:

  • 您想要接收消息的格式,例如JSON, Avro或Kafka ConnectSourceRecord(见输出消息格式

  • 配置属性(可能从属性文件加载)定义引擎和连接器的环境

  • 连接器产生的每个数据更改事件都将调用该方法

下面是一个配置和运行嵌入式程序的代码示例MySQL连接器

//定义Debezium引擎与MySQL连接器的配置…开云体育官方注册网址final Properties props = config.asProperties();道具。setProperty(“名字”,“引擎”);props.setProperty(“抵消。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”);props.setProperty(“offset.storage.file。文件名”、“/ tmp / offsets.dat”);props.setProperty(“offset.flush.interval.ms”、“60000”);*/ props.setProperty("database. setproperty ")开云体育电动老虎机主机名”、“localhost”);props.setProperty(“开云体育电动老虎机数据库。港”,“3306”); props.setProperty("database.user", "mysqluser"); props.setProperty("database.password", "mysqlpw"); props.setProperty("database.server.id", "85744"); props.setProperty("topic.prefix", "my-app-connector"); props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory"); props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat"); // Create the engine with this configuration ... try (DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(record -> { System.out.println(record); }).build() ) { // Run the engine asynchronously ... ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // Do something else or wait for a signal or an event } // Engine is stopped when the main code is finished

让我们更详细地研究这段代码,从我们在这里重复的前几行开始:

//定义Debezium引擎与MySQL连接器的配置…开云体育官方注册网址final Properties props = config.asProperties();道具。setProperty(“名字”,“引擎”);props.setProperty(“connector.class”、“io.d开云体育官方注册网址ebezium.connector.mysql.MySqlConnector”);props.setProperty(“抵消。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”);props.setProperty(“offset.storage.file。文件名”、“/ tmp / offsets.dat”);props.setProperty(“offset.flush.interval.ms ", 60000);

这创造了一个新的标准属性对象设置引擎所需的几个字段,而不管使用哪个连接器。第一个是将在连接器产生的源记录及其内部状态中使用的引擎名称,因此在应用程序中使用一些有意义的东西。的connector.classfield定义了扩展Kafka Connect的类的名称org.apache.kafka.connect.source.SourceConnector抽象类;在这个例子中,我们指定Debezium的开云体育官方注册网址MySqlConnector类。

当Kafka Connect连接器运行时,它从源读取信息,并定期记录“偏移量”,这定义了它处理了多少信息。如果重新启动连接器,它将使用最后记录的偏移量来确定应该在源信息的哪个位置恢复读取。因为连接器不知道也不关心如何偏移量是存储的,这取决于引擎提供一种存储和恢复这些偏移量的方法。配置的接下来几个字段指定引擎应该使用FileOffsetBackingStore类中存储偏移量/ /存储/ offset.dat /路径本地文件系统上的文件(该文件可以命名为任何名称并存储在任何位置)。此外,尽管连接器记录它产生的每个源记录的偏移量,但引擎会定期将偏移量刷新到备份存储区(在我们的示例中,每分钟刷新一次)。这些字段可以根据应用程序的需要进行调整。

接下来的几行定义特定于连接器的字段(在每个连接器文档中记录),在我们的示例中是MySqlConnector连接器:

*/ props.setProperty("database. setproperty ")开云体育电动老虎机hostname", "localhost") props.setProperty("开云体育电动老虎机数据库。port", "3306") props.setProperty("开云体育电动老虎机数据库。props.setProperty("database. user")开云体育电动老虎机props.setProperty("database.server. passw开云体育电动老虎机ord", "mysqlpw")props.setProperty("topic. id", "85744")prop . setproperty ("schema.history.internal", "io.debezium.storage.file.histo开云体育官方注册网址ry.FileSchemaHistory") props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")

在这里,我们设置了运行MySQL数据库服务器的主机名和端口号,并定义了将用于连接MySQL数据库的用户名和密码。开云体育电动老虎机注意,对于MySQL,用户名和密码应该对应于已被授予以下MySQL权限的MySQL数据库用户:开云体育电动老虎机

  • 选择

  • 重新加载

  • 显示数据库开云体育电动老虎机

  • 复制的奴隶

  • 复制客户端

在读取数据库的一致快照时,需要前三个特权。开云体育电动老虎机最后两个特权允许数据库读取服务器的binlog(通常用于MySQL复开云体育电动老虎机制)。

属性的数字标识符server.id.由于MySQL的binlog是MySQL复制机制的一部分,为了读取binlog的MySqlConnector实例必须加入MySQL服务器组,这意味着这个服务器ID必须是在组成MySQL服务器组的所有进程中是唯一的是1到2之间的任何整数321。在我们的代码中,我们将它设置为一个相当大但有点随机的值,只用于我们的应用程序。

该配置还为MySQL服务器指定了一个逻辑名称。连接器将此逻辑名称包含在它所产生的每个源记录的主题字段中,使您的应用程序能够识别这些记录的起源。我们的示例使用服务器名“products”,可能是因为数据库包含产品信息。开云体育电动老虎机当然,您可以将其命名为对应用程序有意义的任何名称。

MySqlConnector类运行时,它会读取MySQL服务器的binlog,其中包括对服务器托管的数据库所做的所有数据更改和模式更改。开云体育电动老虎机由于对数据的所有更改都是根据记录更改时所属表的模式进行结构化的,因此连接器需要跟踪所有模式更改,以便正确地解码更改事件。连接器记录模式信息,这样,如果连接器重新启动并恢复从最后记录的偏移量读取数据,它就确切地知道该偏移量处的数据库模式是什么样子。开云体育电动老虎机连接器如何记录数据库模式历史是在配置的最后两个字段中定义的,开云体育电动老虎机即连接器应该使用FileSchemaHistory类中存储数据库模式历史记录更改开云体育电动老虎机/ /存储/ schemahistory.dat /路径本地文件系统上的文件(同样,该文件可以命名为任何名称并存储在任何位置)。

方法构建不可变配置build ()方法。(顺便说一句,我们可以不通过编程来构建它属性文件中的配置Configuration.read(…)方法。)

现在我们有了一个配置,我们可以创建我们的引擎。下面是相关的代码行:

//创建引擎try (开云体育官方注册网址DebeziumEngine>引擎= DebeziumEngine.create(Json.class) .using(props) .notify (record -> {System.out.println(record);}) .build()) {}

类的签名匹配的给定处理程序方法将传递所有更改事件java.util.function.Consumer < R >功能接口,其中< R >必须与调用时指定的格式类型匹配create ().注意,应用程序的处理函数不应该抛出任何异常;如果是这样,引擎将记录该方法抛出的任何异常,并继续对下一个源记录进行操作,但您的应用程序将没有机会再处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。开云体育电动老虎机

在这一点上,我们有一个开云体育官方注册网址DebeziumEngine对象,该对象已配置并准备运行,但它不执行任何操作。的开云体育官方注册网址DebeziumEngine被设计为异步执行遗嘱执行人ExecutorService

//异步运行引擎…ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(引擎);//做点别的事情,或者等待一个信号或事件

您的应用程序可以通过调用它的close ()方法:

//在稍后的时间…engine.close ();

或者作为引擎支撑Closeable接口时自动调用试一试Block在左边。

引擎的连接器将停止从源系统读取信息,将所有剩余的更改事件转发给处理程序函数,并将最新的偏移量刷新到偏移量存储中。只有在所有这些完成后,引擎的run ()方法返回。如果应用程序需要等待引擎完全停止才能退出,则可以使用ExcecutorService关闭而且awaitTermination方法:

尝试{executor.shutdown();而(!遗嘱执行人。awaitTermination(5, TimeUnit.SECONDS)) { logger.info("Waiting another 5 seconds for the embedded engine to shut down"); } } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }

或者你也可以注册CompletionCallback在创建开云体育官方注册网址DebeziumEngine作为引擎终止时通知的回调。

回想一下,当JVM关闭时,它只等待守护线程。因此,如果应用程序退出,请确保等待引擎完成,或者在守护进程线程上运行引擎。

您的应用程序应该始终正确地停止引擎,以确保优雅和完整的关闭,并且每个源记录只发送给应用程序一次。例如,不要依赖于关闭ExecutorService,因为这会中断正在运行的线程。虽然开云体育官方注册网址DebeziumEngine当它的线程被中断时,引擎确实会终止,引擎可能不会干净地终止,当应用程序重新启动时,它可能会看到一些在关闭之前处理过的相同的源记录。

输出消息格式

开云体育官方注册网址DebeziumEngine # create ()可以接受多个不同的参数,这些参数会影响使用者接收消息的格式。允许取值为:

  • Connect.class-输出值是Kafka Connect的更改事件包装SourceRecord

  • Json.class-输出值是一对键和值编码为JSON字符串

  • Avro.class-输出值是一对键和值编码为Avro序列化记录(见Avro序列化有关详情)

  • CloudEvents.class-输出值是一对键和值编码为云事件消息

在内部,引擎使用适当的Kafka Connect转换器实现,转换被委托给它。转换器可以参数化,使用引擎属性来修改其行为。

一个例子JSON输出格式为

final Properties props = new Properties();...props.setProperty(“converter.schemas。启用”、“假”);// message中不包含schema…final 开云体育官方注册网址DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(props) .notifying((records, committer) -> {for (ChangeEvent r: records) {System.out. String>> engine = DebeziumEngine.create(Json.class) .using(props) .notifying((records, committer) -> {println(“关键= " + r.key () + " ' value = " + r.value () + "'");committer.markProcessed (r);}……

在哪里ChangeEventDatatype是键/值对。

消息转换

在消息被传递到处理器之前,可以通过Kafka Connect的管道来运行它们简单消息转换(SMT)。每个SMT可以不加修改地传递消息、修改消息或过滤消息。链是使用属性配置的转换.属性包含要应用的转换的逻辑名称的逗号分隔列表。属性变换。< logical_name > .type然后为每个转换和定义实现类的名称变换。< logical_name >。*传递给转换的配置选项。

配置的一个例子是

final Properties props = new Properties();...道具。setProperty(“转换”,“过滤器,路由器”);// (1) props.setProperty("transforms.router. properties ")类型”、“org.apache.kafka.connect.transforms.RegexRouter”);// (2) props.setProperty("transforms.router. properties ")正则表达式 ", "(.*)");// (3) props.setProperty("transforms.router. properties ")替换”、“扶轮基金会1美元”);// (3) props.setProperty("transforms.filter. "类型”、“io.deb开云体育官方注册网址ezium.embedded.ExampleFilterTransform”); // (4)
  1. 定义了两个变换-过滤器而且路由器

  2. 实施路由器转换org.apache.kafka.connect.transforms.RegexRouter

  3. 路由器转换有两个配置选项—正则表达式而且更换

  4. 实施过滤器转换io.开云体育官方注册网址debezium.embedded.ExampleFilterTransform

消息转换谓词

可以将谓词应用于转换,使转换成为可选的。

配置的一个例子是

final Properties props = new Properties();...道具。setProperty(“转换”、“过滤器”);// (1)setProperty(“谓词”、“headerExists”);// (2) props.setProperty("predicates.headerExists. "类型”、“org.apache.kafka.connect.transforms.predicates.HasHeaderKey”);//(3) props.setProperty(" predicments . headerexists .name", "header.name");// (4) props.setProperty("transforms.filter. "type", "io.debezium.embedded.ExampleFilterTransform");// (5) props.setProperty("transforms.filter.predicate", "headerExists"); // (6) props.setProperty("transforms.filter.negate", "true"); // (7)
  1. 一个变换被定义为-过滤器

  2. 一个谓词被定义为-headerExists

  3. 实施headerExists谓词是org.apache.kafka.connect.transforms.predicates.HasHeaderKey

  4. headerExistsPredicate有一个配置选项-的名字

  5. 实施过滤器转换io.开云体育官方注册网址debezium.embedded.ExampleFilterTransform

  6. 过滤器转换需要谓词headerExists

  7. 过滤器转换期望谓词的值被否定,从而使谓词确定头是否不存在

高级记录消费

对于某些用例,例如尝试批量写入记录或针对异步API写入记录时,上面描述的功能接口可能具有挑战性。在这些情况下,可能更容易使用io.开云体育官方注册网址debezium.engine.DebeziumEngine.ChangeConsumer < R >。接口。

该接口功能单一,签名如下:

/** *处理一批记录,为每条记录调用{@link RecordCommitter#markProcessed(Object)} *,当这批记录完成时调用{@link RecordCommitter#markBatchFinished()}。*/ void handleBatch(List records, RecordCommitter committer)抛出InterruptedException;

正如在Javadoc中提到的RecordCommitter对象将在每条记录和每批处理完成时调用。的RecordCommitter接口是线程安全的,这允许灵活地处理记录。

您可以选择覆盖所处理的记录的偏移量。这是通过首先构建一个新的偏移量通过调用RecordCommitter # buildOffsets (),用更新偏移量offset #set(字符串键,对象值),然后呼叫记录提交#markProcessed(SourceRecord记录,offset),更新后的偏移量

使用ChangeConsumerAPI时,必须将接口的实现传递给通知API,如下所示:

类MyChangeConsumer实现了DebeziumEngine开云体育官方注册网址。ChangeConsumer> { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { ... } } // Create the engine with this configuration ... DebeziumEngine> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) .using(props) .notifying(new MyChangeConsumer()) .build();

如果使用JSON格式(等效的格式也适用于其他格式),那么代码将如下所示:

类JsonChangeConsumer实现了DebeziumEngine开云体育官方注册网址。ChangeConsumer> { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { ... } } // Create the engine with this configuration ... DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(props) .notifying(new MyChangeConsumer()) .build();

发动机性能

以下配置属性为要求除非有一个默认值(为了文本格式化,Java类的包名被替换为<…>).

财产

默认的

描述

的名字

连接器实例的唯一名称。

connector.class

连接器的Java类的名称,例如<…>。MySqlConnectorMySQL连接器。

offset.storage

<…>。FileOffsetBackingStore

负责保持连接器偏移量的Java类的名称。它必须实现<…>。OffsetBackingStore接口。

offset.storage.file.filename

""

存储偏移量的文件路径。时需要offset.storage设置为<…>。FileOffsetBackingStore

offset.storage.topic

""

存储偏移量的Kafka主题的名称。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.storage.partitions

""

创建偏移存储主题时使用的分区数。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.storage.replication.factor

""

创建偏移存储主题时使用的复制因子。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.commit.policy

<…>。PeriodicCommitOffsetPolicy

提交策略的Java类的名称。它定义了必须根据处理的事件数量和自上次提交以来所经过的时间来触发偏移量提交的时间。该类必须实现接口<…>。OffsetCommitPolicy.默认为基于时间间隔的定期社区策略。

offset.flush.interval.ms

60000

尝试提交偏移量的间隔。默认为1分钟。

offset.flush.timeout.ms

5000

在取消进程并恢复将来尝试提交的偏移量数据之前,等待记录刷新和分区偏移量数据提交到偏移量存储的最大毫秒数。缺省值是5秒。

errors.max.retries

-1

连接错误失败前的最大重试次数(-1 =无限制,0 =禁用,> 0 =重试次数)。

errors.retry.delay.initial.ms

300

遇到连接错误时重试的初始延迟(毫秒)。该值将在每次重试时加倍,但不会超过errors.retry.delay.max.ms

errors.retry.delay.max.ms

10000

遇到连接错误时重试之间的最大延迟(毫秒)。

开云体育电动老虎机数据库模式历史属性

一些连接器还需要额外的属性集来配置数据库模式历史:开云体育电动老虎机

  • MySQL

  • SQL Server

  • 甲骨文

  • Db2

如果没有正确配置数据库模式历史,连接器将拒绝启动。开云体育电动老虎机默认配置期望Kafka集群可用。对于其他部署,可以使用基于文件的数据库模式历史存储实现。开云体育电动老虎机

财产 默认的 描述

schema.history.internal

<…>。KafkaSchemaHistory

负责持久化数据库模式历史记录的Java类的名称。开云体育电动老虎机
它必须实现<…>。SchemaHistory接口。

schema.history.internal.file.filename

""

存储数据库模式历史记录的文件的路径。开云体育电动老虎机
时需要schema.history.internal设置为<…>。FileSchemaHistory

schema.history.internal.kafka.topic

""

存储数据库模式历史的Kafka主题。开云体育电动老虎机
时需要schema.history.internal设置为<…>。KafkaSchemaHistory

schema.history.internal.kafka.bootstrap.servers

""

要连接的Kafka集群服务器的初始列表。集群提供主题来存储数据库模式历史记录。开云体育电动老虎机
时需要schema.history.internal设置为<…>。KafkaSchemaHistory

处理失败

当引擎执行时,它的连接器会在每个源记录中主动记录源偏移量,并且引擎会定期将这些偏移量刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,当它们重新启动时,引擎及其连接器将恢复读取源信息从上次记录的偏移量开始

那么,当您的应用程序在嵌入式引擎运行时失败时会发生什么呢?最终的结果是,应用程序可能会在重新启动后收到一些在崩溃前已经处理过的源记录。有多少取决于引擎将偏移量刷新到其存储的频率(通过offset.flush.interval.ms属性)以及在一个批处理中特定连接器返回多少条源记录。最好的情况是每次都刷新偏移量(例如,offset.flush.interval.ms设置为0),但即使这样,嵌入式引擎仍然只在从连接器接收到每批源记录之后才刷新偏移量。

例如,MySQL连接器使用max.batch.size指定可在批处理中出现的源记录的最大数目。即使有offset.flush.interval.ms设置为0,当应用程序重新启动后崩溃,它可能会看到高达n重复,n是批的大小。如果offset.flush.interval.ms属性设置得更高,则应用程序可能看到的最高N * m重复,n这批货的最大尺寸是多少在单个偏移量刷新间隔期间可能累积的批数。(显然,可以将嵌入式连接器配置为不使用批处理并始终刷新偏移量,从而使应用程序永远不会接收任何重复的源记录。然而,这会极大地增加开销并降低连接器的吞吐量。)

底线是,当使用嵌入式连接器时,应用程序在正常操作期间(包括正常关机后的重新启动)只接收一次源记录,但是在崩溃或不正确关机后重新启动后,确实需要容忍立即接收重复事件。如果应用程序需要更严格的“恰好一次”行为,那么它们应该使用完整的Debezium平台,该平台可以提供“恰好一次”保证(即使在崩溃和重新启动之后)。开云体育官方注册网址