From c8da35c7e56f41849b6be9bbb09fa7b28a292e3f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 5 Aug 2025 21:38:25 -0700 Subject: [PATCH] feat(log-query): support binary op, scalar fn & is_true/is_false (#6659) * rename symbol Signed-off-by: Ruihang Xia * handle binary op Signed-off-by: Ruihang Xia * update test results Signed-off-by: Ruihang Xia * Update src/query/src/log_query/planner.rs Co-authored-by: Yingwen * fix format Signed-off-by: Ruihang Xia * reduce duplication Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: Yingwen --- src/log-query/src/log_query.rs | 30 +++- src/query/src/log_query/planner.rs | 217 ++++++++++++++++++++++------- 2 files changed, 195 insertions(+), 52 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 9fc9f7733d..5d40f2c387 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -82,7 +82,7 @@ pub enum LogExpr { }, BinaryOp { left: Box, - op: String, + op: BinaryOperator, right: Box, }, Alias { @@ -326,14 +326,38 @@ pub enum ContentFilter { inclusive: bool, }, In(Vec), - // TODO(ruihang): arithmetic operations + IsTrue, + IsFalse, // Compound filters - Compound(Vec, BinaryOperator), + Compound(Vec, ConjunctionOperator), } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ConjunctionOperator { + And, + Or, +} + +/// Binary operators for LogExpr::BinaryOp. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BinaryOperator { + // Comparison operators + Eq, + Ne, + Lt, + Le, + Gt, + Ge, + + // Arithmetic operators + Plus, + Minus, + Multiply, + Divide, + Modulo, + + // Logical operators And, Or, } diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 9204be2382..4608973d06 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -19,10 +19,10 @@ use datafusion::datasource::DefaultTableSource; use datafusion::execution::SessionState; use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::utils::conjunction; -use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; +use datafusion_expr::{col, lit, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator}; use datafusion_sql::TableReference; use datatypes::schema::Schema; -use log_query::{LogExpr, LogQuery, TimeFilter}; +use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter}; use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; @@ -146,7 +146,9 @@ impl LogQueryPlanner { column_filter: &log_query::ColumnFilters, schema: &ArrowSchema, ) -> Result> { - let col_expr = self.log_expr_to_column_expr(&column_filter.expr, schema)?; + // Convert ArrowSchema to DFSchema for the more generic function + let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?; + let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?; let filter_exprs = column_filter .filters @@ -238,6 +240,8 @@ impl LogQueryPlanner { .collect(); Ok(Some(col_expr.in_list(values, false))) } + log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())), + log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())), log_query::ContentFilter::Compound(filters, op) => { let exprs = filters .iter() @@ -252,8 +256,8 @@ impl LogQueryPlanner { } match op { - log_query::BinaryOperator::And => Ok(conjunction(exprs)), - log_query::BinaryOperator::Or => { + log_query::ConjunctionOperator::And => Ok(conjunction(exprs)), + log_query::ConjunctionOperator::Or => { // Build a disjunction (OR) of expressions Ok(exprs.into_iter().reduce(|a, b| a.or(b))) } @@ -278,38 +282,61 @@ impl LogQueryPlanner { })?; let args = args .iter() - .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow())) + .map(|expr| self.log_expr_to_df_expr(expr, schema)) .try_collect::>()?; let group_exprs = by .iter() - .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow())) + .map(|expr| self.log_expr_to_df_expr(expr, schema)) .try_collect::>()?; let aggr_expr = aggr_fn.call(args); Ok((aggr_expr, group_exprs)) } - /// Converts a log expression to a column expression. - /// - /// A column expression here can be a column identifier, a positional identifier, or a literal. - /// They don't rely on the context of the query or other columns. - fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &ArrowSchema) -> Result { + /// Converts a LogExpr to a DataFusion Expr, handling all expression types. + fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result { match expr { LogExpr::NamedIdent(name) => Ok(col(name)), LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())), LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))), - _ => UnexpectedLogExprSnafu { - expr: expr.clone(), - expected: "named identifier, positional identifier, or literal", + LogExpr::BinaryOp { left, op, right } => { + let left_expr = self.log_expr_to_df_expr(left, schema)?; + let right_expr = self.log_expr_to_df_expr(right, schema)?; + let df_op = Self::binary_operator_to_df_operator(op); + + Ok(Expr::BinaryExpr(BinaryExpr { + left: Box::new(left_expr), + op: df_op, + right: Box::new(right_expr), + })) + } + LogExpr::ScalarFunc { name, args, alias } => { + self.build_scalar_func(schema, name, args, alias) + } + LogExpr::Alias { expr, alias } => { + let df_expr = self.log_expr_to_df_expr(expr, schema)?; + Ok(df_expr.alias(alias)) + } + LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => { + UnexpectedLogExprSnafu { + expr: expr.clone(), + expected: "not a typical expression", + } + .fail() } - .fail(), } } - fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result { + fn build_scalar_func( + &self, + schema: &DFSchema, + name: &str, + args: &[LogExpr], + alias: &Option, + ) -> Result { let args = args .iter() - .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow())) + .map(|expr| self.log_expr_to_df_expr(expr, schema)) .try_collect::>()?; let func = self.session_state.scalar_functions().get(name).context( UnknownScalarFunctionSnafu { @@ -318,7 +345,30 @@ impl LogQueryPlanner { )?; let expr = func.call(args); - Ok(expr) + if let Some(alias) = alias { + Ok(expr.alias(alias)) + } else { + Ok(expr) + } + } + + /// Convert BinaryOperator to DataFusion's Operator. + fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator { + match op { + BinaryOperator::Eq => Operator::Eq, + BinaryOperator::Ne => Operator::NotEq, + BinaryOperator::Lt => Operator::Lt, + BinaryOperator::Le => Operator::LtEq, + BinaryOperator::Gt => Operator::Gt, + BinaryOperator::Ge => Operator::GtEq, + BinaryOperator::Plus => Operator::Plus, + BinaryOperator::Minus => Operator::Minus, + BinaryOperator::Multiply => Operator::Multiply, + BinaryOperator::Divide => Operator::Divide, + BinaryOperator::Modulo => Operator::Modulo, + BinaryOperator::And => Operator::And, + BinaryOperator::Or => Operator::Or, + } } /// Process LogExpr recursively. @@ -359,24 +409,30 @@ impl LogQueryPlanner { } LogExpr::ScalarFunc { name, args, alias } => { let schema = plan_builder.schema(); - let mut expr = self.build_scalar_func(schema, name, args)?; - if let Some(alias) = alias { - expr = expr.alias(alias); - } + let expr = self.build_scalar_func(schema, name, args, alias)?; plan_builder = plan_builder - .project([expr.clone()]) + .project([expr]) .context(DataFusionPlanningSnafu)?; } LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => { // nothing to do, return empty vec. } LogExpr::Alias { expr, alias } => { - let expr = self.log_expr_to_column_expr(expr, plan_builder.schema().as_arrow())?; - let aliased_expr = expr.alias(alias); + let schema = plan_builder.schema(); + let df_expr = self.log_expr_to_df_expr(expr, schema)?; + let aliased_expr = df_expr.alias(alias); plan_builder = plan_builder .project([aliased_expr.clone()]) .context(DataFusionPlanningSnafu)?; } + LogExpr::BinaryOp { .. } => { + let schema = plan_builder.schema(); + let binary_expr = self.log_expr_to_df_expr(expr, schema)?; + + plan_builder = plan_builder + .project([binary_expr]) + .context(DataFusionPlanningSnafu)?; + } _ => { UnimplementedSnafu { feature: "log expression", @@ -399,7 +455,7 @@ mod tests { use datafusion::execution::SessionStateBuilder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; - use log_query::{BinaryOperator, ColumnFilters, ContentFilter, Context, Limit, LogExpr}; + use log_query::{ColumnFilters, ConjunctionOperator, ContentFilter, Context, Limit, LogExpr}; use session::context::QueryContext; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::table_name::TableName; @@ -425,12 +481,17 @@ mod tests { ConcreteDataType::string_datatype(), true, ), + ColumnSchema::new( + "is_active".to_string(), + ConcreteDataType::boolean_datatype(), + true, + ), ]; Arc::new(Schema::new(columns)) } - /// Registers table under `greptime`, with `message` and `timestamp` and `host` columns. + /// Registers table under `greptime`, with `message`, `timestamp`, `host`, and `is_active` columns. async fn build_test_table_provider( table_name_tuples: &[(String, String)], ) -> DfTableSourceProvider { @@ -499,9 +560,9 @@ mod tests { }; let plan = planner.query_to_plan(log_query).await.unwrap(); - let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -620,9 +681,9 @@ mod tests { }; let plan = planner.query_to_plan(log_query).await.unwrap(); - let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -655,9 +716,9 @@ mod tests { }; let plan = planner.query_to_plan(log_query).await.unwrap(); - let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -702,9 +763,9 @@ mod tests { let plan = planner.query_to_plan(log_query).await.unwrap(); let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\ -\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; +\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -742,9 +803,9 @@ mod tests { let plan = planner.query_to_plan(log_query).await.unwrap(); let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\ - \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ - \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ - \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ + \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ + \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -825,9 +886,9 @@ mod tests { let plan = planner.query_to_plan(log_query).await.unwrap(); let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\ \n Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\ -\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ -\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; +\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]"; assert_eq!(plan.display_indent_schema().to_string(), expected); } @@ -867,7 +928,7 @@ mod tests { ContentFilter::Contains("error".to_string()), ContentFilter::Prefix("WARN".to_string()), ], - BinaryOperator::Or, + ConjunctionOperator::Or, )], }; let expr = planner @@ -892,10 +953,10 @@ mod tests { ContentFilter::Prefix("WARN".to_string()), ContentFilter::Exact("DEBUG".to_string()), ], - BinaryOperator::Or, + ConjunctionOperator::Or, ), ], - BinaryOperator::And, + ConjunctionOperator::And, )], }; let expr = planner @@ -1042,4 +1103,62 @@ mod tests { assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); } + + #[tokio::test] + async fn test_build_is_true_filter() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema(); + + // Test IsTrue filter + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::NamedIdent("is_active".to_string())), + filters: vec![ContentFilter::IsTrue], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr_string = + "IsTrue(Column(Column { relation: None, name: \"is_active\" }))".to_string(); + + assert_eq!(format!("{:?}", expr), expected_expr_string); + } + + #[tokio::test] + async fn test_build_filter_with_scalar_fn() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema(); + + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::BinaryOp { + left: Box::new(LogExpr::ScalarFunc { + name: "character_length".to_string(), + args: vec![LogExpr::NamedIdent("message".to_string())], + alias: None, + }), + op: BinaryOperator::Gt, + right: Box::new(LogExpr::Literal("100".to_string())), + }), + filters: vec![ContentFilter::IsTrue], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + let expected_expr_string = "character_length(message) > Utf8(\"100\") IS TRUE"; + + assert_eq!(format!("{}", expr), expected_expr_string); + } }