From 6c90f252994bd5b48866c4ab707ac22ba92155de Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 3 Mar 2025 10:52:13 -0800 Subject: [PATCH] feat(log-query): implement compound filter and alias expr (#5596) * refine alias behavior Signed-off-by: Ruihang Xia * implement compound Signed-off-by: Ruihang Xia * support gt, lt, and in Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/log-query/src/log_query.rs | 11 + src/query/src/log_query/planner.rs | 540 +++++++++++++++++++++++------ 2 files changed, 436 insertions(+), 115 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 26a715200e..f981d8b65c 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -63,6 +63,7 @@ pub enum LogExpr { ScalarFunc { name: String, args: Vec, + alias: Option, }, AggrFunc { name: String, @@ -70,6 +71,7 @@ pub enum LogExpr { /// Optional range function parameter. Stands for the time range for both step and align. range: Option, by: Vec, + alias: Option, }, Decompose { expr: Box, @@ -316,6 +318,15 @@ pub enum ContentFilter { start_inclusive: bool, end_inclusive: bool, }, + GreatThan { + value: String, + inclusive: bool, + }, + LessThan { + value: String, + inclusive: bool, + }, + In(Vec), // TODO(ruihang): arithmetic operations // Compound filters diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 60918d852f..8b285f0e35 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -108,50 +108,7 @@ impl LogQueryPlanner { // Apply log expressions for expr in &query.exprs { - match expr { - LogExpr::AggrFunc { - name, - args, - by, - range: _range, - } => { - let schema = plan_builder.schema(); - let (group_expr, aggr_exprs) = self.build_aggr_func(schema, name, args, by)?; - plan_builder = plan_builder - .aggregate([group_expr], aggr_exprs) - .context(DataFusionPlanningSnafu)?; - } - LogExpr::Filter { expr, filter } => { - let schema = plan_builder.schema(); - let expr = self.log_expr_to_df_expr(expr, schema)?; - let col_name = expr.schema_name().to_string(); - let filter = self.build_column_filter(&ColumnFilters { - column_name: col_name, - filters: vec![filter.clone()], - })?; - if let Some(filter) = filter { - plan_builder = plan_builder - .filter(filter) - .context(DataFusionPlanningSnafu)?; - } - } - LogExpr::ScalarFunc { name, args } => { - let schema = plan_builder.schema(); - let expr = self.build_scalar_func(schema, name, args)?; - plan_builder = plan_builder - .project([expr]) - .context(DataFusionPlanningSnafu)?; - } - LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => { - // nothing to do - } - _ => { - UnimplementedSnafu { - feature: "log expression", - } - .fail()?; - } - } + plan_builder = self.process_log_expr(plan_builder, expr)?; } // Build the final plan @@ -187,73 +144,118 @@ impl LogQueryPlanner { return Ok(None); } - let exprs = column_filter - .filters + self.build_content_filters(&column_filter.column_name, &column_filter.filters) + } + + /// Builds filter expressions from content filters for a specific column + fn build_content_filters( + &self, + column_name: &str, + filters: &[log_query::ContentFilter], + ) -> Result> { + if filters.is_empty() { + return Ok(None); + } + + let exprs = filters .iter() - .map(|filter| match filter { - log_query::ContentFilter::Exact(pattern) => Ok(col(&column_filter.column_name) - .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern)))))), - log_query::ContentFilter::Prefix(pattern) => Ok(col(&column_filter.column_name) - .like(lit(ScalarValue::Utf8(Some(format!( - "{}%", - escape_like_pattern(pattern) - )))))), - log_query::ContentFilter::Postfix(pattern) => Ok(col(&column_filter.column_name) - .like(lit(ScalarValue::Utf8(Some(format!( - "%{}", - escape_like_pattern(pattern) - )))))), - log_query::ContentFilter::Contains(pattern) => Ok(col(&column_filter.column_name) - .like(lit(ScalarValue::Utf8(Some(format!( - "%{}%", - escape_like_pattern(pattern) - )))))), - log_query::ContentFilter::Regex(..) => Err::( - UnimplementedSnafu { - feature: "regex filter", - } - .build(), - ), - log_query::ContentFilter::Exist => { - Ok(col(&column_filter.column_name).is_not_null()) - } - log_query::ContentFilter::Between { - start, - end, - start_inclusive, - end_inclusive, - } => { - let left = if *start_inclusive { - Expr::gt_eq - } else { - Expr::gt - }; - let right = if *end_inclusive { - Expr::lt_eq - } else { - Expr::lt - }; - Ok(left( - col(&column_filter.column_name), - lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))), - ) - .and(right( - col(&column_filter.column_name), - lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))), - ))) - } - log_query::ContentFilter::Compound(..) => Err::( - UnimplementedSnafu { - feature: "compound filter", - } - .build(), - ), - }) + .map(|filter| self.build_content_filter(column_name, filter)) .try_collect::>()?; Ok(conjunction(exprs)) } + /// Builds a single content filter expression + #[allow(clippy::only_used_in_recursion)] + fn build_content_filter( + &self, + column_name: &str, + filter: &log_query::ContentFilter, + ) -> Result { + match filter { + log_query::ContentFilter::Exact(pattern) => { + Ok(col(column_name) + .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern)))))) + } + log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit( + ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))), + ))), + log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit( + ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))), + ))), + log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit( + ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))), + ))), + log_query::ContentFilter::Regex(..) => Err::( + UnimplementedSnafu { + feature: "regex filter", + } + .build(), + ), + log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()), + log_query::ContentFilter::Between { + start, + end, + start_inclusive, + end_inclusive, + } => { + let left = if *start_inclusive { + Expr::gt_eq + } else { + Expr::gt + }; + let right = if *end_inclusive { + Expr::lt_eq + } else { + Expr::lt + }; + Ok(left( + col(column_name), + lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))), + ) + .and(right( + col(column_name), + lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))), + ))) + } + log_query::ContentFilter::GreatThan { value, inclusive } => { + let expr = if *inclusive { Expr::gt_eq } else { Expr::gt }; + Ok(expr( + col(column_name), + lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))), + )) + } + log_query::ContentFilter::LessThan { value, inclusive } => { + let expr = if *inclusive { Expr::lt_eq } else { Expr::lt }; + Ok(expr( + col(column_name), + lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))), + )) + } + log_query::ContentFilter::In(values) => { + let list = values + .iter() + .map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))) + .collect(); + Ok(col(column_name).in_list(list, false)) + } + log_query::ContentFilter::Compound(filters, op) => { + let exprs = filters + .iter() + .map(|filter| self.build_content_filter(column_name, filter)) + .try_collect::>()?; + + match op { + log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()), + log_query::BinaryOperator::Or => { + // Build a disjunction (OR) of expressions + Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap()) + } + } + } + } + } + fn build_aggr_func( &self, schema: &DFSchema, @@ -270,18 +272,22 @@ impl LogQueryPlanner { })?; let args = args .iter() - .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .map(|expr| self.log_expr_to_column_expr(expr, schema)) .try_collect::>()?; let group_exprs = by .iter() - .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .map(|expr| self.log_expr_to_column_expr(expr, schema)) .try_collect::>()?; let aggr_expr = aggr_fn.call(args); Ok((aggr_expr, group_exprs)) } - fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result { + /// Converts a log expression to a column expression. + /// + /// A column expression here can be a column identifier, a positional identifier, or a literal. + /// They don't rely on the context of the query or other columns. + fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result { match expr { LogExpr::NamedIdent(name) => Ok(col(name)), LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())), @@ -297,7 +303,7 @@ impl LogQueryPlanner { fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result { let args = args .iter() - .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .map(|expr| self.log_expr_to_column_expr(expr, schema)) .try_collect::>()?; let func = self.session_state.scalar_functions().get(name).context( UnknownScalarFunctionSnafu { @@ -308,6 +314,74 @@ impl LogQueryPlanner { Ok(expr) } + + /// Process LogExpr recursively. + /// + /// Return the [`LogicalPlanBuilder`] after modification and the resulting expression's names. + fn process_log_expr( + &self, + plan_builder: LogicalPlanBuilder, + expr: &LogExpr, + ) -> Result { + let mut plan_builder = plan_builder; + + match expr { + LogExpr::AggrFunc { + name, + args, + by, + range: _range, + alias, + } => { + let schema = plan_builder.schema(); + let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?; + if let Some(alias) = alias { + aggr_expr = aggr_expr.alias(alias); + } + + plan_builder = plan_builder + .aggregate(group_exprs, [aggr_expr.clone()]) + .context(DataFusionPlanningSnafu)?; + } + LogExpr::Filter { expr, filter } => { + let schema = plan_builder.schema(); + let expr = self.log_expr_to_column_expr(expr, schema)?; + + let col_name = expr.schema_name().to_string(); + let filter_expr = self.build_content_filter(&col_name, filter)?; + plan_builder = plan_builder + .filter(filter_expr) + .context(DataFusionPlanningSnafu)?; + } + LogExpr::ScalarFunc { name, args, alias } => { + let schema = plan_builder.schema(); + let mut expr = self.build_scalar_func(schema, name, args)?; + if let Some(alias) = alias { + expr = expr.alias(alias); + } + plan_builder = plan_builder + .project([expr.clone()]) + .context(DataFusionPlanningSnafu)?; + } + LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => { + // nothing to do, return empty vec. + } + LogExpr::Alias { expr, alias } => { + let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?; + let aliased_expr = expr.alias(alias); + plan_builder = plan_builder + .project([aliased_expr.clone()]) + .context(DataFusionPlanningSnafu)?; + } + _ => { + UnimplementedSnafu { + feature: "log expression", + } + .fail()?; + } + } + Ok(plan_builder) + } } #[cfg(test)] @@ -321,7 +395,7 @@ mod tests { use datafusion::execution::SessionStateBuilder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; - use log_query::{ContentFilter, Context, Limit}; + use log_query::{BinaryOperator, ContentFilter, Context, Limit}; use session::context::QueryContext; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::table_name::TableName; @@ -615,14 +689,15 @@ mod tests { args: vec![LogExpr::NamedIdent("message".to_string())], by: vec![LogExpr::NamedIdent("host".to_string())], range: None, + alias: Some("count_result".to_string()), }], }; let plan = planner.query_to_plan(log_query).await.unwrap(); - let expected = "Aggregate: groupBy=[[count(greptime.public.test_table.message)]], aggr=[[greptime.public.test_table.host]] [count(greptime.public.test_table.message):Int64, host:Utf8;N]\ - \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ - \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ - \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\ +\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -654,11 +729,12 @@ mod tests { LogExpr::NamedIdent("timestamp".to_string()), LogExpr::Literal("day".to_string()), ], + alias: Some("time_bucket".to_string()), }], }; let plan = planner.query_to_plan(log_query).await.unwrap(); - let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) [date_trunc(greptime.public.test_table.timestamp,Utf8(\"day\")):Timestamp(Nanosecond, None);N]\ + let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\ \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; @@ -693,4 +769,238 @@ mod tests { assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); } + + #[tokio::test] + async fn test_query_to_plan_with_date_histogram() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, + filters: vec![], + limit: Limit { + skip: Some(0), + fetch: None, + }, + context: Context::None, + columns: vec![], + exprs: vec![ + LogExpr::ScalarFunc { + name: "date_bin".to_string(), + args: vec![ + LogExpr::Literal("30 seconds".to_string()), + LogExpr::NamedIdent("timestamp".to_string()), + ], + alias: Some("2__date_histogram__time_bucket".to_string()), + }, + LogExpr::AggrFunc { + name: "count".to_string(), + args: vec![LogExpr::PositionalIdent(0)], + by: vec![LogExpr::NamedIdent( + "2__date_histogram__time_bucket".to_string(), + )], + range: None, + alias: Some("count_result".to_string()), + }, + ], + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\ +\n Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\ +\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_build_compound_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + + // Test AND compound + let filter = ContentFilter::Compound( + vec![ + ContentFilter::Contains("error".to_string()), + ContentFilter::Prefix("WARN".to_string()), + ], + BinaryOperator::And, + ); + let expr = planner.build_content_filter("message", &filter).unwrap(); + + let expected_expr = col("message") + .like(lit(ScalarValue::Utf8(Some("%error%".to_string())))) + .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + + // Test OR compound + let filter = ContentFilter::Compound( + vec![ + ContentFilter::Contains("error".to_string()), + ContentFilter::Prefix("WARN".to_string()), + ], + BinaryOperator::Or, + ); + let expr = planner.build_content_filter("message", &filter).unwrap(); + + let expected_expr = col("message") + .like(lit(ScalarValue::Utf8(Some("%error%".to_string())))) + .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + + // Test nested compound + let filter = ContentFilter::Compound( + vec![ + ContentFilter::Contains("error".to_string()), + ContentFilter::Compound( + vec![ + ContentFilter::Prefix("WARN".to_string()), + ContentFilter::Exact("DEBUG".to_string()), + ], + BinaryOperator::Or, + ), + ], + BinaryOperator::And, + ); + let expr = planner.build_content_filter("message", &filter).unwrap(); + + let expected_nested = col("message") + .like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))) + .or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string()))))); + let expected_expr = col("message") + .like(lit(ScalarValue::Utf8(Some("%error%".to_string())))) + .and(expected_nested); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_build_great_than_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + + // Test GreatThan with inclusive=true + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::GreatThan { + value: "error".to_string(), + inclusive: true, + }], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + + // Test GreatThan with inclusive=false + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::GreatThan { + value: "error".to_string(), + inclusive: false, + }], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_build_less_than_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + + // Test LessThan with inclusive=true + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::LessThan { + value: "error".to_string(), + inclusive: true, + }], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + + // Test LessThan with inclusive=false + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::LessThan { + value: "error".to_string(), + inclusive: false, + }], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_build_in_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + + // Test In filter with multiple values + let column_filter = ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::In(vec![ + "error".to_string(), + "warning".to_string(), + "info".to_string(), + ])], + }; + + let expr_option = planner.build_column_filter(&column_filter).unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr = col("message").in_list( + vec![ + lit(ScalarValue::Utf8(Some("error".to_string()))), + lit(ScalarValue::Utf8(Some("warning".to_string()))), + lit(ScalarValue::Utf8(Some("info".to_string()))), + ], + false, + ); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } }