From fd8eba36a8d03d7ce7b9566ebc17220d728f9fd5 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:16:46 +0800 Subject: [PATCH] refactor: make use of the "pre_execute" in sql execution interceptor (#4875) * feat: dynamic definition of plugin options * rebase * revert * fix ci --- Cargo.lock | 1 - Cargo.toml | 5 +- src/client/Cargo.toml | 1 - src/cmd/Cargo.toml | 2 +- src/cmd/src/cli/repl.rs | 2 +- src/cmd/src/standalone.rs | 14 +++- src/common/meta/Cargo.toml | 2 +- src/common/telemetry/Cargo.toml | 4 +- src/flow/src/df_optimizer.rs | 2 +- src/flow/src/transform.rs | 4 +- src/frontend/src/error.rs | 13 +++- src/frontend/src/instance.rs | 54 +++++++++++---- src/meta-client/Cargo.toml | 2 +- src/meta-srv/Cargo.toml | 4 +- src/operator/src/statement.rs | 17 +++-- src/operator/src/statement/ddl.rs | 2 +- src/operator/src/statement/tql.rs | 2 +- src/query/src/datafusion.rs | 6 +- src/query/src/planner.rs | 10 +-- src/query/src/promql/planner.rs | 76 +++++++++------------- src/query/src/range_select/plan_rewrite.rs | 2 +- src/query/src/tests.rs | 2 +- src/query/src/tests/query_engine_test.rs | 4 +- src/script/src/python/engine.rs | 2 +- src/script/src/python/ffi_types/copr.rs | 2 +- src/servers/tests/mod.rs | 4 +- tests-integration/src/grpc.rs | 2 +- tests-integration/src/instance.rs | 2 +- 28 files changed, 141 insertions(+), 102 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 264ceb2de2..26554d7f81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1788,7 +1788,6 @@ dependencies = [ "tokio-stream", "tonic 0.11.0", "tracing", - "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d89289b304..2e7f70d2ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,13 +180,16 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "5 ] } strum = { version = "0.25", features = ["derive"] } tempfile = "3" -tokio = { version = "1.36", features = ["full"] } +tokio = { version = "1.40", features = ["full"] } tokio-postgres = "0.7" tokio-stream = { version = "0.1" } tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] } tower = { version = "0.4" } +tracing-appender = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } +typetag = "0.2" uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] } zstd = "0.13" diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 501c2f82d8..9d198ab9fb 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -45,7 +45,6 @@ common-grpc-expr.workspace = true datanode.workspace = true derive-new = "0.5" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies.substrait_proto] package = "substrait" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 501f40a047..6309d64294 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -78,7 +78,7 @@ table.workspace = true tokio.workspace = true toml.workspace = true tonic.workspace = true -tracing-appender = "0.2" +tracing-appender.workspace = true [target.'cfg(not(windows))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 0138471227..0f8e5b0450 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -174,7 +174,7 @@ impl Repl { let plan = query_engine .planner() - .plan(stmt, query_ctx.clone()) + .plan(&stmt, query_ctx.clone()) .await .context(PlanStatementSnafu)?; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index a72aea3c9a..54d6e4d72c 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::net::SocketAddr; use std::sync::Arc; use std::{fs, path}; @@ -250,6 +251,13 @@ pub struct Instance { _guard: Vec, } +impl Instance { + /// Find the socket addr of a server by its `name`. + pub async fn server_addr(&self, name: &str) -> Option { + self.frontend.server_handlers().addr(name).await + } +} + #[async_trait] impl App for Instance { fn name(&self) -> &str { @@ -340,7 +348,8 @@ pub struct StartCommand { } impl StartCommand { - fn load_options( + /// Load the GreptimeDB options from various sources (command line, config file or env). + pub fn load_options( &self, global_options: &GlobalOptions, ) -> Result> { @@ -430,7 +439,8 @@ impl StartCommand { #[allow(unreachable_code)] #[allow(unused_variables)] #[allow(clippy::diverging_sub_expression)] - async fn build(&self, opts: GreptimeOptions) -> Result { + /// Build GreptimeDB instance with the loaded options. + pub async fn build(&self, opts: GreptimeOptions) -> Result { common_runtime::init_global_runtimes(&opts.runtime); let guard = common_telemetry::init_global_logging( diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 591055d472..d165a403f5 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -60,7 +60,7 @@ table.workspace = true tokio.workspace = true tokio-postgres = { workspace = true, optional = true } tonic.workspace = true -typetag = "0.2" +typetag.workspace = true [dev-dependencies] chrono.workspace = true diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index da044436c8..0d5d188a06 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -32,7 +32,7 @@ serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing = "0.1" -tracing-appender = "0.2" +tracing-appender.workspace = true tracing-log = "0.1" tracing-opentelemetry = "0.22.0" -tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } +tracing-subscriber.workspace = true diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index 0b788a79db..bb296cba70 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -106,7 +106,7 @@ pub async fn sql_to_flow_plan( .context(ExternalSnafu)?; let plan = engine .planner() - .plan(stmt, query_ctx) + .plan(&stmt, query_ctx) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index e33b023560..94878115cf 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -278,7 +278,7 @@ mod test { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); let plan = apply_df_optimizer(plan).await.unwrap(); @@ -300,7 +300,7 @@ mod test { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); let plan = apply_df_optimizer(plan).await; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 742deaf808..c6e7218a38 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -313,6 +313,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to init plugin"))] + // this comment is to bypass the unused snafu check in "check-snafu.py" + InitPlugin { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -375,8 +383,9 @@ impl ErrorExt for Error { | Error::ExecLogicalPlan { source, .. } => source.status_code(), Error::InvokeRegionServer { source, .. } => source.status_code(), - - Error::External { source, .. } => source.status_code(), + Error::External { source, .. } | Error::InitPlugin { source, .. } => { + source.status_code() + } Error::FindTableRoute { source, .. } => source.status_code(), #[cfg(feature = "python")] diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index fc46ae9e59..bd9f8959fe 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -225,11 +225,45 @@ impl Instance { async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { check_permission(self.plugins.clone(), &stmt, &query_ctx)?; - let stmt = QueryStatement::Sql(stmt); - self.statement_executor - .execute_stmt(stmt, query_ctx) - .await - .context(TableOperationSnafu) + let query_interceptor = self.plugins.get::>(); + let query_interceptor = query_interceptor.as_ref(); + + let output = match stmt { + Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { + let stmt = QueryStatement::Sql(stmt); + let plan = self + .statement_executor + .plan(&stmt, query_ctx.clone()) + .await?; + + let QueryStatement::Sql(stmt) = stmt else { + unreachable!() + }; + query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?; + + self.statement_executor.exec_plan(plan, query_ctx).await + } + Statement::Tql(tql) => { + let plan = self + .statement_executor + .plan_tql(tql.clone(), &query_ctx) + .await?; + + query_interceptor.pre_execute( + &Statement::Tql(tql), + Some(&plan), + query_ctx.clone(), + )?; + + self.statement_executor.exec_plan(plan, query_ctx).await + } + _ => { + query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?; + + self.statement_executor.execute_sql(stmt, query_ctx).await + } + }; + output.context(TableOperationSnafu) } } @@ -255,14 +289,6 @@ impl SqlQueryHandler for Instance { Ok(stmts) => { let mut results = Vec::with_capacity(stmts.len()); for stmt in stmts { - // TODO(sunng87): figure out at which stage we can call - // this hook after ArrowFlight adoption. We need to provide - // LogicalPlan as to this hook. - if let Err(e) = query_interceptor.pre_execute(&stmt, None, query_ctx.clone()) { - results.push(Err(e)); - break; - } - if let Err(e) = checker .check_permission( query_ctx.current_user(), @@ -341,7 +367,7 @@ impl SqlQueryHandler for Instance { let plan = self .query_engine .planner() - .plan(QueryStatement::Sql(stmt), query_ctx.clone()) + .plan(&QueryStatement::Sql(stmt), query_ctx.clone()) .await .context(PlanStatementSnafu)?; self.query_engine diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index c0cb0d3e89..7304274499 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -29,4 +29,4 @@ futures = "0.3" meta-srv = { workspace = true, features = ["mock"] } tower.workspace = true tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 6d051c2eee..d436867582 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -59,7 +59,7 @@ tokio-stream = { workspace = true, features = ["net"] } toml.workspace = true tonic.workspace = true tower.workspace = true -typetag = "0.2" +typetag.workspace = true url = "2.3" [dev-dependencies] @@ -69,4 +69,4 @@ common-meta = { workspace = true, features = ["testing"] } common-procedure-test.workspace = true session.workspace = true tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber.workspace = true diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 033bd14e9c..b9063781fe 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -363,7 +363,7 @@ impl StatementExecutor { pub async fn plan( &self, - stmt: QueryStatement, + stmt: &QueryStatement, query_ctx: QueryContextRef, ) -> Result { self.query_engine @@ -373,6 +373,14 @@ impl StatementExecutor { .context(PlanStatementSnafu) } + /// Execute [`LogicalPlan`] directly. + pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result { + self.query_engine + .execute(plan, query_ctx) + .await + .context(ExecLogicalPlanSnafu) + } + pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result { self.query_engine .planner() @@ -382,11 +390,8 @@ impl StatementExecutor { #[tracing::instrument(skip_all)] async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { - let plan = self.plan(stmt, query_ctx.clone()).await?; - self.query_engine - .execute(plan, query_ctx) - .await - .context(ExecLogicalPlanSnafu) + let plan = self.plan(&stmt, query_ctx.clone()).await?; + self.exec_plan(plan, query_ctx).await } async fn get_table(&self, table_ref: &TableReference<'_>) -> Result { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index af0f6883d8..cf10a45652 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -391,7 +391,7 @@ impl StatementExecutor { let logical_plan = match &*create_view.query { Statement::Query(query) => { self.plan( - QueryStatement::Sql(Statement::Query(query.clone())), + &QueryStatement::Sql(Statement::Query(query.clone())), ctx.clone(), ) .await? diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index 008aba0d78..5a5d3808e7 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -90,7 +90,7 @@ impl StatementExecutor { }; self.query_engine .planner() - .plan(stmt, query_ctx.clone()) + .plan(&stmt, query_ctx.clone()) .await .context(PlanStatementSnafu) } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 888ebbba83..f295a2c9b3 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -570,7 +570,7 @@ mod tests { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); @@ -592,7 +592,7 @@ mod tests { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); @@ -671,7 +671,7 @@ mod tests { let plan = engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 0f4f74133a..29a0a364ea 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -39,7 +39,7 @@ use crate::{DfContextProviderAdapter, QueryEngineContext}; #[async_trait] pub trait LogicalPlanner: Send + Sync { - async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result; + async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result; fn optimize(&self, plan: LogicalPlan) -> Result; @@ -61,8 +61,8 @@ impl DfLogicalPlanner { } #[tracing::instrument(skip_all)] - async fn plan_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { - let df_stmt = (&stmt).try_into().context(SqlSnafu)?; + async fn plan_sql(&self, stmt: &Statement, query_ctx: QueryContextRef) -> Result { + let df_stmt = stmt.try_into().context(SqlSnafu)?; let table_provider = DfTableSourceProvider::new( self.engine_state.catalog_manager().clone(), @@ -142,7 +142,7 @@ impl DfLogicalPlanner { } #[tracing::instrument(skip_all)] - async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result { + async fn plan_pql(&self, stmt: &EvalStmt, query_ctx: QueryContextRef) -> Result { let plan_decoder = Arc::new(DefaultPlanDecoder::new( self.session_state.clone(), &query_ctx, @@ -175,7 +175,7 @@ impl DfLogicalPlanner { #[async_trait] impl LogicalPlanner for DfLogicalPlanner { #[tracing::instrument(skip_all)] - async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { + async fn plan(&self, stmt: &QueryStatement, query_ctx: QueryContextRef) -> Result { match stmt { QueryStatement::Sql(stmt) => self.plan_sql(stmt, query_ctx).await, QueryStatement::Promql(stmt) => self.plan_pql(stmt, query_ctx).await, diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index bafd69aa95..001e41ca99 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -157,29 +157,29 @@ pub struct PromPlanner { impl PromPlanner { pub async fn stmt_to_plan( table_provider: DfTableSourceProvider, - stmt: EvalStmt, + stmt: &EvalStmt, session_state: &SessionState, ) -> Result { let mut planner = Self { table_provider, - ctx: PromPlannerContext::from_eval_stmt(&stmt), + ctx: PromPlannerContext::from_eval_stmt(stmt), }; - planner.prom_expr_to_plan(stmt.expr, session_state).await + planner.prom_expr_to_plan(&stmt.expr, session_state).await } #[async_recursion] pub async fn prom_expr_to_plan( &mut self, - prom_expr: PromExpr, + prom_expr: &PromExpr, session_state: &SessionState, ) -> Result { - let res = match &prom_expr { + let res = match prom_expr { PromExpr::Aggregate(expr) => self.prom_aggr_expr_to_plan(session_state, expr).await?, PromExpr::Unary(expr) => self.prom_unary_expr_to_plan(session_state, expr).await?, PromExpr::Binary(expr) => self.prom_binary_expr_to_plan(session_state, expr).await?, PromExpr::Paren(ParenExpr { expr }) => { - self.prom_expr_to_plan(*expr.clone(), session_state).await? + self.prom_expr_to_plan(expr, session_state).await? } PromExpr::Subquery(SubqueryExpr { .. }) => UnsupportedExprSnafu { name: "Prom Subquery", @@ -212,7 +212,7 @@ impl PromPlanner { modifier, } = aggr_expr; - let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; + let input = self.prom_expr_to_plan(expr, session_state).await?; // calculate columns to group by // Need to append time index column into group by columns @@ -242,7 +242,7 @@ impl PromPlanner { ) -> Result { let UnaryExpr { expr } = unary_expr; // Unary Expr in PromQL implys the `-` operator - let input = self.prom_expr_to_plan(*expr.clone(), session_state).await?; + let input = self.prom_expr_to_plan(expr, session_state).await?; self.projection_for_each_field_column(input, |col| { Ok(DfExpr::Negative(Box::new(DfExpr::Column(col.into())))) }) @@ -305,7 +305,7 @@ impl PromPlanner { } // lhs is a literal, rhs is a column (Some(mut expr), None) => { - let input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?; + let input = self.prom_expr_to_plan(rhs, session_state).await?; // check if the literal is a special time expr if let Some(time_expr) = Self::try_build_special_time_expr( lhs, @@ -334,7 +334,7 @@ impl PromPlanner { } // lhs is a column, rhs is a literal (None, Some(mut expr)) => { - let input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?; + let input = self.prom_expr_to_plan(lhs, session_state).await?; // check if the literal is a special time expr if let Some(time_expr) = Self::try_build_special_time_expr( rhs, @@ -363,14 +363,14 @@ impl PromPlanner { } // both are columns. join them on time index (None, None) => { - let left_input = self.prom_expr_to_plan(*lhs.clone(), session_state).await?; + let left_input = self.prom_expr_to_plan(lhs, session_state).await?; let left_field_columns = self.ctx.field_columns.clone(); let mut left_table_ref = self .table_ref() .unwrap_or_else(|_| TableReference::bare("")); let left_context = self.ctx.clone(); - let right_input = self.prom_expr_to_plan(*rhs.clone(), session_state).await?; + let right_input = self.prom_expr_to_plan(rhs, session_state).await?; let right_field_columns = self.ctx.field_columns.clone(); let mut right_table_ref = self .table_ref() @@ -589,7 +589,7 @@ impl PromPlanner { // transform function arguments let args = self.create_function_args(&args.args)?; let input = if let Some(prom_expr) = args.input { - self.prom_expr_to_plan(prom_expr, session_state).await? + self.prom_expr_to_plan(&prom_expr, session_state).await? } else { self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); self.ctx.reset_table_name_and_schema(); @@ -628,9 +628,7 @@ impl PromPlanner { // let promql_parser::parser::ast::Extension { expr } = ext_expr; let expr = &ext_expr.expr; let children = expr.children(); - let plan = self - .prom_expr_to_plan(children[0].clone(), session_state) - .await?; + let plan = self.prom_expr_to_plan(&children[0], session_state).await?; // Wrapper for the explanation/analyze of the existing plan // https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html#method.explain // if `analyze` is true, runs the actual plan and produces @@ -1548,7 +1546,7 @@ impl PromPlanner { } })?; let input = args.args[1].as_ref().clone(); - let input_plan = self.prom_expr_to_plan(input, session_state).await?; + let input_plan = self.prom_expr_to_plan(&input, session_state).await?; if !self.ctx.has_le_tag() { return ColumnNotFoundSnafu { @@ -1633,9 +1631,7 @@ impl PromPlanner { fn_name: SCALAR_FUNCTION } ); - let input = self - .prom_expr_to_plan(args.args[0].as_ref().clone(), session_state) - .await?; + let input = self.prom_expr_to_plan(&args.args[0], session_state).await?; ensure!( self.ctx.field_columns.len() == 1, MultiFieldsNotSupportedSnafu { @@ -2420,7 +2416,7 @@ mod test { 1, ) .await; - let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()) + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) .await .unwrap(); @@ -2630,10 +2626,9 @@ mod test { 2, ) .await; - let plan = - PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone(), &build_session_state()) - .await - .unwrap(); + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); let expected_no_without = String::from( "Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ \n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\ @@ -2661,7 +2656,7 @@ mod test { 2, ) .await; - let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()) + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) .await .unwrap(); let expected_without = String::from( @@ -2786,7 +2781,7 @@ mod test { 1, ) .await; - let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()) + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) .await .unwrap(); @@ -2836,7 +2831,7 @@ mod test { 1, ) .await; - let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()) + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) .await .unwrap(); @@ -3080,13 +3075,10 @@ mod test { 3, ) .await; - let plan = PromPlanner::stmt_to_plan( - table_provider, - eval_stmt.clone(), - &build_session_state(), - ) - .await - .unwrap(); + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); let mut fields = plan.schema().field_names(); let mut expected = case.1.into_iter().map(String::from).collect::>(); fields.sort(); @@ -3108,12 +3100,8 @@ mod test { 3, ) .await; - let plan = PromPlanner::stmt_to_plan( - table_provider, - eval_stmt.clone(), - &build_session_state(), - ) - .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await; assert!(plan.is_err(), "case: {:?}", case); } } @@ -3166,7 +3154,7 @@ mod test { .await; let plan = - PromPlanner::stmt_to_plan(table_provider, eval_stmt, &build_session_state()).await; + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()).await; assert!(plan.is_err(), "query: {:?}", query); } } @@ -3224,7 +3212,7 @@ mod test { DummyDecoder::arc(), true, ), - EvalStmt { + &EvalStmt { expr: parser::parse("metrics{tag = \"1\"}").unwrap(), start: UNIX_EPOCH, end: UNIX_EPOCH @@ -3254,7 +3242,7 @@ mod test { DummyDecoder::arc(), true, ), - EvalStmt { + &EvalStmt { expr: parser::parse("avg_over_time(metrics{tag = \"1\"}[5s])").unwrap(), start: UNIX_EPOCH, end: UNIX_EPOCH diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 3e6516a9f1..56cad37ef2 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -634,7 +634,7 @@ mod test { async fn do_query(sql: &str) -> Result { let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); let engine = create_test_engine().await; - engine.planner().plan(stmt, QueryContext::arc()).await + engine.planner().plan(&stmt, QueryContext::arc()).await } async fn query_plan_compare(sql: &str, expected: String) { diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index 1fee632073..2bebdbad58 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -39,7 +39,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec { let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap(); let plan = engine .planner() - .plan(stmt, query_ctx.clone()) + .plan(&stmt, query_ctx.clone()) .await .unwrap(); let OutputData::Stream(stream) = engine.execute(plan, query_ctx).await.unwrap().data else { diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 687346dbcb..d46d7afd9d 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -134,7 +134,7 @@ async fn test_query_validate() -> Result<()> { .unwrap(); assert!(engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .is_ok()); @@ -145,7 +145,7 @@ async fn test_query_validate() -> Result<()> { .unwrap(); assert!(engine .planner() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .is_err()); Ok(()) diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index d7d3b1326a..0398a02f43 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -315,7 +315,7 @@ impl Script for PyScript { let plan = self .query_engine .planner() - .plan(stmt, ctx.query_ctx.clone()) + .plan(&stmt, ctx.query_ctx.clone()) .await .context(DatabaseQuerySnafu)?; let res = self diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs index b81fc9d576..1a9a88466b 100644 --- a/src/script/src/python/ffi_types/copr.rs +++ b/src/script/src/python/ffi_types/copr.rs @@ -414,7 +414,7 @@ impl PyQueryEngine { let ctx = Arc::new(QueryContextBuilder::default().build()); let plan = engine .planner() - .plan(stmt, ctx.clone()) + .plan(&stmt, ctx.clone()) .await .map_err(|e| e.to_string())?; let res = engine diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index ca098546aa..6a280783e4 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -70,7 +70,7 @@ impl SqlQueryHandler for DummyInstance { let plan = self .query_engine .planner() - .plan(stmt, query_ctx.clone()) + .plan(&stmt, query_ctx.clone()) .await .unwrap(); let output = self.query_engine.execute(plan, query_ctx).await.unwrap(); @@ -98,7 +98,7 @@ impl SqlQueryHandler for DummyInstance { let plan = self .query_engine .planner() - .plan(QueryStatement::Sql(stmt), query_ctx.clone()) + .plan(&QueryStatement::Sql(stmt), query_ctx.clone()) .await .unwrap(); let schema = self.query_engine.describe(plan, query_ctx).await.unwrap(); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index a4ef632b33..b785f4ad37 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -542,7 +542,7 @@ CREATE TABLE {table_name} ( let plan = instance .frontend() .statement_executor() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); let plan = DFLogicalSubstraitConvertor diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 096be44e0c..fb4a001467 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -236,7 +236,7 @@ mod tests { let plan = instance .frontend() .statement_executor() - .plan(stmt, QueryContext::arc()) + .plan(&stmt, QueryContext::arc()) .await .unwrap(); let plan = DFLogicalSubstraitConvertor