From d8b967408ea82906c6aee0699724ab396d581c04 Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 17 Sep 2025 20:19:48 +0800 Subject: [PATCH] chore: modify LogExpr AggrFunc (#6948) * chore: modify LogExpr AggrFunc * chore: change AggrFunc range field * chore: remove range from aggrfunc --- src/log-query/src/log_query.rs | 23 +++++++-- src/query/src/log_query/planner.rs | 75 ++++++++++++++++-------------- 2 files changed, 59 insertions(+), 39 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index a6398dcd4d..c5b71c6efb 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -91,6 +91,21 @@ impl Filters { Filters::Single(filter) } } +/// Aggregation function with optional range and alias. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AggFunc { + /// Function name, e.g., "count", "sum", etc. + pub name: String, + /// Arguments to the function. e.g., column references or literals. LogExpr::NamedIdent("column1".to_string()) + pub args: Vec, + pub alias: Option, +} + +impl AggFunc { + pub fn new(name: String, args: Vec, alias: Option) -> Self { + Self { name, args, alias } + } +} /// Expression to calculate on log after filtering. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -103,13 +118,11 @@ pub enum LogExpr { args: Vec, alias: Option, }, + /// Aggregation function with optional grouping. AggrFunc { - name: String, - args: Vec, - /// Optional range function parameter. Stands for the time range for both step and align. - range: Option, + /// Function name, arguments, and optional alias. + expr: Vec, by: Vec, - alias: Option, }, Decompose { expr: Box, diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index d26b6aaba1..133bebb23c 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -24,7 +24,7 @@ use datafusion_expr::{ }; use datafusion_sql::TableReference; use datatypes::schema::Schema; -use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter}; +use log_query::{AggFunc, BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter}; use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; @@ -312,26 +312,40 @@ impl LogQueryPlanner { fn build_aggr_func( &self, schema: &DFSchema, - fn_name: &str, - args: &[LogExpr], + expr: &[AggFunc], by: &[LogExpr], - ) -> Result<(Expr, Vec)> { - let aggr_fn = self - .session_state - .aggregate_functions() - .get(fn_name) - .context(UnknownAggregateFunctionSnafu { - name: fn_name.to_string(), - })?; - let args = args + ) -> Result<(Vec, Vec)> { + let aggr_expr = expr .iter() - .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .map(|agg_func| { + let AggFunc { + name: fn_name, + args, + alias, + } = agg_func; + let aggr_fn = self + .session_state + .aggregate_functions() + .get(fn_name) + .context(UnknownAggregateFunctionSnafu { + name: fn_name.to_string(), + })?; + let args = args + .iter() + .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .try_collect::>()?; + if let Some(alias) = alias { + Ok(aggr_fn.call(args).alias(alias)) + } else { + Ok(aggr_fn.call(args)) + } + }) .try_collect::>()?; + let group_exprs = by .iter() .map(|expr| self.log_expr_to_df_expr(expr, schema)) .try_collect::>()?; - let aggr_expr = aggr_fn.call(args); Ok((aggr_expr, group_exprs)) } @@ -490,21 +504,12 @@ impl LogQueryPlanner { let mut plan_builder = plan_builder; match expr { - LogExpr::AggrFunc { - name, - args, - by, - range: _range, - alias, - } => { + LogExpr::AggrFunc { expr, by } => { let schema = plan_builder.schema(); - let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?; - if let Some(alias) = alias { - aggr_expr = aggr_expr.alias(alias); - } + let (aggr_expr, group_exprs) = self.build_aggr_func(schema, expr, by)?; plan_builder = plan_builder - .aggregate(group_exprs, [aggr_expr.clone()]) + .aggregate(group_exprs, aggr_expr) .context(DataFusionPlanningSnafu)?; } LogExpr::Filter { filter } => { @@ -917,11 +922,12 @@ mod tests { context: Context::None, columns: vec![], exprs: vec![LogExpr::AggrFunc { - name: "count".to_string(), - args: vec![LogExpr::NamedIdent("message".to_string())], + expr: vec![AggFunc::new( + "count".to_string(), + vec![LogExpr::NamedIdent("message".to_string())], + Some("count_result".to_string()), + )], by: vec![LogExpr::NamedIdent("host".to_string())], - range: None, - alias: Some("count_result".to_string()), }], }; @@ -1036,13 +1042,14 @@ mod tests { alias: Some("2__date_histogram__time_bucket".to_string()), }, LogExpr::AggrFunc { - name: "count".to_string(), - args: vec![LogExpr::PositionalIdent(0)], + expr: vec![AggFunc::new( + "count".to_string(), + vec![LogExpr::PositionalIdent(0)], + Some("count_result".to_string()), + )], by: vec![LogExpr::NamedIdent( "2__date_histogram__time_bucket".to_string(), )], - range: None, - alias: Some("count_result".to_string()), }, ], };