From 2e9737c01d1dd69129e24869dc130334bdee081a Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Tue, 5 Nov 2024 16:49:05 +0800 Subject: [PATCH] refactor: pass `LogicalPlan` to promql execution interceptor (#4937) --- src/flow/src/server.rs | 2 -- src/frontend/src/instance.rs | 20 ++++++++++++++++++-- src/frontend/src/instance/builder.rs | 2 +- src/operator/src/statement.rs | 11 ++++------- src/servers/src/interceptor.rs | 4 +++- src/servers/tests/interceptor.rs | 4 +++- tests-integration/Cargo.toml | 2 +- 7 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index ff80bb64fe..c9d75a9299 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -37,7 +37,6 @@ use operator::delete::Deleter; use operator::insert::Inserter; use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; -use query::stats::StatementStatistics; use query::{QueryEngine, QueryEngineFactory}; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::server::Server; @@ -476,7 +475,6 @@ impl FrontendInvoker { layered_cache_registry.clone(), inserter.clone(), table_route_cache, - StatementStatistics::default(), )); let invoker = FrontendInvoker::new(inserter, deleter, statement_executor); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index bd9f8959fe..17102b51cd 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -51,6 +51,7 @@ use query::metrics::OnDone; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::DescribeResult; +use query::stats::StatementStatistics; use query::QueryEngineRef; use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; @@ -122,6 +123,7 @@ pub struct Instance { deleter: DeleterRef, export_metrics_task: Option, table_metadata_manager: TableMetadataManagerRef, + stats: StatementStatistics, } impl Instance { @@ -228,6 +230,10 @@ impl Instance { let query_interceptor = self.plugins.get::>(); let query_interceptor = query_interceptor.as_ref(); + let _slow_query_timer = self + .stats + .start_slow_query_timer(QueryStatement::Sql(stmt.clone())); + let output = match stmt { Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { let stmt = QueryStatement::Sql(stmt); @@ -412,7 +418,6 @@ impl PrometheusHandler for Instance { let interceptor = self .plugins .get::>(); - interceptor.pre_execute(query, query_ctx.clone())?; self.plugins .get::() @@ -426,9 +431,20 @@ impl PrometheusHandler for Instance { } })?; + let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone()); + + let plan = self + .statement_executor + .plan(&stmt, query_ctx.clone()) + .await + .map_err(BoxedError::new) + .context(ExecuteQuerySnafu)?; + + interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?; + let output = self .statement_executor - .execute_stmt(stmt, query_ctx.clone()) + .exec_plan(plan, query_ctx.clone()) .await .map_err(BoxedError::new) .context(ExecuteQuerySnafu)?; diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index a9513121d8..f24141d8ba 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -185,7 +185,6 @@ impl FrontendBuilder { local_cache_invalidator, inserter.clone(), table_route_cache, - self.stats, )); let pipeline_operator = Arc::new(PipelineOperator::new( @@ -211,6 +210,7 @@ impl FrontendBuilder { deleter, export_metrics_task: None, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), + stats: self.stats, }) } } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index b9063781fe..53b1eaf6ea 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -45,7 +45,6 @@ use common_time::Timestamp; use datafusion_expr::LogicalPlan; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use query::parser::QueryStatement; -use query::stats::StatementStatistics; use query::QueryEngineRef; use session::context::{Channel, QueryContextRef}; use session::table_name::table_idents_to_full_name; @@ -81,13 +80,11 @@ pub struct StatementExecutor { partition_manager: PartitionRuleManagerRef, cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, - stats: StatementStatistics, } pub type StatementExecutorRef = Arc; impl StatementExecutor { - #[allow(clippy::too_many_arguments)] pub fn new( catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, @@ -96,7 +93,6 @@ impl StatementExecutor { cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, table_route_cache: TableRouteCacheRef, - stats: StatementStatistics, ) -> Self { Self { catalog_manager, @@ -108,23 +104,22 @@ impl StatementExecutor { partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)), cache_invalidator, inserter, - stats, } } - #[tracing::instrument(skip_all)] + #[cfg(feature = "testing")] pub async fn execute_stmt( &self, stmt: QueryStatement, query_ctx: QueryContextRef, ) -> Result { - let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone()); match stmt { QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await, QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await, } } + #[tracing::instrument(skip_all)] pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { match stmt { Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { @@ -361,6 +356,7 @@ impl StatementExecutor { Ok(Output::new_with_affected_rows(0)) } + #[tracing::instrument(skip_all)] pub async fn plan( &self, stmt: &QueryStatement, @@ -374,6 +370,7 @@ impl StatementExecutor { } /// Execute [`LogicalPlan`] directly. + #[tracing::instrument(skip_all)] pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result { self.query_engine .execute(plan, query_ctx) diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index 76ed4728e0..241bbe1d0e 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -201,6 +201,7 @@ pub trait PromQueryInterceptor { fn pre_execute( &self, _query: &PromQuery, + _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { Ok(()) @@ -229,10 +230,11 @@ where fn pre_execute( &self, query: &PromQuery, + plan: Option<&LogicalPlan>, query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { if let Some(this) = self { - this.pre_execute(query, query_ctx) + this.pre_execute(query, plan, query_ctx) } else { Ok(()) } diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index ca5837a993..7712c90332 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -18,6 +18,7 @@ use api::v1::greptime_request::Request; use api::v1::{InsertRequest, InsertRequests}; use client::OutputData; use common_query::Output; +use datafusion_expr::LogicalPlan; use query::parser::PromQuery; use servers::error::{self, InternalSnafu, NotSupportedSnafu, Result}; use servers::interceptor::{GrpcQueryInterceptor, PromQueryInterceptor, SqlQueryInterceptor}; @@ -89,6 +90,7 @@ impl PromQueryInterceptor for NoopInterceptor { fn pre_execute( &self, query: &PromQuery, + _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> std::result::Result<(), Self::Error> { match query.query.as_str() { @@ -119,7 +121,7 @@ fn test_prom_interceptor() { ..Default::default() }; - let fail = PromQueryInterceptor::pre_execute(&di, &query, ctx.clone()); + let fail = PromQueryInterceptor::pre_execute(&di, &query, None, ctx.clone()); assert!(fail.is_err()); let output = Output::new_with_affected_rows(1); diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 80fb76c55a..38c3c01557 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -52,7 +52,7 @@ mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } object-store.workspace = true -operator.workspace = true +operator = { workspace = true, features = ["testing"] } prost.workspace = true query.workspace = true rstest.workspace = true