(弃用)嵌入Debezium连接器开云体育官方注册网址

本页面描述内部API,向后不兼容的更改在将来的版本中。请使用新的公众支持开云体育官方注册网址Debezium引擎API

开云体育官方注册网址Debezium连接器通常由部署到卡夫卡连接服务,并配置一个或多个连接器上游数据库和监控生产数据更改事件的所有更改,他们看到的上游数据库。开云体育电动老虎机这些数据变化事件写入卡夫卡,他们可以独立使用的许多不同的应用程序。卡夫卡连接提供了出色的容错性和可伸缩性,因为它运行作为一个分布式服务并确保所有注册和配置连接器总是运行。例如,即使一个卡夫卡连接的端点在集群下降,其余卡夫卡连接端点将重启任何以前的连接器目前已被终止端点上运行,最大限度地减少停机时间和消除行政活动。

不是每个应用程序都需要这种程度的容错性和可靠性,他们可能不希望依赖外部卡夫卡经纪人和卡夫卡连接服务的集群。相反,有些应用程序会更喜欢嵌入开云体育官方注册网址Debezium连接器直接在应用程序空间。他们还想要相同的数据改变事件,但希望连接器将直接发送到应用程序而不是坚持他们在卡夫卡。

开云体育官方注册网址debezium-embedded模块定义了一个小型图书馆,使应用程序可以轻松地配置和运行Debezium连接器。开云体育官方注册网址

依赖关系

使用这个模块,添加开云体育官方注册网址debezium-embedded模块应用程序的依赖项。对于Maven,这需要在您的应用程序中添加以下的POM:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-embedded < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

在哪里$ {version开云体育官方注册网址.debezium}Debezium你使用的版本或Maven属性的值包开云体育官方注册网址含Debezium版本字符串。

同样地,添加依赖关系的每个Debezium连接器应用程序将使用。开云体育官方注册网址例如,下面可以添加到您的应用程序的Maven POM文件,所以您的应用程序可以使用MySQL连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mysql < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

或MongoDB连接器:

<依赖> < groupId > io.debez开云体育官方注册网址ium < / groupId > < artifactId > debezium-connector-mongodb < / artifactId > <版本> $ {version.debezium} < /版本> < /依赖>

本文的剩余部分描述了在应用程序中嵌入MySQL连接器。其他连接器中使用类似的方式,除了连接器特有配置,主题和事件。

在代码中

您的应用程序需要建立嵌入式引擎为每个您想要运行连接器实例。的io.开云体育官方注册网址debezium.embedded.EmbeddedEngine类是一个易于使用的包装任何标准卡夫卡连接连接器,连接器完全管理的生命周期。基本上,您创建的EmbeddedEngine与一个配置(可能从一个属性文件加载),定义了环境对发动机和连接器。你也为引擎提供一个函数,它将要求每个数据更改事件产生的连接器。

这里有一个例子的代码配置并运行嵌入式MySQL连接器:

/ /定义嵌入的配置和MySQL连接器…配置配置= Configuration.create() / *开始引擎属性* /(“连接器。类”、“io.debez开云体育官方注册网址ium.connector.mysql.MySqlConnector”)(“偏移量。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”) (“offset.storage.file。文件名”、“/道路/ /存储/ offset.dat”) (“offset.flush.interval.ms”, 60000) / *开始连接器属性* /;(“名字”,“my-sql-connector”);(“数据库。开云体育电动老虎机主机名”、“localhost”)(“数据库。开云体育电动老虎机端口",3306);(“数据库。开云体育电动老虎机用户”、“mysqluser”)(“数据库。开云体育电动老虎机密码”、“mysqlpw”) (“database.ser开云体育电动老虎机ver。id ", 85744);(“主题。前缀”、“my-app-connector”) (“schema.history.internal”、“io.debezium.st开云体育官方注册网址orage.file.history.FileSchemaHistory”); (“schema.history.internal.file.filename”、“/道路/ /存储/ schemahistory.dat”) .build ();/ /创建的引擎配置…EmbeddedEngine引擎= EmbeddedEngine.create()型(配置).notifying (:: handleEvent) .build ();/ /异步运行引擎… Executor executor = Executors.newSingleThreadExecutor(); executor.execute(engine); // At some later time ... engine.stop();

让我们更详细地看看这段代码,首先我们在这里重复的前几行:

/ /定义嵌入的配置和MySQL连接器…配置配置= Configuration.create() / *开始引擎属性* /(“连接器。类”、“io.debez开云体育官方注册网址ium.connector.mysql.MySqlConnector”)(“偏移量。存储”、“org.apache.kafka.connect.storage.FileOffsetBackingStore”) (“offset.storage.file。文件名”、“/道路/ /存储/ offset.dat”) (“offset.flush.interval.ms ", 60000);

这将创建一个新的配置对象,并使用一个fluent-style builder API来设置几个字段所需的发动机不管使用的是哪一种连接器。首先是引擎的名称将在源记录由使用连接器和其内部状态,所以在应用程序中使用一些有意义的事情。的connector.class字段定义了类的名称,扩展了卡夫卡连接org.apache.kafka.connect.source.SourceConnector抽象类;在这个示例中,我们指定Debezium开云体育官方注册网址MySqlConnector类。

当卡夫卡连接的连接器,它读取信息从源和定期记录“抵消”定义多少的信息处理。连接器应该重新启动,它将使用最后一个记录抵消知道源信息应该阅读简历。由于连接器不知道或关心如何偏移量存储,它是由发动机提供一种方式来存储和恢复这些偏移量。接下来的几个字段的配置指定我们的引擎应该使用FileOffsetBackingStore类存储的补偿/ /存储/ offset.dat /路径本地文件系统上的文件(可以任意命名这个文件和存储任何地方)。此外,尽管连接器产生的偏移量与每一个原始记录》,记录发动机定期刷新补偿支持存储(在我们的例子中,每分钟一次)。这些字段为您的应用程序可以根据需要定制。

接下来的几行定义字段特定连接器,在我们的示例中是MySqlConnector连接器:

/ *开始连接器属性* /;(“名字”,“mysql-connector”);(“数据库。开云体育电动老虎机主机名”、“localhost”)(“数据库。开云体育电动老虎机端口",3306);(“数据库。开云体育电动老虎机用户”、“mysqluser”)(“数据库。开云体育电动老虎机密码”、“mysqlpw”) (“database.ser开云体育电动老虎机ver。id ", 85744);(“主题。前缀”、“产品”)(“schema.history.internal”、“io.debezium.storage.f开云体育官方注册网址ile.history.FileSchemaHistory”); (“schema.history.internal.file.filename”、“/道路/ /存储/ schemahistory.dat”) .build ();

这里,我们设置的主机名称和端口号的MySQL数据库服务器正在运行,我们定义用户名和密码将用于连接到MySQL数据库。开云体育电动老虎机注意,MySQL用户名和密码应该对应一个MySQL数据库MySQL用户被授予以下权限:开云体育电动老虎机

  • 选择

  • 重新加载

  • 显示数据库开云体育电动老虎机

  • 复制的奴隶

  • 复制客户端

前三个权限需要阅读时的一个一致的快照数据库。开云体育电动老虎机最后两个特权允许数据库读取服务器的binlog通常用于MySQL复制开云体育电动老虎机。

的配置还包括一个数字标识符server.id。由于MySQL的binlog MySQL复制机制的一部分,以阅读binlogMySqlConnector实例必须加入MySQL服务器组,这意味着这个服务器ID必须独特的内所有进程的MySQL服务器组1和2之间的任何整数321。在我们的代码,我们将它设置为一个相当大的但是有些随机值我们将为我们的应用程序只使用。

的配置还指定了一个逻辑名MySQL服务器。连接器包括这个逻辑名称主题领域内的每个源记录它产生,使应用程序能够辨别这些记录的起源。我们的示例使用一个服务器名称的“产品”,大概是因为数据库包含产品信息。开云体育电动老虎机当然,你可以命名任何有意义的应用程序。

MySqlConnector类运行时,它读取MySQL服务器的binlog,包括所有数据更改和模式变化主办的数据库服务器。开云体育电动老虎机因为所有数据更改结构的拥有表的模式变化时被记录,连接器需要跟踪所有的模式变化,以便它可以正确解码更改事件。连接器记录模式信息,这样,连接器应该重启和恢复阅读上次记录的偏移量,它知道什么数据库模式看起来像抵消。开云体育电动老虎机连接器如何记录中定义的数据库模式历史是我们最后的两个字段配置开云体育电动老虎机,即我们的连接器应该使用吗FileSchemaHistory类来存储数据库模式的变化历史开云体育电动老虎机/ /存储/ schemahistory.dat /路径本地文件系统上的文件(这个文件可以在任何地方叫什么和存储)。

最后,建立使用不可变的配置build ()方法。(顺便说一下,而不是构建以编程的方式,我们可以配置从一个属性文件使用的一个Configuration.read (…)方法。)

现在我们已经配置,我们可以创建我们的引擎。这里是相关的代码:

/ /创建的引擎配置…EmbeddedEngine引擎= EmbeddedEngine.create()型(配置).notifying (:: handleEvent) .build ();

fluent-style builder API用于创建一个使用我们的引擎配置对象和发送所有数据更改记录handleEvent (SourceRecord)相匹配的方法,它可以是任何方法的签名java.util.function.Consumer < SourceRecord >功能界面,SourceRecordorg.apache.kafka.connect.source.SourceRecord类。注意,应用程序的处理函数不应该抛出任何异常;如果是这样,引擎将日志抛出的任何异常的方法,将继续操作接下来的原始记录,但您的应用程序不会有另一个机会来处理特定的源记录导致异常,意味着您的应用程序可能会与数据库不一致。开云体育电动老虎机

在这一点上,我们有一个现有的EmbeddedEngine对象配置和准备好运行,但它不会做任何事情。的EmbeddedEngine被设计成异步执行的吗遗嘱执行人ExecutorService:

/ /异步运行引擎…遗嘱执行人执行人= Executors.newSingleThreadExecutor ();executor.execute(引擎);

您的应用程序可以阻止发动机安全、优雅地通过调用它stop ()方法:

/ /稍后…engine.stop ();

引擎的连接器将停止阅读信息从源系统,所有剩余SourceRecord对象处理程序函数,并刷新最新offets抵消存储。只有这一切完成之后将发动机的run ()方法返回。如果应用程序需要等待引擎完全停止在退出前,你可以做这个引擎的等待(…)方法:

{虽然(试试!引擎。TimeUnit.SECONDS等待(30日)){logger.info(“等候另一个30秒的嵌入式引擎关闭”);}}赶上(InterruptedException e) {Thread.currentThread () .interrupt ();}

回想一下,当JVM关闭,只等待守护进程线程。因此,如果您的应用程序退出时,一定要等待完成发动机或者运行引擎在一个守护线程。

您的应用程序应该适当停止发动机,确保优雅和完全关闭,每个源记录发送到应用程序的一个时间。例如,不依赖关闭ExecutorService,因为这中断正在运行的线程。虽然EmbeddedEngine确实会终止它的线程被中断时,引擎不会终止干净,当重新启动您的应用程序可能会看到一些相同的源记录,处理前就关闭。

先进的消费记录

对于一些用例,例如当试图写记录批次或反对一个异步API,上述功能界面可能是一个挑战。在这些情况下,它可能是更容易使用io.开云体育官方注册网址debezium.embedded.EmbeddedEngine.ChangeConsumer接口。

这个接口有单一功能与以下签名:

/ * * *处理一批记录,调用{@link RecordCommitter # markProcessed (SourceRecord)} *为每一个记录和{@link RecordCommitter # markBatchFinished()}当这批处理完成。* @param记录要处理的记录* @param提交者提交者,指示系统,我们完成* /空白handleBatch (< SourceRecord >记录列表,RecordCommitter提交者)抛出InterruptedException;

在Javadoc中提到,RecordCommitter对象是呼吁每个记录,每批处理完成。的RecordCommitter接口是线程安全的,这允许灵活处理的记录。

使用ChangeConsumerAPI,您必须通过接口的一个实现通知API,如所示:

类MyChangeConsumer实现EmbeddedEngine。ChangeConsumer{ public void handleBatch(List records, RecordCommitter committer) throws InterruptedException { ... } } // Create the engine with this configuration ... EmbeddedEngine engine = EmbeddedEngine.create() .using(config) .notifying(new MyChangeConsumer()) .build();

发动机性能

以下配置属性要求除非一个默认值是可用的(为了文本格式替换Java类的包名<…>)。

财产 默认的 描述

的名字

独特的连接器实例的名称。

connector.class

连接器的Java类的名称,例如<…> .MySqlConnectorMySQL连接器。

offset.storage

<…> .FileOffsetBackingStore

Java类的名称负责持久性连接器的补偿。它必须实现<…> .OffsetBackingStore接口。

offset.storage.file.filename

”“

路径文件偏移量的存储。时需要offset.storage设置为<…> .FileOffsetBackingStore

offset.storage.topic

”“

卡夫卡的名称主题偏移量在哪里存储。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.storage.partitions

”“

在创建使用的分区数量抵消存储主题。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.storage.replication.factor

”“

复制因子创建抵消存储主题时使用。时需要offset.storage设置为<…> .KafkaOffsetBackingStore

offset.commit.policy

<…> .PeriodicCommitOffsetPolicy

提交政策的Java类的名称。它定义当补偿提交必须基于触发事件处理的数量和时间自上次提交。这个类必须实现的接口<…> .OffsetCommitPolicy。默认是一个周期性的堪萨斯州策略基于时间间隔。

offset.flush.interval.ms

60000年

尝试提交补偿间隔。默认是1分钟。

offset.flush.timeout.ms

5000年

最大等待的毫秒数记录冲洗和分区抵消数据提交,以抵消存储之前取消和恢复过程偏移量数据在未来的尝试。默认是5秒。

开云体育电动老虎机数据库模式历史属性

一些连接器还需要额外的组属性,配置数据库模式历史:开云体育电动老虎机

  • MySQL

  • SQL Server

  • 甲骨文

  • Db2

没有适当的配置数据库模式历史连接器将拒绝开始。开云体育电动老虎机缺省配置预计卡夫卡集群可用。对于其他的部署,基于文件的数据库模式存储实现的历史。开云体育电动老虎机

财产 默认的 描述

schema.history.internal

<…> .KafkaSchemaHistory

Java类的名称负责持久性数据库模式的历史。开云体育电动老虎机
它必须实现<…> .SchemaHistory接口。

schema.history.internal.file.filename

”“

通往一个数据库模式历史存储的文件。开云体育电动老虎机
时需要schema.history.internal设置为<…> .FileSchemaHistory

schema.history.internal.kafka.topic

”“

卡夫卡主题数据库模式历史存储的地方。开云体育电动老虎机
时需要schema.history.internal设置为<…> .KafkaSchemaHistory

schema.history.internal.kafka.bootstrap.servers

”“

最初的卡夫卡集群服务器连接的列表。集群提供了主题存储数据库模式的历史。开云体育电动老虎机
时需要schema.history.internal设置为<…> .KafkaSchemaHistory

处理失败

当引擎执行时,它的连接器是积极记录源抵消内部每个源记录,和引擎定期冲洗这些补偿持久存储。当应用程序和发动机正常关闭或崩溃,当他们重新启动引擎和它的连接器将恢复阅读源信息从最后一个记录的偏移量

所以,当嵌入式引擎运行时应用程序失败?净效应是,应用程序可能会获得一些源记录重新启动后,它已经在崩溃前处理。多少取决于发动机的频率将抵消其存储(通过吗offset.flush.interval.ms属性)和多少源记录特定连接器返回一批。最好的情况是抵消每次都刷新(例如,offset.flush.interval.ms设置为0),但即使这样嵌入式引擎将仍然只冲洗后补偿来源记录收到的每一批连接器。

例如,MySQL连接器使用max.batch.size指定源记录的最大数量,可以出现在一个批处理。即使有offset.flush.interval.ms设置为0,当应用程序启动时撞击后可以看到吗n重复,n是批量的大小。如果offset.flush.interval.ms属性设置较高,则应用程序可能会看到n * m重复,n的最大大小批量和吗批次的数量可能会积累在一个抵消冲洗间隔。(显然可以配置嵌入式连接器使用批处理,总是冲偏移,导致应用程序从不接受任何重复记录来源。然而,这大大增加了开销,减少连接器的吞吐量)。

底线是使用嵌入式连接器时,应用程序将接收每个源记录完全一旦在正常操作(包括优雅的关机后重启),但需要宽容接收复制事件后立即崩溃或不当关闭后重新启动。如果应用程序需要更严格的只有一次行为,那么他们应该使用完整Debezium平台,可以提供只有一次担保(甚至崩溃后重启)。开云体育官方注册网址