feat: update some interceptor to carry more information (#8090)

* feat: provide query information for post_execute interceptor

* test: update for tests-integration

* feat: make interceptor available to prometheus serialization

* feat: revert post_execute change

* feat: add expr to pre_execute and remove serialization interceptor

* chore: lint
This commit is contained in:
Ning Sun
2026-05-24 20:14:15 -07:00
committed by GitHub
parent e1e75b3ffe
commit 5401cc2e26
4 changed files with 66 additions and 17 deletions

View File

@@ -303,7 +303,7 @@ impl Instance {
.await
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
@@ -326,7 +326,7 @@ impl Instance {
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx.clone())
@@ -344,7 +344,11 @@ impl Instance {
.statement_executor
.plan_tql(tql.clone(), query_ctx)
.await?;
query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
query_interceptor.pre_execute(
Some(&Statement::Tql(tql)),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor
.exec_plan(plan, query_ctx.clone())
.await
@@ -649,9 +653,7 @@ impl Instance {
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();
if let Some(ref s) = stmt {
query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
}
query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?;
let query = stmt
.as_ref()
@@ -880,7 +882,11 @@ impl PrometheusHandler for Instance {
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;
interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
let QueryStatement::Promql(eval_stmt, _) = &stmt else {
unreachable!("query is parsed from promql");
};
interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?;
// Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
@@ -892,7 +898,7 @@ impl PrometheusHandler for Instance {
}
.fail();
};
let query = query_statement.to_string();
let raw_query = query_statement.to_string();
let slow_query_timer = self
.slow_query_options
@@ -912,7 +918,7 @@ impl PrometheusHandler for Instance {
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
raw_query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
@@ -1394,11 +1400,11 @@ mod tests {
fn pre_execute(
&self,
statement: &Statement,
statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
let Statement::Insert(insert) = statement else {
let Some(Statement::Insert(insert)) = statement else {
return Ok(());
};
if !insert.has_non_values_query_source() {

View File

@@ -23,6 +23,7 @@ use common_error::ext::ErrorExt;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use log_query::LogQuery;
use promql_parser::parser::Expr;
use query::parser::PromQuery;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
@@ -58,7 +59,7 @@ pub trait SqlQueryInterceptor {
/// Called before sql is actually executed. This hook is not called at the moment.
fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
@@ -111,7 +112,7 @@ where
fn pre_execute(
&self,
statement: &Statement,
statement: Option<&Statement>,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
@@ -224,6 +225,7 @@ pub trait PromQueryInterceptor {
fn pre_execute(
&self,
_query: &PromQuery,
_expr: &Expr,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
@@ -253,11 +255,45 @@ where
fn pre_execute(
&self,
query: &PromQuery,
expr: &Expr,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query, plan, query_ctx)
this.pre_execute(query, expr, plan, query_ctx)
} else {
Ok(())
}
}
fn post_execute(
&self,
output: Output,
query_ctx: QueryContextRef,
) -> Result<Output, Self::Error> {
if let Some(this) = self {
this.post_execute(output, query_ctx)
} else {
Ok(output)
}
}
}
impl<E> PromQueryInterceptor for Option<&PromQueryInterceptorRef<E>>
where
E: ErrorExt,
{
type Error = E;
fn pre_execute(
&self,
query: &PromQuery,
expr: &Expr,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query, expr, plan, query_ctx)
} else {
Ok(())
}

View File

@@ -90,6 +90,7 @@ impl PromQueryInterceptor for NoopInterceptor {
fn pre_execute(
&self,
query: &PromQuery,
_expr: &promql_parser::parser::Expr,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> std::result::Result<(), Self::Error> {
@@ -121,7 +122,13 @@ fn test_prom_interceptor() {
..Default::default()
};
let fail = PromQueryInterceptor::pre_execute(&di, &query, None, ctx.clone());
let fail = PromQueryInterceptor::pre_execute(
&di,
&query,
&promql_parser::parser::parse(&query.query).unwrap(),
None,
ctx.clone(),
);
assert!(fail.is_err());
let output = Output::new_with_affected_rows(1);

View File

@@ -323,7 +323,7 @@ mod tests {
fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
@@ -396,7 +396,7 @@ mod tests {
fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {