[弃用]嵌入Debezium连接器开云体育官方注册网址

本页描述了一个内部API,该API在未来版本中可能会发生向后不兼容的更改。请使用新支持的公共文件开云体育官方注册网址Debezium引擎API

开云体育官方注册网址Debezium连接器通常通过将它们部署到Kafka Connect服务,并配置一个或多个连接器来监视上游数据库,并为它们在上游数据库中看到的所有更改产生数据更改事件来操作。开云体育电动老虎机这些数据更改事件被写入Kafka,在那里它们可以被许多不同的应用程序独立使用。Kafka Connect提供了出色的容错性和可伸缩性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终在运行。例如,即使集群中的一个Kafka Connect端点宕机,其余的Kafka Connect端点将重新启动之前在现在终止的端点上运行的任何连接器,最大限度地减少停机时间并消除管理活动。

并不是每个应用程序都需要这种级别的容错和可靠性,它们可能不希望依赖于Kafka代理和Kafka Connect服务的外部集群。相反,有些应用程序更愿意这样做嵌入开云体育官方注册网址Debezium连接器直接在应用程序空间中。他们仍然想要相同的数据更改事件,但更喜欢让连接器直接将它们发送到应用程序,而不是将它们持久化在Kafka中。

开云体育官方注册网址debezium-embedded模块定义了一个小型库,允许应用程序轻松配置和运行Debezium连接器。开云体育官方注册网址

依赖关系

要使用此模块,请添加开云体育官方注册网址debezium-embedded模块到应用程序的依赖项。对于Maven,这需要将以下内容添加到应用程序的POM中:

 io.开云体育官方注册网址debezium debezium-embedded ${version.debezium}  .debezium

在哪里$ {version开云体育官方注册网址.debezium}要么是您正在使用的Debezium版本,要么是Ma开云体育官方注册网址ven属性,其值包含Debezium版本字符串。

同样,为应用程序将使用的每个Debezium连接器添加依赖项。开云体育官方注册网址例如,可以将以下内容添加到应用程序的Maven POM文件中,以便应用程序可以使用MySQL连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mysql < / artifactId > <版本> $ {version.debezium} < /版本> < / >的依赖

或者对于MongoDB连接器:

 io.开云体育官方注册网址debezium  debezu -connector-mongodb ${version.debezium}  .debezium . io.debezium

本文档的其余部分描述在应用程序中嵌入MySQL连接器。除了特定于连接器的配置、主题和事件外,其他连接器也以类似的方式使用。

在代码中

应用程序需要为要运行的每个连接器实例设置一个嵌入式引擎。的io.开云体育官方注册网址debezium.embedded.EmbeddedEngine类作为一个易于使用的包装器,围绕任何标准的Kafka Connect连接器,并完全管理连接器的生命周期。基本上,你创建EmbeddedEngine使用为引擎和连接器定义环境的配置(可能从属性文件加载)。您还为引擎提供了一个函数,它将在连接器产生的每个数据更改事件时调用该函数。

下面是一个配置和运行嵌入式程序的代码示例MySQL连接器

//定义嵌入式和MySQL连接器的配置…配置配置= Configuration.create() /*开始引擎属性*/ .with("connector.class", "io. debezui .connector.mysql. mysqlconn开云体育官方注册网址ector ") .with("offset. class")"org.apache.kafka.connect.storage.FileOffsetBackingStore") .with("offset.storage.file. "文件名","/path/to/storage/offset.dat") .with("offset.flush.interval.ms", 60000) /*开始连接器属性*/ .with("name", "my-sql-connector") .with("database. dat") .with("data开云体育电动老虎机base. dat") . /Hostname ", "localhost") .with(开云体育电动老虎机"database. Hostname ", "localhost")。Port ", 3306) .with(开云体育电动老虎机"数据库。User ", "mysqluser") .with(开云体育电动老虎机"database. User ")。Password ", "mysqlpw") .with(开云体育电动老虎机"database.server. Password ", "mysqlpw")Id ", 85744) .with("主题。prefix", "my-app-connector") .with("schema.history.internal", "io.开云体育官方注册网址debezium.storage.file.history.FileSchemaHistory") .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat") .build();//创建引擎EmbeddedEngine引擎= EmbeddedEngine.create() .using(config) .notifying(this::handleEvent) .build();//异步运行引擎…Executor Executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // At some later time ... engine.stop();

让我们更详细地研究这段代码,从我们在这里重复的前几行开始:

//定义嵌入式和MySQL连接器的配置…配置配置= Configuration.create() /*开始引擎属性*/ .with("connector.class", "io. debezui .connector.mysql. mysqlconn开云体育官方注册网址ector ") .with("offset. class")"org.apache.kafka.connect.storage.FileOffsetBackingStore") .with("offset.storage.file. "文件名","/path/to/storage/offset.dat") .with("offset.flush.interval.ms", 60000);

这会创建一个新的配置对象,并使用流畅风格的构建器API来设置引擎所需的几个字段,而不管使用哪个连接器。第一个是将在连接器产生的源记录及其内部状态中使用的引擎名称,因此在应用程序中使用一些有意义的东西。的connector.classfield定义了扩展Kafka Connect的类的名称org.apache.kafka.connect.source.SourceConnector抽象类;在这个例子中,我们指定Debezium的开云体育官方注册网址MySqlConnector类。

当Kafka Connect连接器运行时,它从源读取信息,并定期记录“偏移量”,这定义了它处理了多少信息。如果重新启动连接器,它将使用最后记录的偏移量来确定应该在源信息的哪个位置恢复读取。因为连接器不知道也不关心如何偏移量是存储的,这取决于引擎提供一种存储和恢复这些偏移量的方法。配置的接下来几个字段指定引擎应该使用FileOffsetBackingStore类中存储偏移量/ /存储/ offset.dat /路径本地文件系统上的文件(该文件可以命名为任何名称并存储在任何位置)。此外,尽管连接器记录它产生的每个源记录的偏移量,但引擎会定期将偏移量刷新到备份存储区(在我们的示例中,每分钟刷新一次)。这些字段可以根据应用程序的需要进行调整。

接下来的几行定义特定于连接器的字段,在我们的示例中是MySqlConnector连接器:

/*开始连接器属性*/ .with("name", "mysql-connector") .with("database. connector")开云体育电动老虎机Hostname ", "localhost") .with(开云体育电动老虎机"database. Hostname ", "localhost")。Port ", 3306) .with(开云体育电动老虎机"数据库。User ", "mysqluser") .with(开云体育电动老虎机"database. User ")。Password ", "mysqlpw") .with(开云体育电动老虎机"database.server. Password ", "mysqlpw")Id ", 85744) .with("主题。("schema.history.internal", "io.debezium.storage.file.hist开云体育官方注册网址ory.FileSchemaHistory") .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat") .build();

在这里,我们设置了运行MySQL数据库服务器的主机名和端口号,并定义了将用于连接MySQL数据库的用户名和密码。开云体育电动老虎机注意,对于MySQL,用户名和密码应该对应于已被授予以下MySQL权限的MySQL数据库用户:开云体育电动老虎机

  • 选择

  • 重新加载

  • 显示数据库开云体育电动老虎机

  • 复制的奴隶

  • 复制客户端

在读取数据库的一致快照时,需要前三个特权。开云体育电动老虎机最后两个特权允许数据库读取服务器的binlog(通常用于MySQL复开云体育电动老虎机制)。

属性的数字标识符server.id.由于MySQL的binlog是MySQL复制机制的一部分,为了读取binlog的MySqlConnector实例必须加入MySQL服务器组,这意味着这个服务器ID必须是在组成MySQL服务器组的所有进程中是唯一的是1到2之间的任何整数321。在我们的代码中,我们将它设置为一个相当大但有点随机的值,只用于我们的应用程序。

该配置还为MySQL服务器指定了一个逻辑名称。连接器将此逻辑名称包含在它所产生的每个源记录的主题字段中,使您的应用程序能够识别这些记录的起源。我们的示例使用服务器名“products”,可能是因为数据库包含产品信息。开云体育电动老虎机当然,您可以将其命名为对应用程序有意义的任何名称。

MySqlConnector类运行时,它会读取MySQL服务器的binlog,其中包括对服务器托管的数据库所做的所有数据更改和模式更改。开云体育电动老虎机由于对数据的所有更改都是根据记录更改时所属表的模式进行结构化的,因此连接器需要跟踪所有模式更改,以便正确地解码更改事件。连接器记录模式信息,这样,如果连接器重新启动并恢复从最后记录的偏移量读取数据,它就确切地知道该偏移量处的数据库模式是什么样子。开云体育电动老虎机连接器如何记录数据库模式历史是在配置的最后两个字段中定义的,开云体育电动老虎机即连接器应该使用FileSchemaHistory类中存储数据库模式历史记录更改开云体育电动老虎机/ /存储/ schemahistory.dat /路径本地文件系统上的文件(同样,该文件可以命名为任何名称并存储在任何位置)。

方法构建不可变配置build ()方法。(顺便说一句,我们可以不通过编程来构建它属性文件中的配置Configuration.read(…)方法。)

现在我们有了一个配置,我们可以创建我们的引擎。下面是相关的代码行:

//创建引擎EmbeddedEngine引擎= EmbeddedEngine.create() .using(config) .notifying(this::handleEvent) .build();

一个流畅风格的构建器API用于创建一个使用我们的配置对象,并将所有数据更改记录发送到handleEvent (SourceRecord)方法的签名匹配的任何方法java.util.function.Consumer < SourceRecord >功能接口,其中SourceRecordorg.apache.kafka.connect.source.SourceRecord类。注意,应用程序的处理函数不应该抛出任何异常;如果是这样,引擎将记录该方法抛出的任何异常,并继续对下一个源记录进行操作,但您的应用程序将没有机会再处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。开云体育电动老虎机

在这一点上,我们有一个EmbeddedEngine对象,该对象已配置并准备运行,但它不执行任何操作。的EmbeddedEngine被设计为异步执行遗嘱执行人ExecutorService

//异步运行引擎…Executor Executor = Executors.newSingleThreadExecutor();executor.execute(引擎);

您的应用程序可以通过调用它的stop ()方法:

//在稍后的时间…engine.stop ();

引擎连接器将停止从源系统读取信息,并转发所有剩余信息SourceRecord对象添加到处理程序函数中,并将最新偏移量刷新到偏移量存储中。只有在所有这些完成后,引擎的run ()方法返回。如果您的应用程序需要等待引擎完全停止才能退出,您可以使用引擎的等待(…)方法:

尝试{while (!engine。await(30, TimeUnit.SECONDS)) {logger.info(“等待另一个30秒的嵌入式引擎关闭”);}} catch (InterruptedException e) {Thread.currentThread().interrupt();}

回想一下,当JVM关闭时,它只等待守护线程。因此,如果应用程序退出,请确保等待引擎完成,或者在守护进程线程上运行引擎。

您的应用程序应该始终正确地停止引擎,以确保优雅和完整的关闭,并且每个源记录只发送给应用程序一次。例如,不要依赖于关闭ExecutorService,因为这会中断正在运行的线程。虽然EmbeddedEngine当它的线程被中断时,引擎确实会终止,引擎可能不会干净地终止,当应用程序重新启动时,它可能会看到一些在关闭之前处理过的相同的源记录。

高级记录消费

对于某些用例,例如尝试批量写入记录或针对异步API写入记录时,上面描述的功能接口可能具有挑战性。在这些情况下,可能更容易使用io.开云体育官方注册网址debezium.embedded.EmbeddedEngine.ChangeConsumer接口。

该接口功能单一,签名如下:

/** *处理一批记录,为每条记录调用{@link RecordCommitter#markProcessed(SourceRecord)} *,当这批记录完成时调用{@link RecordCommitter#markBatchFinished()}。*/ void handleBatch(List records, RecordCommitter committer)抛出InterruptedException异常;

正如在Javadoc中提到的RecordCommitter对象将在每条记录和每批处理完成时调用。的RecordCommitter接口是线程安全的,这允许灵活地处理记录。

使用ChangeConsumerAPI时,必须将接口的实现传递给通知API,如下所示:

类MyChangeConsumer实现了EmbeddedEngine。ChangeConsumer{ public void handleBatch(List records, RecordCommitter committer) throws InterruptedException { ... } } // Create the engine with this configuration ... EmbeddedEngine engine = EmbeddedEngine.create() .using(config) .notifying(new MyChangeConsumer()) .build();

发动机性能

以下配置属性为要求除非有一个默认值(为了文本格式化,Java类的包名被替换为<…>).

财产 默认的 描述

的名字

连接器实例的唯一名称。

connector.class

连接器的Java类的名称,例如<…>。MySqlConnectorMySQL连接器。

offset.storage

<…>。FileOffsetBackingStore

负责保持连接器偏移量的Java类的名称。它必须实现<…>。OffsetBackingStore接口。

offset.storage.file.filename

""

存储偏移量的文件路径。时需要offset.storage设置为<…>。FileOffsetBackingStore

offset.storage.topic

""

存储偏移量的Kafka主题的名称。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.storage.partitions

""

创建偏移存储主题时使用的分区数。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.storage.replication.factor

""

创建偏移存储主题时使用的复制因子。时需要offset.storage设置为<…>。KafkaOffsetBackingStore

offset.commit.policy

<…>。PeriodicCommitOffsetPolicy

提交策略的Java类的名称。它定义了必须根据处理的事件数量和自上次提交以来所经过的时间来触发偏移量提交的时间。该类必须实现接口<…>。OffsetCommitPolicy.默认为基于时间间隔的定期社区策略。

offset.flush.interval.ms

60000

尝试提交偏移量的间隔。默认为1分钟。

offset.flush.timeout.ms

5000

在取消进程并恢复将来尝试提交的偏移量数据之前,等待记录刷新和分区偏移量数据提交到偏移量存储的最大毫秒数。缺省值是5秒。

开云体育电动老虎机数据库模式历史属性

一些连接器还需要额外的属性集来配置数据库模式历史:开云体育电动老虎机

  • MySQL

  • SQL Server

  • 甲骨文

  • Db2

如果没有正确配置数据库模式历史,连接器将拒绝启动。开云体育电动老虎机默认配置期望Kafka集群可用。对于其他部署,可以使用基于文件的数据库模式历史存储实现。开云体育电动老虎机

财产 默认的 描述

schema.history.internal

<…>。KafkaSchemaHistory

负责持久化数据库模式历史记录的Java类的名称。开云体育电动老虎机
它必须实现<…>。SchemaHistory接口。

schema.history.internal.file.filename

""

存储数据库模式历史记录的文件的路径。开云体育电动老虎机
时需要schema.history.internal设置为<…>。FileSchemaHistory

schema.history.internal.kafka.topic

""

存储数据库模式历史的Kafka主题。开云体育电动老虎机
时需要schema.history.internal设置为<…>。KafkaSchemaHistory

schema.history.internal.kafka.bootstrap.servers

""

要连接的Kafka集群服务器的初始列表。集群提供主题来存储数据库模式历史记录。开云体育电动老虎机
时需要schema.history.internal设置为<…>。KafkaSchemaHistory

处理失败

当引擎执行时,它的连接器会在每个源记录中主动记录源偏移量,并且引擎会定期将这些偏移量刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,当它们重新启动时,引擎及其连接器将恢复读取源信息从上次记录的偏移量开始

那么,当您的应用程序在嵌入式引擎运行时失败时会发生什么呢?最终的结果是,应用程序可能会在重新启动后收到一些在崩溃前已经处理过的源记录。有多少取决于引擎将偏移量刷新到其存储的频率(通过offset.flush.interval.ms属性)以及在一个批处理中特定连接器返回多少条源记录。最好的情况是每次都刷新偏移量(例如,offset.flush.interval.ms设置为0),但即使这样,嵌入式引擎仍然只在从连接器接收到每批源记录之后才刷新偏移量。

例如,MySQL连接器使用max.batch.size指定可在批处理中出现的源记录的最大数目。即使有offset.flush.interval.ms设置为0,当应用程序重新启动后崩溃,它可能会看到高达n重复,n是批的大小。如果offset.flush.interval.ms属性设置得更高,则应用程序可能看到的最高N * m重复,n这批货的最大尺寸是多少在单个偏移量刷新间隔期间可能累积的批数。(显然,可以将嵌入式连接器配置为不使用批处理并始终刷新偏移量,从而使应用程序永远不会接收任何重复的源记录。然而,这会极大地增加开销并降低连接器的吞吐量。)

底线是,当使用嵌入式连接器时,应用程序在正常操作期间(包括正常关机后的重新启动)只接收一次源记录,但是在崩溃或不正确关机后重新启动后,确实需要容忍立即接收重复事件。如果应用程序需要更严格的“恰好一次”行为,那么它们应该使用完整的Debezium平台,该平台可以提供“恰好一次”保证(即使在崩溃和重新启动之后)。开云体育官方注册网址