diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs
index f981d8b65c..9fc9f7733d 100644
--- a/src/log-query/src/log_query.rs
+++ b/src/log-query/src/log_query.rs
@@ -42,7 +42,7 @@ pub struct LogQuery {
     // Filters
     /// Conjunction of filters to apply for the raw logs.
     ///
-    /// Filters here can only refer to the columns from the original log.
+    /// Filters here can apply to any LogExpr.
     pub filters: Vec<ColumnFilters>,
     /// Adjacent lines to return. Applies to all filters above.
     ///
@@ -90,8 +90,7 @@ pub enum LogExpr {
         alias: String,
     },
     Filter {
-        expr: Box<LogExpr>,
-        filter: ContentFilter,
+        filter: ColumnFilters,
     },
 }
 
@@ -282,12 +281,12 @@ impl TimeFilter {
     }
 }
 
-/// Represents a column with filters to query.
-#[derive(Debug, Serialize, Deserialize)]
+/// Represents an expression with filters to query.
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ColumnFilters {
-    /// Case-sensitive column name to query.
-    pub column_name: String,
-    /// Filters to apply to the column. Can be empty.
+    /// Expression to apply filters to. Can be a column reference or any other LogExpr.
+    pub expr: Box<LogExpr>,
+    /// Filters to apply to the expression result. Can be empty.
     pub filters: Vec<ContentFilter>,
 }
 
diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs
index e9bb2eb9b6..9204be2382 100644
--- a/src/query/src/log_query/planner.rs
+++ b/src/query/src/log_query/planner.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use arrow_schema::Schema as ArrowSchema;
 use catalog::table_source::DfTableSourceProvider;
 use common_function::utils::escape_like_pattern;
 use datafusion::datasource::DefaultTableSource;
@@ -21,7 +22,7 @@ use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
 use datafusion_sql::TableReference;
 use datatypes::schema::Schema;
-use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
+use log_query::{LogExpr, LogQuery, TimeFilter};
 use snafu::{OptionExt, ResultExt};
 use table::table::adapter::DfTableProviderAdapter;
 
@@ -68,6 +69,7 @@ impl LogQueryPlanner {
         // Build the initial scan plan
         let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
             .context(DataFusionPlanningSnafu)?;
+        let df_schema = plan_builder.schema().clone();
 
         // Collect filter expressions
         let mut filters = Vec::new();
@@ -76,8 +78,8 @@ impl LogQueryPlanner {
         filters.push(self.build_time_filter(&query.time_filter, &schema)?);
 
         // Column filters
-        for column_filter in &query.filters {
-            if let Some(expr) = self.build_column_filter(column_filter)? {
+        for filter in &query.filters {
+            if let Some(expr) = self.build_column_filter(filter, df_schema.as_arrow())? {
                 filters.push(expr);
             }
         }
@@ -138,61 +140,56 @@ impl LogQueryPlanner {
         Ok(expr)
     }
 
-    /// Returns filter expressions
-    fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> {
-        if column_filter.filters.is_empty() {
-            return Ok(None);
-        }
-
-        self.build_content_filters(&column_filter.column_name, &column_filter.filters)
-    }
-
-    /// Builds filter expressions from content filters for a specific column
-    fn build_content_filters(
+    /// Builds filter expression from ColumnFilters (new structure with expr + filters)
+    fn build_column_filter(
         &self,
-        column_name: &str,
-        filters: &[log_query::ContentFilter],
+        column_filter: &log_query::ColumnFilters,
+        schema: &ArrowSchema,
     ) -> Result<Option<Expr>> {
-        if filters.is_empty() {
-            return Ok(None);
-        }
+        let col_expr = self.log_expr_to_column_expr(&column_filter.expr, schema)?;
 
-        let exprs = filters
+        let filter_exprs = column_filter
+            .filters
             .iter()
-            .map(|filter| self.build_content_filter(column_name, filter))
+            .filter_map(|filter| {
+                self.build_content_filter_with_expr(col_expr.clone(), filter)
+                    .transpose()
+            })
             .try_collect::<Vec<_>>()?;
 
-        Ok(conjunction(exprs))
+        if filter_exprs.is_empty() {
+            return Ok(None);
+        }
+
+        // Combine all filters with AND logic
+        Ok(conjunction(filter_exprs))
     }
 
-    /// Builds a single content filter expression
+    /// Builds filter expression from a single ContentFilter using a provided column expression
     #[allow(clippy::only_used_in_recursion)]
-    fn build_content_filter(
+    fn build_content_filter_with_expr(
         &self,
-        column_name: &str,
+        col_expr: Expr,
         filter: &log_query::ContentFilter,
-    ) -> Result<Expr> {
+    ) -> Result<Option<Expr>> {
         match filter {
-            log_query::ContentFilter::Exact(pattern) => {
-                Ok(col(column_name)
-                    .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern))))))
+            log_query::ContentFilter::Exact(value) => Ok(Some(
+                col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
+            )),
+            log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
+                feature: "regex filter",
             }
-            log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
-                UnimplementedSnafu {
-                    feature: "regex filter",
-                }
-                .build(),
-            ),
-            log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()),
+            .build()),
+            log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
             log_query::ContentFilter::Between {
                 start,
                 end,
@@ -200,56 +197,65 @@
                 end_inclusive,
             } => {
                 let left = if *start_inclusive {
-                    Expr::gt_eq
+                    col_expr
+                        .clone()
+                        .gt_eq(lit(ScalarValue::Utf8(Some(start.clone()))))
                 } else {
-                    Expr::gt
+                    col_expr
+                        .clone()
+                        .gt(lit(ScalarValue::Utf8(Some(start.clone()))))
                 };
                 let right = if *end_inclusive {
-                    Expr::lt_eq
+                    col_expr.lt_eq(lit(ScalarValue::Utf8(Some(end.clone()))))
                 } else {
-                    Expr::lt
+                    col_expr.lt(lit(ScalarValue::Utf8(Some(end.clone()))))
                 };
-                Ok(left(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
-                )
-                .and(right(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
-                )))
+                Ok(Some(left.and(right)))
             }
             log_query::ContentFilter::GreatThan { value, inclusive } => {
-                let expr = if *inclusive { Expr::gt_eq } else { Expr::gt };
-                Ok(expr(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
-                ))
+                let comparison_expr = if *inclusive {
+                    col_expr.gt_eq(lit(ScalarValue::Utf8(Some(value.clone()))))
+                } else {
+                    col_expr.gt(lit(ScalarValue::Utf8(Some(value.clone()))))
+                };
+                Ok(Some(comparison_expr))
             }
             log_query::ContentFilter::LessThan { value, inclusive } => {
-                let expr = if *inclusive { Expr::lt_eq } else { Expr::lt };
-                Ok(expr(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
-                ))
+                if *inclusive {
+                    Ok(Some(
+                        col_expr.lt_eq(lit(ScalarValue::Utf8(Some(value.clone())))),
+                    ))
+                } else {
+                    Ok(Some(
+                        col_expr.lt(lit(ScalarValue::Utf8(Some(value.clone())))),
+                    ))
+                }
             }
             log_query::ContentFilter::In(values) => {
-                let list = values
+                let values: Vec<_> = values
                     .iter()
-                    .map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))))
+                    .map(|v| lit(ScalarValue::Utf8(Some(v.clone()))))
                     .collect();
-                Ok(col(column_name).in_list(list, false))
+                Ok(Some(col_expr.in_list(values, false)))
            }
             log_query::ContentFilter::Compound(filters, op) => {
                 let exprs = filters
                     .iter()
-                    .map(|filter| self.build_content_filter(column_name, filter))
+                    .filter_map(|filter| {
+                        self.build_content_filter_with_expr(col_expr.clone(), filter)
+                            .transpose()
+                    })
                     .try_collect::<Vec<_>>()?;
 
+                if exprs.is_empty() {
+                    return Ok(None);
+                }
+
                 match op {
-                    log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()),
+                    log_query::BinaryOperator::And => Ok(conjunction(exprs)),
                     log_query::BinaryOperator::Or => {
                         // Build a disjunction (OR) of expressions
-                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap())
+                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
                     }
                 }
             }
@@ -272,11 +278,11 @@
             })?;
         let args = args
             .iter()
-            .map(|expr| self.log_expr_to_column_expr(expr, schema))
+            .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
             .try_collect::<Vec<_>>()?;
         let group_exprs = by
             .iter()
-            .map(|expr| self.log_expr_to_column_expr(expr, schema))
+            .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
             .try_collect::<Vec<_>>()?;
         let aggr_expr = aggr_fn.call(args);
 
@@ -287,7 +293,7 @@ impl LogQueryPlanner {
     ///
     /// A column expression here can be a column identifier, a positional identifier, or a literal.
     /// They don't rely on the context of the query or other columns.
-    fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
+    fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &ArrowSchema) -> Result<Expr> {
         match expr {
             LogExpr::NamedIdent(name) => Ok(col(name)),
             LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
@@ -303,7 +309,7 @@ impl LogQueryPlanner {
     fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
         let args = args
             .iter()
-            .map(|expr| self.log_expr_to_column_expr(expr, schema))
+            .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
             .try_collect::<Vec<_>>()?;
         let func = self.session_state.scalar_functions().get(name).context(
             UnknownScalarFunctionSnafu {
@@ -343,15 +349,13 @@ impl LogQueryPlanner {
                     .aggregate(group_exprs, [aggr_expr.clone()])
                     .context(DataFusionPlanningSnafu)?;
             }
-            LogExpr::Filter { expr, filter } => {
+            LogExpr::Filter { filter } => {
                 let schema = plan_builder.schema();
-                let expr = self.log_expr_to_column_expr(expr, schema)?;
-
-                let col_name = expr.schema_name().to_string();
-                let filter_expr = self.build_content_filter(&col_name, filter)?;
-                plan_builder = plan_builder
-                    .filter(filter_expr)
-                    .context(DataFusionPlanningSnafu)?;
+                if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
+                    plan_builder = plan_builder
+                        .filter(filter_expr)
+                        .context(DataFusionPlanningSnafu)?;
+                }
             }
             LogExpr::ScalarFunc { name, args, alias } => {
                 let schema = plan_builder.schema();
@@ -367,7 +371,7 @@ impl LogQueryPlanner {
                 // nothing to do, return empty vec.
             }
             LogExpr::Alias { expr, alias } => {
-                let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?;
+                let expr = self.log_expr_to_column_expr(expr, plan_builder.schema().as_arrow())?;
                 let aliased_expr = expr.alias(alias);
                 plan_builder = plan_builder
                     .project([aliased_expr.clone()])
@@ -395,7 +399,7 @@ mod tests {
     use datafusion::execution::SessionStateBuilder;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, SchemaRef};
-    use log_query::{BinaryOperator, ContentFilter, Context, Limit};
+    use log_query::{BinaryOperator, ColumnFilters, ContentFilter, Context, Limit, LogExpr};
     use session::context::QueryContext;
     use table::metadata::{TableInfoBuilder, TableMetaBuilder};
     use table::table_name::TableName;
@@ -482,7 +486,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -559,21 +563,24 @@
     }
 
     #[tokio::test]
-    async fn test_build_column_filter() {
+    async fn test_build_content_filter() {
         let table_provider =
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            filters: vec![
                 ContentFilter::Contains("error".to_string()),
                 ContentFilter::Prefix("WARN".to_string()),
             ],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -600,7 +607,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -635,7 +642,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -743,14 +750,15 @@
     }
 
     #[tokio::test]
-    async fn test_build_column_filter_between() {
+    async fn test_build_content_filter_between() {
         let table_provider =
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::Between {
                 start: "a".to_string(),
                 end: "z".to_string(),
@@ -759,7 +767,9 @@
             }],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -828,16 +838,20 @@
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         // Test AND compound
-        let filter = ContentFilter::Compound(
-            vec![
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![
                 ContentFilter::Contains("error".to_string()),
                 ContentFilter::Prefix("WARN".to_string()),
             ],
-            BinaryOperator::And,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();
 
         let expected_expr = col("message")
             .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
@@ -845,15 +859,21 @@
 
         assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
 
-        // Test OR compound
-        let filter = ContentFilter::Compound(
-            vec![
-                ContentFilter::Contains("error".to_string()),
-                ContentFilter::Prefix("WARN".to_string()),
-            ],
-            BinaryOperator::Or,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        // Test OR compound - use Compound filter for OR logic
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![ContentFilter::Compound(
+                vec![
+                    ContentFilter::Contains("error".to_string()),
+                    ContentFilter::Prefix("WARN".to_string()),
+                ],
+                BinaryOperator::Or,
+            )],
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();
 
         let expected_expr = col("message")
             .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
@@ -862,20 +882,26 @@
         assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
 
         // Test nested compound
-        let filter = ContentFilter::Compound(
-            vec![
-                ContentFilter::Contains("error".to_string()),
-                ContentFilter::Compound(
-                    vec![
-                        ContentFilter::Prefix("WARN".to_string()),
-                        ContentFilter::Exact("DEBUG".to_string()),
-                    ],
-                    BinaryOperator::Or,
-                ),
-            ],
-            BinaryOperator::And,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![ContentFilter::Compound(
+                vec![
+                    ContentFilter::Contains("error".to_string()),
+                    ContentFilter::Compound(
+                        vec![
+                            ContentFilter::Prefix("WARN".to_string()),
+                            ContentFilter::Exact("DEBUG".to_string()),
+                        ],
+                        BinaryOperator::Or,
+                    ),
+                ],
+                BinaryOperator::And,
+            )],
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();
 
         let expected_nested = col("message")
             .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
@@ -893,17 +919,20 @@
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         // Test GreatThan with inclusive=true
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::GreatThan {
                 value: "error".to_string(),
                 inclusive: true,
             }],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -913,14 +942,16 @@
 
         // Test GreatThan with inclusive=false
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::GreatThan {
                 value: "error".to_string(),
                 inclusive: false,
             }],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -935,17 +966,20 @@
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         // Test LessThan with inclusive=true
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::LessThan {
                 value: "error".to_string(),
                 inclusive: true,
             }],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -955,14 +989,16 @@
 
         // Test LessThan with inclusive=false
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::LessThan {
                 value: "error".to_string(),
                 inclusive: false,
             }],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());
 
         let expr = expr_option.unwrap();
@@ -977,10 +1013,11 @@
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();
 
         // Test In filter with multiple values
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::In(vec![
                 "error".to_string(),
                 "warning".to_string(),
@@ -988,7 +1025,9 @@
                 "info".to_string(),
             ])],
         };
 
-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
        assert!(expr_option.is_some());
 
        let expr = expr_option.unwrap();
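
Not part of the patch: a minimal usage sketch of the reworked types for readers skimming the diff. The type, field, and variant names come from the `log-query` changes above; the field values ("message", "error") are illustrative only.

```rust
use log_query::{ColumnFilters, ContentFilter, LogExpr};

fn main() {
    // A filter whose target is a plain column reference -- the common case,
    // expressed through the new `expr` field instead of `column_name`.
    let by_column = ColumnFilters {
        expr: Box::new(LogExpr::NamedIdent("message".to_string())),
        filters: vec![ContentFilter::Contains("error".to_string())],
    };
    // ColumnFilters derives Debug (and Clone) in this patch, so it can be inspected directly.
    println!("{by_column:?}");

    // The same shape embedded in an expression pipeline via the reworked
    // `LogExpr::Filter` variant, which now wraps a ColumnFilters directly
    // instead of an (expr, ContentFilter) pair.
    let _post_filter = LogExpr::Filter {
        filter: ColumnFilters {
            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            filters: vec![ContentFilter::Exist],
        },
    };
}
```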