chore: modify LogExpr AggrFunc (#6948)

* chore: modify  LogExpr AggrFunc

* chore: change AggrFunc range field

* chore: remove range from aggrfunc
This commit is contained in:
localhost
2025-09-17 20:19:48 +08:00
committed by GitHub
parent c35407fdce
commit d8b967408e
2 changed files with 59 additions and 39 deletions

View File

@@ -91,6 +91,21 @@ impl Filters {
Filters::Single(filter)
}
}
/// Aggregation function with optional range and alias.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggFunc {
/// Function name, e.g., "count", "sum", etc.
pub name: String,
/// Arguments to the function. e.g., column references or literals. LogExpr::NamedIdent("column1".to_string())
pub args: Vec<LogExpr>,
pub alias: Option<String>,
}
impl AggFunc {
pub fn new(name: String, args: Vec<LogExpr>, alias: Option<String>) -> Self {
Self { name, args, alias }
}
}
/// Expression to calculate on log after filtering.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -103,13 +118,11 @@ pub enum LogExpr {
args: Vec<LogExpr>,
alias: Option<String>,
},
/// Aggregation function with optional grouping.
AggrFunc {
name: String,
args: Vec<LogExpr>,
/// Optional range function parameter. Stands for the time range for both step and align.
range: Option<String>,
/// Function name, arguments, and optional alias.
expr: Vec<AggFunc>,
by: Vec<LogExpr>,
alias: Option<String>,
},
Decompose {
expr: Box<LogExpr>,

View File

@@ -24,7 +24,7 @@ use datafusion_expr::{
};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
use log_query::{AggFunc, BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
@@ -312,26 +312,40 @@ impl LogQueryPlanner {
fn build_aggr_func(
&self,
schema: &DFSchema,
fn_name: &str,
args: &[LogExpr],
expr: &[AggFunc],
by: &[LogExpr],
) -> Result<(Expr, Vec<Expr>)> {
let aggr_fn = self
.session_state
.aggregate_functions()
.get(fn_name)
.context(UnknownAggregateFunctionSnafu {
name: fn_name.to_string(),
})?;
let args = args
) -> Result<(Vec<Expr>, Vec<Expr>)> {
let aggr_expr = expr
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.map(|agg_func| {
let AggFunc {
name: fn_name,
args,
alias,
} = agg_func;
let aggr_fn = self
.session_state
.aggregate_functions()
.get(fn_name)
.context(UnknownAggregateFunctionSnafu {
name: fn_name.to_string(),
})?;
let args = args
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
if let Some(alias) = alias {
Ok(aggr_fn.call(args).alias(alias))
} else {
Ok(aggr_fn.call(args))
}
})
.try_collect::<Vec<_>>()?;
let group_exprs = by
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let aggr_expr = aggr_fn.call(args);
Ok((aggr_expr, group_exprs))
}
@@ -490,21 +504,12 @@ impl LogQueryPlanner {
let mut plan_builder = plan_builder;
match expr {
LogExpr::AggrFunc {
name,
args,
by,
range: _range,
alias,
} => {
LogExpr::AggrFunc { expr, by } => {
let schema = plan_builder.schema();
let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
if let Some(alias) = alias {
aggr_expr = aggr_expr.alias(alias);
}
let (aggr_expr, group_exprs) = self.build_aggr_func(schema, expr, by)?;
plan_builder = plan_builder
.aggregate(group_exprs, [aggr_expr.clone()])
.aggregate(group_exprs, aggr_expr)
.context(DataFusionPlanningSnafu)?;
}
LogExpr::Filter { filter } => {
@@ -917,11 +922,12 @@ mod tests {
context: Context::None,
columns: vec![],
exprs: vec![LogExpr::AggrFunc {
name: "count".to_string(),
args: vec![LogExpr::NamedIdent("message".to_string())],
expr: vec![AggFunc::new(
"count".to_string(),
vec![LogExpr::NamedIdent("message".to_string())],
Some("count_result".to_string()),
)],
by: vec![LogExpr::NamedIdent("host".to_string())],
range: None,
alias: Some("count_result".to_string()),
}],
};
@@ -1036,13 +1042,14 @@ mod tests {
alias: Some("2__date_histogram__time_bucket".to_string()),
},
LogExpr::AggrFunc {
name: "count".to_string(),
args: vec![LogExpr::PositionalIdent(0)],
expr: vec![AggFunc::new(
"count".to_string(),
vec![LogExpr::PositionalIdent(0)],
Some("count_result".to_string()),
)],
by: vec![LogExpr::NamedIdent(
"2__date_histogram__time_bucket".to_string(),
)],
range: None,
alias: Some("count_result".to_string()),
},
],
};