详解 Calcite JDBC查询流程的实现
ztj100 2024-10-28 21:10 27 浏览 0 评论
导读:这篇文章会详细分析Calcite JDBC查询的整个流程实现,理解了这整个流程就对Calcite的核心功能和Calcite能做什么都会有深入的了解。
@Test
public void singleSourceTest() throws SQLException {
Properties config = new Properties();
config.put("model", TestUtil.resourcePath("singleSource.json"));
config.put("lex", "MYSQL");
config.put("forceDecorrelate", "false");
config.put("caseSensitive", "false");
String sql = "select s.name,c.name from student as s join colleage as c on s.cid = c.id";
try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {
try (Statement stmt = con.createStatement()) {
try (ResultSet rs = stmt.executeQuery(sql)) {
printRs(rs);
}
}
}
}
Calcite JDBC的实现类
Calcite不仅实现了它的核心功能,也为开发者提供了一个称为Avatica的构建JDBC和ODBC Driver的框架,如果希望实现自己的JDBC Driver可以详细学习一下这个框架。 从上图可知,Calcite core模块中的JDBC的类实现都是基于Avatica框架。我们在使用Calcite进行JDBC查询时Connection,Statement和ResultSet实例其实就是CalciteJdbc41Connection,CalciteJdbc41Statement,CalciteResultSet。
Calcite的JDBC Driver
在使用Calcite的JDBC Driver时,我们无需手动注册。如下图所示,Calcite核心模块会利用SPI方式注册。
关于 SPI 可参考之前的一篇文章 详解 SPI 机制——ServiceLoader.load
Avatica框架在创建JDBC的接口实例的时候要使用一个工场类来创建对应的实例。Calcite的Driver实现中通过getFactoryClassName方法获得这个工场类。
@Override protected String getFactoryClassName(JdbcVersion jdbcVersion) {
switch (jdbcVersion) {
case JDBC_30:
case JDBC_40:
throw new IllegalArgumentException("JDBC version not supported: "
+ jdbcVersion);
case JDBC_41:
default:
return "org.apache.calcite.jdbc.CalciteJdbc41Factory";
}
}
如下图所示,通过CalciteJdbc41Factory提供的方法,可以获得Connection, Statement, ResultSet等实例对象。
获取Calcite的JDBC Conection
当我们通过DriverManager获取Calcite的Connection的时候,大致过程如下:
DriverManager会调用Calcite的Driver的connect方法来创建Calcite Connection实例:
public Connection connect(String url, Properties info) throws SQLException {
if (!acceptsURL(url)) {
return null;
}
final String prefix = getConnectStringPrefix();
assert url.startsWith(prefix);
final String urlSuffix = url.substring(prefix.length());
final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
final AvaticaConnection connection =
factory.newConnection(this, factory, url, info2);
handler.onConnectionInit(connection);
return connection;
}
上面的connect方法实现中,Calcite的Connection是通过前面说的CalciteJdbc41Factory创建的,Connection实例的类是CalciteJdbc41Connection。 另外在返回Connection之前,Calcite还会根据配置的model来解析model格式(json或yaml)生成JdbcSchema。
{
"defaultSchema": "db1",
"schemas": [
{
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"name": "db1",
"operand": {
"jdbcDriver": "com.mysql.cj.jdbc.Driver",
"jdbcPassword": "changeme",
"jdbcUrl": "jdbc:mysql://localhost:3306/test",
"jdbcUser": "root"
},
"type": "custom"
}
],
"version": "1.0"
}
JdbcSchema在创建的过程主要是根据model中jdbc的信息建立与后端数据库的连接池。
这里补充一下,在 Calcite 中 Schema 可以理解为 MySQL 中的库, Schema 中拥有多个 Table 则与 MySQL 库存多表一致
创建Statement
当我们调用connection的createStatement方法创建Statement实例的时候,Calcite的执行流程如上图。整个过程也很简单,Caclite的connection对象也是委托CalciteJdbc41Factory工场类来创建Statement对象,这个Statement是CalciteJdbc41Statement实例。
执行executeQuery方法创建ResultSet过程
当我们获得Statement对象后,调用executeQuery(sql)方法时。Calcite在这一步的时候,内部会做很多事情。这里也是整个Calcite JDBC调用工作最核心的地方。 在开始分析这一步的工作之前,假设后端数据库是mysql,要查询的sql如下:
SELECT "ENAME", "EMPNO"
FROM "SCOTT"."EMP"
ORDER BY "EMPNO" NULLS LAST
在获得ResultSet对象之前,executeQuery方法内部的执行过程大致分为下面几个阶段:
- SQL 解析:calcite首先会使用它实现的SqlParser解析sql字符串,生成SqlNode为节点的AST(抽象语法树)。
- 生成逻辑关系表达式:得到SqlNode的AST后,Calcite会使用SqlToRelConverter将SqlNode转换为RelNode组成的逻辑关系表达式树。
- 优化:Calcite会使用planner(优化器)优化逻辑关系表达式,生成最优的物理关系表达式树(带有具体的算法的算子)。
- 生成 Linq4j 表达式:在获得物理关系表达式后,Calcite的EnumerableRelImplentor可以通过物理关系表达式树转换为Linq4j表达式树,通过Linq4j表达式树可以生成一个Java Class。这个Java Class的实例知道如何与后端数据库进行JDBC连接,并且如果是多数据源查询并且需要join、sort、filter等,它也知道这些操作要如何处理。
- 编译:面生成的Java Class会通过运行时编译(Calcite使用Janino编译器),编译后生成对象实例,这个实例实现了Iterator接口。会保存在要返回的ResultSet中,后面ResultSet调用相应的方法取数据时,最终是调用这个实例对象获取数据
下图较详细的显示了整个executeQuery的流程。
Sql解析
final CalciteConnectionConfig config = context.config();
final SqlParser.ConfigBuilder parserConfig = createParserConfig()
.setQuotedCasing(config.quotedCasing())
.setUnquotedCasing(config.unquotedCasing())
.setQuoting(config.quoting())
.setConformance(config.conformance())
.setCaseSensitive(config.caseSensitive());
final SqlParserImplFactory parserFactory =
config.parserFactory(SqlParserImplFactory.class, null);
if (parserFactory != null) {
parserConfig.setParserFactory(parserFactory);
}
SqlParser parser = createParser(query.sql, parserConfig);
SqlNode sqlNode;
try {
sqlNode = parser.parseStmt();
statementType = getStatementType(sqlNode.getKind());
} catch (SqlParseException e) {
throw new RuntimeException(
"parse failed: " + e.getMessage(), e);
}
/** Factory method for SQL parser with a given configuration. */
protected SqlParser createParser(String sql,
SqlParser.ConfigBuilder parserConfig) {
return SqlParser.create(sql, parserConfig.build());
}
/** Factory method for SQL parser configuration. */
protected SqlParser.ConfigBuilder createParserConfig() {
return SqlParser.configBuilder();
}
上面的代码片段显示了Calcite的JDBC如何创建SqlParser,并且通过SqlParser将sql转换成SqlNode的AST,Calcite的Parser是通过JavaCC实现的,要如何扩展Parser可以参考官方文档。如果需要单独使用Calcite的SqlParser可以按照上面的方式获得。当例子的sql经过解析后,变为如下结构:
SqlValidator验证
Calcite在获得SqlNode之后要将它转换为RelNode树,用关系代数来表示查询过程。在转换之前,还会使用SqlValidator验证SQL中的标识对象是否合法,比如表是否存在,字段是否存在,数据类型等是否正确。SqlVlidator验证需要的数据是通过CalciteCatalogReader去后端数据库获取的。
private SqlValidator createSqlValidator(Context context,
CalciteCatalogReader catalogReader) {
final SqlOperatorTable opTab0 =
context.config().fun(SqlOperatorTable.class,
SqlStdOperatorTable.instance());
final SqlOperatorTable opTab =
ChainedSqlOperatorTable.of(opTab0, catalogReader);
final JavaTypeFactory typeFactory = context.getTypeFactory();
final CalciteConnectionConfig connectionConfig = context.config();
final SqlValidator.Config config = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup())
.withSqlConformance(connectionConfig.conformance())
.withDefaultNullCollation(connectionConfig.defaultNullCollation())
.withIdentifierExpansion(true);
return new CalciteSqlValidator(opTab, catalogReader, typeFactory,
config);
}
SqlNode转换为RelNode
final SqlToRelConverter.ConfigBuilder builder =
SqlToRelConverter.configBuilder()
.withTrimUnusedFields(true)
.withExpand(THREAD_EXPAND.get())
.withExplain(sqlQuery.getKind() == SqlKind.EXPLAIN);
final SqlToRelConverter sqlToRelConverter =
getSqlToRelConverter(validator, catalogReader, builder.build());
. . . . . . .
RelRoot root =
sqlToRelConverter.convertQuery(sqlQuery, needsValidation, true);
经过SqlToRelConverter转换后,生成下面形式的RelNode树(也可称为逻辑图)。目前生成的RelNode树中都是逻辑表达式节点,它们只用于表示SQL的关系表达式形式而不能真正的执行,因为它们都没有具体算子的实现算法。
以 “select c.* from flink_k8s.k8s_cluster c inner join tw_datahub_test.v2_dwd_game_uid_reg_log reg on reg.uid=c.id where c.id=1 and reg.uid=21792015” 这条多表 join 复杂语句为例,在控制台中打印 RelNode 树类似于如下:
10:37:55.005 [main] DEBUG org.apache.calcite.sql2rel - Plan after converting SqlNode to RelNode
LogicalProject(id=[$0], name=[$1], cluster_master_url=[$2], cluster_kube_config=[$3], cluster_properties=[$4], type=[$5], status=[$6], delete_status=[$7], creator_uid=[$8], created_at=[$9], updated_at=[$10], deleted_at=[$11], description=[$12])
LogicalFilter(condition=[AND(=($0, 1), =($13, 21792015))])
LogicalJoin(condition=[=($13, $0)], joinType=[inner])
JdbcTableScan(table=[[flink_k8s, k8s_cluster]])
JdbcTableScan(table=[[tw_datahub_test, v2_dwd_game_uid_reg_log]])
10:37:55.048 [main] DEBUG org.apache.calcite.sql2rel - Plan after trimming unused fields
LogicalProject(id=[$0], name=[$1], cluster_master_url=[$2], cluster_kube_config=[$3], cluster_properties=[$4], type=[$5], status=[$6], delete_status=[$7], creator_uid=[$8], created_at=[$9], updated_at=[$10], deleted_at=[$11], description=[$12])
LogicalFilter(condition=[AND(=($0, 1), =($13, 21792015))])
LogicalJoin(condition=[=($13, $0)], joinType=[inner])
JdbcTableScan(table=[[flink_k8s, k8s_cluster]])
LogicalProject(uid=[$0])
JdbcTableScan(table=[[tw_datahub_test, v2_dwd_game_uid_reg_log]])
Planner 优化 RelNode 树
在优化阶段Calcite会创建优化器,优化器会根据定义好好的 Rule 对前面生成的逻辑 RelNode 树进行优化,生成物理表达式树(物理图)。目前Calcite实现了两个优化器:HepPlanner和VolcanoPlanner。
- HepPlanner是基于规则的启发式优化器
- Volcano是基于代价的优化器。
在JDBC查询过程中Calcite使用的是VolcanoPlanner。通过优化器优化后RelNode树变成下面形式。
JdbcToEnumerableConverter可以通过它的子节点生成要下推到后端数据库的SQL字符串。如果是多个数据源join查询时,比如下面的例子:
SELECT s.name,c.name
FROM db1.student AS s join db2.colleage AS c on s.cid = c.id
优化后的RelNode树如下所示,有两个JdbcToEnumerableConverter分别生成对应数据源的查询sql,返回的数据通过EnumerableHashJoin算子的算法join起来,形成例子中单条sql查询语句的结果。
生成Java Plan
通过优化获得了Jdbc的物理表达式树(物理图)后,要将这些表达式生成Java代码后才能真正将执行计划跑起来。在生成Java代码之前,将Jdbc的物理表达式的根节点变为EnumerableCalc,JdbcToEnumerableConverter变为它的子节点。
由上图可知,EnumerableCalc和JdbcToEnumerableConverter都实现了EnumerableRel接口,所以它们可以连接起来,并且都实现了implement方法。 Calcaite如何通过这些物理关系表达式算子生成可执行的Java代码?在EnumerableRel的每个算子的implement方法中会将这个算子要实现的算法写成Linq4j的表达式,然后通过这些Linq4j表达式生成Java Class。
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
final BlockBuilder builder0 = new BlockBuilder(false);
final JdbcRel child = (JdbcRel) getInput();
. . . . . .
final JdbcConvention jdbcConvention =
(JdbcConvention) child.getConvention();
SqlString sqlString = generateSql(jdbcConvention.dialect);
String sql = sqlString.getSql();
. . . . . .
final Expression sql_ =
builder0.append("sql", Expressions.constant(sql));
. . . . ..
}
比如上面的代码块是 JdbcToEnumerableConverter 的 implement方法的一小块,implememnt 中都是像上面的代码一样生成linq4j的表达式,linq4j有很多不同的表达式代表一个java程序的不同元素,正确的生成这些元素并且拼装起来可以生成正确的java代码。所以上一小节生成的物理表达式树的每个节点的implemnt被调用后就会生成linq4j表达式树,根据linq4j能生成可执行的java代码:
public static Bindable toBindable(Map<String, Object> parameters,
CalcitePrepare.SparkHandler spark, EnumerableRel rel,
EnumerableRel.Prefer prefer) {
EnumerableRelImplementor relImplementor =
new EnumerableRelImplementor(rel.getCluster().getRexBuilder(),
parameters);
final ClassDeclaration expr = relImplementor.implementRoot(rel, prefer);
String s = Expressions.toString(expr.memberDeclarations, "\n", false);
......
}
上面的代码是EnumerableInterpretable的toBindable方法的片段,jdbc流程中在获得上一小节讲解的物理表达式树后,经过上面的代码生成一个EnumerableRelImplementor对象,通过调用它的implementRoot方法会调用物理表达式树根节点(这里是EnumerableCalc)的implement方法,父节点又会迭代调用子节点的implement方法。最终返会获得生成的linq4j表达式树。通过上面代码片段的Expressions.toString方法可以生成java代码字符串,比如如下就是生成的Java类:
public class Baz implements org.apache.calcite.runtime.Bindable {
public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {
//of方法调用ResultSetEnumerable的构造方法
//ResultSetEnumerable对象保存of的参数:后端数据库连接池,sql查询字符串,行映射方法
final org.apache.calcite.runtime.ResultSetEnumerable enumerable =
org.apache.calcite.runtime.ResultSetEnumerable.of(
(javax.sql.DataSource) root.getRootSchema().getSubSchema("SCOTT")
.unwrap(javax.sql.DataSource.class),
"SELECT \"ENAME\", \"EMPNO\"\nFROM \"SCOTT\".\"EMP\"\nORDER BY \"EMPNO\" NULLS LAST",
new org.apache.calcite.linq4j.function.Function1() {
public org.apache.calcite.linq4j.function.Function0 apply(
final java.sql.ResultSet resultSet) {
return new org.apache.calcite.linq4j.function.Function0() {
public Object apply() {
try {
final Object[] values = new Object[2];
values[0] = resultSet.getObject(1);
values[1] = resultSet.getShort(2);
if (resultSet.wasNull()) {
values[1] = null;
}
return values;
} catch (java.sql.SQLException e) {
throw new RuntimeException(e);
}
}
};
}
public Object apply(final Object resultSet) {
return apply((java.sql.ResultSet) resultSet);
}
});
//设置jdbc查询超时时间
enumerable.setTimeout(root);
return new org.apache.calcite.linq4j.AbstractEnumerable() {
public org.apache.calcite.linq4j.Enumerator<String> enumerator() {
return new org.apache.calcite.linq4j.Enumerator<String>() {
//内部使用上面生成的ResultSetEnumerable生成的枚举器
public final org.apache.calcite.linq4j.Enumerator<Object[]> inputEnumerator =
enumerable.enumerator();
.............
};
}
};
}
public Class getElementType() {
return java.lang.String.class;
}
}
得到生成的java代码后,Calcite会调用Janino编译器动态编译这个java类,并且实例化这个类的一个对象。 后面在创建CalciteResultSet的时候会调用这个对象的bind方法,这个方法返回一个Eumerable对象,通过这个对象可以迭代JDBC查询的结果。这个Eumerable对象的实际工作是委托给ResultSetEnumerable的enumerator()方法生成的枚举器实现的。
public Enumerator<T> enumerator() {
if (preparedStatementEnricher == null) {
return enumeratorBasedOnStatement();
} else {
return enumeratorBasedOnPreparedStatement();
}
}
private Enumerator<T> enumeratorBasedOnStatement() {
Connection connection = null;
Statement statement = null;
try {
connection = dataSource.getConnection(); //获取实际数据库的jdbc连接
statement = connection.createStatement(); //生成jdbc statement
setTimeoutIfPossible(statement);
if (statement.execute(sql)) {
final ResultSet resultSet = statement.getResultSet(); //执行sql获得实际的ResultSet
statement = null;
connection = null;
return new ResultSetEnumerator<>(resultSet, rowBuilderFactory); //用ResultSetEnumerator对象包装ResultSet
} else {
Integer updateCount = statement.getUpdateCount();
return Linq4j.singletonEnumerator((T) updateCount);
}
} catch (SQLException e) {
throw Static.RESOURCE.exceptionWhilePerformingQueryOnJdbcSubSchema(sql)
.ex(e);
} finally {
closeIfPossible(connection, statement);
}
}
通过上面ResultSetEnumerable的enumerator()方法实现代码可以知道,在生成枚举器的时候就开始执行真正的数据库查询,获得实际的ResultSet,用ResultSetEnumerator包装起来,通过ResultSetEnumerator就能操作ResultSet。 在executeSql方法的最后,会返回CalciteResultSet,它与实际的ResultSet关系如下:
使用ResultSet
经过执行statement的executeSql方法获得CalciteResultSet后,就可以使用next方法获取查询数据。CalciteResultSet的next方法实现如下:
public boolean next() throws SQLException {
// TODO: for timeout, see IteratorResultSet.next
checkOpen();
if (null != statement && statement.cancelFlag.get()) {
throw AvaticaConnection.HELPER.createException("Statement canceled");
}
if (cursor.next()) { //调用cursor的next方法,cursor是IteraterCursor
++row;
beforeFirst = false;
return true;
} else {
row = 0;
afterLast = true;
return false;
}
}
CalciteResultSet的next方法会调用cursor的next方法。cursor又会调用Enumerable的next方法,这样一层层最后调用实际ResultSet对象的next方法。对于其他的ResultSet方法实现也大致一样。
最后
以上就是Calcite JDBC的实现流程,通过理解以上流程可以更好地理解Calcite的核心功能是如何使用的。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
原文地址:https://zhuanlan.zhihu.com/p/144170868
相关推荐
- 如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL
-
阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...
- Python数据分析:探索性分析
-
写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...
- C++基础语法梳理:算法丨十大排序算法(二)
-
本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...
- C 语言的标准库有哪些
-
C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...
- [深度学习] ncnn安装和调用基础教程
-
1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...
- 用rust实现经典的冒泡排序和快速排序
-
1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...
- ncnn+PPYOLOv2首次结合!全网最详细代码解读来了
-
编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...
- C++特性使用建议
-
1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...
- Qt4/5升级到Qt6吐血经验总结V202308
-
00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...
- 到底什么是C++11新特性,请看下文
-
C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...
- 掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!
-
C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...
- 经典算法——凸包算法
-
凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...
- 一起学习c++11——c++11中的新增的容器
-
c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...
- C++ 编程中的一些最佳实践
-
1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)