From 3a2f5413e0690fc88f7b9b09d27f1b1640a40c25 Mon Sep 17 00:00:00 2001 From: localhost Date: Fri, 8 Aug 2025 16:48:03 +0800 Subject: [PATCH] chore: add and/or for log query (#6681) * chore: add and/or for log query * chore: remove impl From> for Filters --- src/log-query/src/log_query.rs | 42 ++++++++++++++++- src/query/src/log_query/planner.rs | 75 +++++++++++++++++++++++------- tests-integration/tests/http.rs | 2 +- 3 files changed, 99 insertions(+), 20 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 5d40f2c387..6196e24998 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -43,7 +43,7 @@ pub struct LogQuery { /// Conjunction of filters to apply for the raw logs. /// /// Filters here can apply to any LogExpr. - pub filters: Vec, + pub filters: Filters, /// Adjacent lines to return. Applies to all filters above. /// /// TODO(ruihang): Do we need per-filter context? @@ -54,6 +54,44 @@ pub struct LogQuery { pub exprs: Vec, } +/// Nested filter structure that supports and/or relationships +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Filters { + /// Single filter condition + Single(ColumnFilters), + /// Multiple filters with AND relationship + And(Vec), + /// Multiple filters with OR relationship + Or(Vec), + Not(Box), +} + +impl Default for Filters { + fn default() -> Self { + Filters::And(vec![]) + } +} + +impl From for Filters { + fn from(filter: ColumnFilters) -> Self { + Filters::Single(filter) + } +} + +impl Filters { + pub fn and>(other: Vec) -> Filters { + Filters::And(other.into_iter().map(Into::into).collect()) + } + + pub fn or>(other: Vec) -> Filters { + Filters::Or(other.into_iter().map(Into::into).collect()) + } + + pub fn single(filter: ColumnFilters) -> Filters { + Filters::Single(filter) + } +} + /// Expression to calculate on log after filtering. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum LogExpr { @@ -99,7 +137,7 @@ impl Default for LogQuery { Self { table: TableName::new("", "", ""), time_filter: Default::default(), - filters: vec![], + filters: Filters::And(vec![]), limit: Limit::default(), context: Default::default(), columns: vec![], diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 4608973d06..6fe9a1b73f 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -18,8 +18,8 @@ use common_function::utils::escape_like_pattern; use datafusion::datasource::DefaultTableSource; use datafusion::execution::SessionState; use datafusion_common::{DFSchema, ScalarValue}; -use datafusion_expr::utils::conjunction; -use datafusion_expr::{col, lit, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator}; +use datafusion_expr::utils::{conjunction, disjunction}; +use datafusion_expr::{col, lit, not, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator}; use datafusion_sql::TableReference; use datatypes::schema::Schema; use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter}; @@ -77,11 +77,8 @@ impl LogQueryPlanner { // Time filter filters.push(self.build_time_filter(&query.time_filter, &schema)?); - // Column filters - for filter in &query.filters { - if let Some(expr) = self.build_column_filter(filter, df_schema.as_arrow())? { - filters.push(expr); - } + if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? { + filters.push(filters_expr); } // Apply filters @@ -139,6 +136,48 @@ impl LogQueryPlanner { Ok(expr) } + //disjunction + fn build_filters( + &self, + filters: &log_query::Filters, + schema: &ArrowSchema, + ) -> Result> { + match filters { + log_query::Filters::And(filters) => { + let exprs = filters + .iter() + .filter_map(|filter| self.build_filters(filter, schema).transpose()) + .try_collect::>()?; + if exprs.is_empty() { + Ok(None) + } else { + Ok(conjunction(exprs)) + } + } + log_query::Filters::Or(filters) => { + let exprs = filters + .iter() + .filter_map(|filter| self.build_filters(filter, schema).transpose()) + .try_collect::>()?; + if exprs.is_empty() { + Ok(None) + } else { + Ok(disjunction(exprs)) + } + } + log_query::Filters::Not(filter) => { + if let Some(expr) = self.build_filters(filter, schema)? { + Ok(Some(not(expr))) + } else { + Ok(None) + } + } + log_query::Filters::Single(column_filters) => { + // Build a single column filter + self.build_column_filter(column_filters, schema) + } + } + } /// Builds filter expression from ColumnFilters (new structure with expr + filters) fn build_column_filter( @@ -455,7 +494,9 @@ mod tests { use datafusion::execution::SessionStateBuilder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; - use log_query::{ColumnFilters, ConjunctionOperator, ContentFilter, Context, Limit, LogExpr}; + use log_query::{ + ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr, + }; use session::context::QueryContext; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::table_name::TableName; @@ -546,10 +587,10 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![ColumnFilters { + filters: Filters::Single(ColumnFilters { expr: Box::new(LogExpr::NamedIdent("message".to_string())), filters: vec![ContentFilter::Contains("error".to_string())], - }], + }), limit: Limit { skip: None, fetch: Some(100), @@ -667,10 +708,10 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![ColumnFilters { + filters: Filters::Single(ColumnFilters { expr: Box::new(LogExpr::NamedIdent("message".to_string())), filters: vec![ContentFilter::Contains("error".to_string())], - }], + }), limit: Limit { skip: Some(10), fetch: None, @@ -702,10 +743,10 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![ColumnFilters { + filters: Filters::Single(ColumnFilters { expr: Box::new(LogExpr::NamedIdent("message".to_string())), filters: vec![ContentFilter::Contains("error".to_string())], - }], + }), limit: Limit { skip: None, fetch: None, @@ -745,7 +786,7 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![], + filters: Default::default(), limit: Limit { skip: None, fetch: Some(100), @@ -784,7 +825,7 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![], + filters: Default::default(), limit: Limit { skip: None, fetch: Some(100), @@ -855,7 +896,7 @@ mod tests { end: Some("2021-01-02T00:00:00Z".to_string()), span: None, }, - filters: vec![], + filters: Default::default(), limit: Limit { skip: Some(0), fetch: None, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ef072b477b..8c11920ec6 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -5009,7 +5009,7 @@ pub async fn test_log_query(store_type: StorageType) { fetch: Some(1), }, columns: vec!["ts".to_string(), "message".to_string()], - filters: vec![], + filters: Default::default(), context: Context::None, exprs: vec![], };