当我们的MySQL连接器正在读取MySQL服务器或集群的binlog,它解析日志中的DDL语句,并在每个表的模式随时间发展的过程中构建内存模型。这个过程很重要,因为连接器在每个事件发生时使用表的定义为每个表生成事件。我们不能使用数据库的开云体育电动老虎机当前的模式,因为自连接器读取的时间点(或日志中的位置)以来,它可能已经更改了。
解析MySQL或任何其他主要关系数据库的DDL似乎是一项艰巨的任务。开云体育电动老虎机通常每个DBMS都有高度定制的SQL语法,尽管数据操作语言(DML)的语句通常与标准相当接近数据定义语言(DDL)语句通常不是这样,涉及更多特定于dbms的特性。
既然如此,我们为什么要为MySQL编写自己的DDL解析器呢?让我们首先看看Debezium需要DDL解析器来做开云体育官方注册网址什么。
在Debezium MySQL连接器中开云体育官方注册网址解析DDL
MySQL binlog包含各种事件。例如,当一行插入到表中时,binlog事件包含对表和表中每列的值的间接引用,但是没有关于组成表的列的信息。binlog中唯一引用表结构的是MySQL在处理用户提供的DDL语句时生成的SQL DDL语句。
连接器还使用Kafka Connect schema生成消息,这是一种简单的数据结构,定义了每个字段的各种名称和类型,以及字段的组织方式。所以,当我们为表插入生成事件消息时,我们首先必须有一个Kafka Connect模式
对象中所有合适的字段,然后我们必须将有序的列值数组转换为Kafka Connect结构体
使用表插入事件中的字段和个别列值初始化。
幸运的是,当我们遇到DDL语句时,我们可以更新内存模型,然后使用它来生成一个模式
对象。同时,我们可以创建一个使用这个的组件模式
对象来创建结构体
从事件中出现的列值的有序数组中获取。所有这些操作只需执行一次,并用于该表上的所有行事件,直到遇到另一条DDL语句更改表的模式,此时我们再次更新模型。
因此,所有这些都需要解析所有DDL语句,尽管为了我们的目的,我们只需要这样做理解DDL语法的一个小子集。然后,我们必须使用该语句子集来更新我们的表的内存模型。并且由于我们的内存表模型不是特定于MySQL的,其余的功能将生成模式
对象和组件,用于将值数组转换为结构体
消息中使用的对象都是通用的。
现有DDL库
不幸的是,对于MySQL、PostgreSQL或其他流行的rdbms,并没有那么多第三方开源库来解析DDL语句。JSqlParser常被引用,但它有一个单一的语法它是多个DBMS语法的组合,因此不是任何特定DBMS的严格解析器。通过更新复合语法来增加对其他dbms的支持可能很困难。
其他库,例如PrestoDB他们定义了自己的SQL语法,无法处理MySQL DDL语法的复杂和细微差别。Antlr解析器生成器项目有一个MySQL 5.6语法,但这仅限于DML的一个小子集,并且不支持DDL或更新的5.7特性。有Antlr 3的旧sql相关语法,但它们通常是巨大的,有bug,并且仅限于特定的dbms。的Teiid项目是一个位于各种dbms和数据源之上的数据虚拟化引擎,它的工具有一系列的DDL解析器在一个特殊的存储库中构建ast(作者实际上帮助开发了这些)。还有Ruby库,比如Square的MySQL解析器库.还有一个专有商业产品.
我们的DDL解析器框架
因为我们找不到一个有用的第三方开源库,所以我们选择创建自己的DDL解析器框架来满足我们的需求:
解析DDL语句并更新我们的内存模型。
专注于使用那些基本语句(例如,创建、修改和删除表和视图),而完全忽略其他语句,而不必解析它们。
解析器代码的结构与MySQL DDL语法文档并使用与语法规则相对应的方法名。随着时间的推移,这将使它更容易维护。
允许根据需要为PostgreSQL、Oracle、SQLServer和其他dbms创建解析器。
通过子类化支持自定义:能够轻松覆盖逻辑的狭窄部分,而无需复制大量代码。
使解析器易于开发、调试和测试。
结果框架包括一个标记器,它将字符串中的一个或多个DDL语句转换为可重放的标记序列,其中每个标记表示标点符号、带引号的字符串、不分大小写的单词和符号、数字、关键字、注释和终止字符(例如;
MySQL)。然后,DDL解析器使用简单易读的流畅API遍历令牌流,查找模式,并调用自身的方法来处理各种令牌集。解析器还使用内部的数据类型解析器用于处理SQL数据类型表达式,例如INT
,VARCHAR (64)
,数字(32岁,3)
,时间戳(8),带时区
.
的MySqlDdlParser类扩展了基类并提供所有特定于mysql的解析逻辑。例如,DDL语句:
#创建和填充我们的产品使用一个单一的插入多行创建表格产品(id)整数不零AUTO_INCREMENT主要的关键、名称VARCHAR(255)不零、描述VARCHAR(512),重量浮动);改变表格产品AUTO_INCREMENT=101;#使用多个插件创建和填充手头的产品创建表格Products_on_hand (product_id . on_hand整数不零主要的关键、数量整数不零,外国关键(product_id)参考文献产品(id));
可以很容易地解析:
字符串ddlStatements =…DdlParser解析器=新MySqlDdlParser ();表=新表();解析器。解析(ddl, tables);
在这里,表
对象是命名表定义的内存表示形式。解析器处理DDL语句,将每个DDL语句应用于表
对象。
它是如何工作的
每一个DdlParser
implementation有以下公共方法来解析提供String中的语句:
公共最后无效解析(字符串ddlContent, Tables 开云体育电动老虎机databaseTables) {Tokenizer Tokenizer =新DdlTokenizer (! skipComments (),这:: determineTokenType);TokenStream流=新TokenStream (ddlContent记号赋予器,假);stream.start ();解析(流,databaset开云体育电动老虎机able);}
在这里,该方法创建了一个newTokenStream
从内容中使用DdlTokenizer
它知道如何将字符串中的字符分隔为各种类型的令牌对象。然后调用另一个解析
方法来完成大部分工作:
公共最后无效解析(TokenStream ddlContent, Tables dat开云体育电动老虎机abaseTables)抛出ParsingException,IllegalStateException{这.tokens = ddlContent;这.开云体育电动老虎机databaseTables = databaseTables;标记标记= ddlContent.mark();试一试{而(ddlContent.hasNext()) {parseNextStatement(ddlContent.mark());//如果语句结束符仍然存在,则消耗它…tokens.canConsume (DdlTokenizer.STATEMENT_TERMINATOR);}}抓(ParsingException e) {ddlContent.rewind(标记);扔e;}抓(Throwablet) {parsingFailed(ddlContent.nextPosition(),"意外异常("+ t.getMessage() +")解析"t);}}
这将设置一些本地状态,标记当前起点,并尝试解析DDL语句,直到找不到更多的DDL语句为止。如果解析逻辑未能找到匹配,它将生成一个ParsingException
有问题的行和列,加上一个消息,表明找到了什么和期望什么。在这种情况下,该方法将返回令牌流(以防调用者希望尝试另一种不同的解析器)。
每次parseNextStatement
方法时,将该语句的起始位置传递给该方法,使其获得语句的起始位置。我们的MySqlDdlParser
子类重写parseNextStatement
方法,使用语句中的第一个标记来确定MySQL DDL语法中允许的语句类型:
@Override受保护的无效parseNextStatement(标记标记){如果(tokens.matches(DdlTokenizer.COMMENT)) {parseccomment(标记);}其他的如果(tokens.matches ("创建")) {parseCreate(标记);}其他的如果(tokens.matches ("改变")) {parseAlter(标记);}其他的如果(tokens.matches ("下降")) {parseDrop(标记);}其他的如果(tokens.matches ("重命名")) {parseRename(标记);}其他的{parseUnknownStatement(标记);}}
当找到匹配的令牌时,该方法调用适当的方法。例如,如果语句以创建表…
,则parseCreate
方法调用时使用标识语句起始位置的相同标记:
@Override受保护的无效parseCreate(标记标记){tokens.consume("创建");如果(tokens.matches ("表格"|| token .matches("临时","表格")) {parseCreateTable(标记);}其他的如果(tokens.matches ("视图")) {parseCreateView(标记);}其他的如果(tokens.matchesAnyOf ("开云体育电动老虎机","模式")) {parseCreateUnknown(标记);}其他的如果(tokens.matchesAnyOf ("事件")) {parseCreateUnknown(标记);}其他的如果(tokens.matchesAnyOf ("函数","过程")) {parseCreateUnknown(标记);}其他的如果(tokens.matchesAnyOf ("独特的","全文","空间","指数")) {parseCreateIndex(标记);}其他的如果(tokens.matchesAnyOf ("服务器")) {parseCreateUnknown(标记);}其他的如果(tokens.matchesAnyOf ("表空间")) {parseCreateUnknown(标记);}其他的如果(tokens.matchesAnyOf ("触发")) {parseCreateUnknown(标记);}其他的{//它可以是几种可能的东西(包括more .//这些匹配的复杂形式),顺序(这:: parseCreateView,这:: parseCreateUnknown);}}
方法首先使用令牌创建
,然后尝试将这些标记与各种标记字面量模式相匹配。如果找到匹配,则此方法将委托给其他更特定的解析方法。请注意,框架的流畅API使理解匹配模式变得非常容易。
让我们更进一步。假设我们的DDL语句以创建表产品(
,解析器将调用parseCreateTable
方法,再次使用相同的标记表示语句的开始:
受保护的无效parseCreateTable(标记开始){token . canconsume ("临时");tokens.consume ("表格");布尔onlyIfNotExists = token . canconsume ("如果","不","存在");TableId = parseQualifiedTableName(start);如果(tokens.canConsume ("就像")) {TableId originalId = parseQualifiedTableName(start);Table original = 开云体育电动老虎机databastables . fortable (originalId);如果(原来!=零) {d开云体育电动老虎机atabaseTables。overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames());} consumeRemainingStatement(开始);debugParsed(开始);返回;}如果(onlyIfNotExists && 开云体育电动老虎机databastables . fortable (tableId) !=零){//这个表是存在的,所以我们什么都不做…consumeRemainingStatement(开始);debugParsed(开始);返回;} TableEditor table = 开云体育电动老虎机databaseTables.editOrCreateTable(tableId);// create_definition…如果(tokens.matches ('('parseCreateDefinitionList(start, table);// table_options…parseTableOptions(开始、表);// partition_options…如果(tokens.matches ("分区")) {parsePartitionOptions(start, table);}/ / select_statement如果(tokens.canConsume ("作为"|| token . canconsume ("忽略","作为"|| token . canconsume ("取代","作为")) {parseAsSelectStatement(开始,表);}//更新表定义…开云体育电动老虎机databaseTables.overwriteTable (table.create ());debugParsed(开始);}
此方法尝试镜像MySQL创建表
语法规则,以:
创建(临时)表格[如果不存在tbl_name (create_definition,…)[table_options] [partition_options]创建(临时)表格[如果不存在[tbl_name [(create_definition,…)][table_options] [partition_options] select_statement .创建(临时)表格[如果不存在tbl_name {就像Old_tbl_name | (就像Old_tbl_name} create_definition:…
的创建
文字在我们之前就已经被消费了parseCreateTable
开始,因此它首先尝试使用临时
字面值(如果可用)表格
文字,如果不存在
片段(如果可用),然后使用并解析表的限定名。如果语句中包含像otherTable
,它使用开云体育电动老虎机databasetable
(这是指我们的表
对象)来用引用表的定义覆盖命名表的定义。否则,它为新表获取一个编辑器,然后(像语法规则一样)解析一个列表create_definition片段,然后是table_options,partition_options,并可能select_statement.
看看完整的MySqlDdlParser类查看更多细节。
总结
这篇文章详细介绍了为什么MySQL连接器在binlog中使用DDL语句,尽管我们只触及了表面如何连接器使用它的框架进行DDL解析,以及如何在未来的其他DBMS方言的解析器中重用它。
试试我们教程查看MySQL连接器的实际操作,并继续关注更多连接器、版本和新闻。
关于Debe开云体育官方注册网址zium
开云体育官方注册网址Debezium是一个开源的分布式平台,它将现有数据库转换为事件流,因此应用程序几乎可以立即看到并响应数据库中提交的每一个行级更改。开云体育电动老虎机开云体育官方注册网址Debezium是建立在卡夫卡并提供卡夫卡连接监控特定数据库管理系统的兼容连接器。开云体育电动老虎机开云体育官方注册网址Debezium在Kafka日志中记录了数据更改的历史,所以你的应用程序可以在任何时候停止和重新启动,并且可以很容易地使用它没有运行时错过的所有事件,确保所有事件都被正确和完整地处理。开云体育官方注册网址Debezium是开源下Apache许可证,版本2.0.
参与
我们希望您觉得Debezium有趣开云体育官方注册网址且有用,并愿意尝试一下。在Twitter上关注我们@开云体育官方注册网址debezium,在Zulip上和我们聊天,或加入我们的邮件列表与社区对话。所有的代码都是开源的GitHub上,因此在本地构建代码并帮助我们改进现有连接器并添加更多连接器。如果您发现了问题或对我们如何改进Debezium有想法,请告诉我们开云体育官方注册网址记录问题.