From 5401cc2e26fde5b8254ce9620954a89d7041e75e Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sun, 24 May 2026 20:14:15 -0700 Subject: [PATCH] feat: update some interceptor to carry more information (#8090) * feat: provide query information for post_execute interceptor * test: update for tests-integration * feat: make interceptor available to prometheus serialization * feat: revert post_execute change * feat: add expr to pre_execute and remove serialization interceptor * chore: lint --- src/frontend/src/instance.rs | 28 +++++++++++++-------- src/servers/src/interceptor.rs | 42 ++++++++++++++++++++++++++++--- src/servers/tests/interceptor.rs | 9 ++++++- tests-integration/src/instance.rs | 4 +-- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8cc6195e5f..e85bc28f9a 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -303,7 +303,7 @@ impl Instance { .await } _ => { - query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?; + query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?; self.statement_executor .execute_sql(stmt, query_ctx) .await @@ -326,7 +326,7 @@ impl Instance { let QueryStatement::Sql(stmt) = stmt else { unreachable!() }; - query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?; + query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?; self.statement_executor .exec_plan(plan, query_ctx.clone()) @@ -344,7 +344,11 @@ impl Instance { .statement_executor .plan_tql(tql.clone(), query_ctx) .await?; - query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?; + query_interceptor.pre_execute( + Some(&Statement::Tql(tql)), + Some(&plan), + query_ctx.clone(), + )?; self.statement_executor .exec_plan(plan, query_ctx.clone()) .await @@ -649,9 +653,7 @@ impl Instance { let query_interceptor_opt = self.plugins.get::>(); let query_interceptor = query_interceptor_opt.as_ref(); - if let Some(ref s) = stmt { - query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?; - } + query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?; let query = stmt .as_ref() @@ -880,7 +882,11 @@ impl PrometheusHandler for Instance { .map_err(BoxedError::new) .context(ExecuteQuerySnafu)?; - interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?; + let QueryStatement::Promql(eval_stmt, _) = &stmt else { + unreachable!("query is parsed from promql"); + }; + + interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?; // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement. let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt { @@ -892,7 +898,7 @@ impl PrometheusHandler for Instance { } .fail(); }; - let query = query_statement.to_string(); + let raw_query = query_statement.to_string(); let slow_query_timer = self .slow_query_options @@ -912,7 +918,7 @@ impl PrometheusHandler for Instance { let ticket = self.process_manager.register_query( query_ctx.current_catalog().to_string(), vec![query_ctx.current_schema()], - query, + raw_query, query_ctx.conn_info().to_string(), Some(query_ctx.process_id()), slow_query_timer, @@ -1394,11 +1400,11 @@ mod tests { fn pre_execute( &self, - statement: &Statement, + statement: Option<&Statement>, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<()> { - let Statement::Insert(insert) = statement else { + let Some(Statement::Insert(insert)) = statement else { return Ok(()); }; if !insert.has_non_values_query_source() { diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index 7425c228f5..30bc5a2aa0 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -23,6 +23,7 @@ use common_error::ext::ErrorExt; use common_query::Output; use datafusion_expr::LogicalPlan; use log_query::LogQuery; +use promql_parser::parser::Expr; use query::parser::PromQuery; use session::context::QueryContextRef; use sql::statements::statement::Statement; @@ -58,7 +59,7 @@ pub trait SqlQueryInterceptor { /// Called before sql is actually executed. This hook is not called at the moment. fn pre_execute( &self, - _statement: &Statement, + _statement: Option<&Statement>, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { @@ -111,7 +112,7 @@ where fn pre_execute( &self, - statement: &Statement, + statement: Option<&Statement>, plan: Option<&LogicalPlan>, query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { @@ -224,6 +225,7 @@ pub trait PromQueryInterceptor { fn pre_execute( &self, _query: &PromQuery, + _expr: &Expr, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { @@ -253,11 +255,45 @@ where fn pre_execute( &self, query: &PromQuery, + expr: &Expr, plan: Option<&LogicalPlan>, query_ctx: QueryContextRef, ) -> Result<(), Self::Error> { if let Some(this) = self { - this.pre_execute(query, plan, query_ctx) + this.pre_execute(query, expr, plan, query_ctx) + } else { + Ok(()) + } + } + + fn post_execute( + &self, + output: Output, + query_ctx: QueryContextRef, + ) -> Result { + if let Some(this) = self { + this.post_execute(output, query_ctx) + } else { + Ok(output) + } + } +} + +impl PromQueryInterceptor for Option<&PromQueryInterceptorRef> +where + E: ErrorExt, +{ + type Error = E; + + fn pre_execute( + &self, + query: &PromQuery, + expr: &Expr, + plan: Option<&LogicalPlan>, + query_ctx: QueryContextRef, + ) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_execute(query, expr, plan, query_ctx) } else { Ok(()) } diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 7712c90332..fd516a0dfe 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -90,6 +90,7 @@ impl PromQueryInterceptor for NoopInterceptor { fn pre_execute( &self, query: &PromQuery, + _expr: &promql_parser::parser::Expr, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> std::result::Result<(), Self::Error> { @@ -121,7 +122,13 @@ fn test_prom_interceptor() { ..Default::default() }; - let fail = PromQueryInterceptor::pre_execute(&di, &query, None, ctx.clone()); + let fail = PromQueryInterceptor::pre_execute( + &di, + &query, + &promql_parser::parser::parse(&query.query).unwrap(), + None, + ctx.clone(), + ); assert!(fail.is_err()); let output = Output::new_with_affected_rows(1); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index b11d5c32c6..17435a3b7a 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -323,7 +323,7 @@ mod tests { fn pre_execute( &self, - _statement: &Statement, + _statement: Option<&Statement>, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<()> { @@ -396,7 +396,7 @@ mod tests { fn pre_execute( &self, - _statement: &Statement, + _statement: Option<&Statement>, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, ) -> Result<()> {