From ccccaf77347bf1c24e42e9dc7ad79f39115072a6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 12 Aug 2025 23:28:37 -0700 Subject: [PATCH] feat(log-query): try infer and cast type for literal value (#6712) * initial impl Signed-off-by: Ruihang Xia * one more test Signed-off-by: Ruihang Xia * remove duplicated test cases Signed-off-by: Ruihang Xia * remove duplicated methods Signed-off-by: Ruihang Xia * initial impl Signed-off-by: Ruihang Xia * one more test Signed-off-by: Ruihang Xia * remove duplicated test cases Signed-off-by: Ruihang Xia * remove duplicated methods Signed-off-by: Ruihang Xia * chore: add eq for log query * skip for both literals Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: paomian --- src/log-query/src/log_query.rs | 37 +++ src/query/src/log_query/planner.rs | 418 ++++++++++++++++++++++++++--- 2 files changed, 419 insertions(+), 36 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 6196e24998..fce7d8a712 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -328,6 +328,42 @@ pub struct ColumnFilters { pub filters: Vec, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum EqualValue { + /// Exact match with a string value. + String(String), + /// Exact match with a boolean value. + Boolean(bool), + /// Exact match with a number value. + Int(i64), + /// Exact match with a float value. + Float(f64), +} + +impl From for EqualValue { + fn from(value: String) -> Self { + EqualValue::String(value) + } +} + +impl From for EqualValue { + fn from(value: bool) -> Self { + EqualValue::Boolean(value) + } +} + +impl From for EqualValue { + fn from(value: i64) -> Self { + EqualValue::Int(value) + } +} + +impl From for EqualValue { + fn from(value: f64) -> Self { + EqualValue::Float(value) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ContentFilter { // Search-based filters @@ -366,6 +402,7 @@ pub enum ContentFilter { In(Vec), IsTrue, IsFalse, + Equal(EqualValue), // Compound filters Compound(Vec, ConjunctionOperator), diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 6fe9a1b73f..b25a116959 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_schema::Schema as ArrowSchema; +use arrow_schema::{DataType, Schema as ArrowSchema}; use catalog::table_source::DfTableSourceProvider; use common_function::utils::escape_like_pattern; use datafusion::datasource::DefaultTableSource; use datafusion::execution::SessionState; use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::utils::{conjunction, disjunction}; -use datafusion_expr::{col, lit, not, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator}; +use datafusion_expr::{ + col, lit, not, BinaryExpr, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, Operator, +}; use datafusion_sql::TableReference; use datatypes::schema::Schema; -use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter}; +use log_query::{BinaryOperator, EqualValue, LogExpr, LogQuery, TimeFilter}; use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; @@ -193,7 +195,7 @@ impl LogQueryPlanner { .filters .iter() .filter_map(|filter| { - self.build_content_filter_with_expr(col_expr.clone(), filter) + self.build_content_filter_with_expr(col_expr.clone(), filter, &df_schema) .transpose() }) .try_collect::>()?; @@ -212,6 +214,7 @@ impl LogQueryPlanner { &self, col_expr: Expr, filter: &log_query::ContentFilter, + schema: &DFSchema, ) -> Result> { match filter { log_query::ContentFilter::Exact(value) => Ok(Some( @@ -237,55 +240,56 @@ impl LogQueryPlanner { start_inclusive, end_inclusive, } => { + let start_literal = self.create_inferred_literal(start, &col_expr, schema); + let end_literal = self.create_inferred_literal(end, &col_expr, schema); + let left = if *start_inclusive { - col_expr - .clone() - .gt_eq(lit(ScalarValue::Utf8(Some(start.clone())))) + col_expr.clone().gt_eq(start_literal) } else { - col_expr - .clone() - .gt(lit(ScalarValue::Utf8(Some(start.clone())))) + col_expr.clone().gt(start_literal) }; let right = if *end_inclusive { - col_expr.lt_eq(lit(ScalarValue::Utf8(Some(end.clone())))) + col_expr.lt_eq(end_literal) } else { - col_expr.lt(lit(ScalarValue::Utf8(Some(end.clone())))) + col_expr.lt(end_literal) }; Ok(Some(left.and(right))) } log_query::ContentFilter::GreatThan { value, inclusive } => { + let value_literal = self.create_inferred_literal(value, &col_expr, schema); let comparison_expr = if *inclusive { - col_expr.gt_eq(lit(ScalarValue::Utf8(Some(value.clone())))) + col_expr.gt_eq(value_literal) } else { - col_expr.gt(lit(ScalarValue::Utf8(Some(value.clone())))) + col_expr.gt(value_literal) }; Ok(Some(comparison_expr)) } log_query::ContentFilter::LessThan { value, inclusive } => { + let value_literal = self.create_inferred_literal(value, &col_expr, schema); if *inclusive { - Ok(Some( - col_expr.lt_eq(lit(ScalarValue::Utf8(Some(value.clone())))), - )) + Ok(Some(col_expr.lt_eq(value_literal))) } else { - Ok(Some( - col_expr.lt(lit(ScalarValue::Utf8(Some(value.clone())))), - )) + Ok(Some(col_expr.lt(value_literal))) } } log_query::ContentFilter::In(values) => { - let values: Vec<_> = values + let inferred_values: Vec<_> = values .iter() - .map(|v| lit(ScalarValue::Utf8(Some(v.clone())))) + .map(|v| self.create_inferred_literal(v, &col_expr, schema)) .collect(); - Ok(Some(col_expr.in_list(values, false))) + Ok(Some(col_expr.in_list(inferred_values, false))) } log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())), log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())), + log_query::ContentFilter::Equal(value) => { + let value_literal = Self::create_eq_literal(value.clone()); + Ok(Some(col_expr.eq(value_literal))) + } log_query::ContentFilter::Compound(filters, op) => { let exprs = filters .iter() .filter_map(|filter| { - self.build_content_filter_with_expr(col_expr.clone(), filter) + self.build_content_filter_with_expr(col_expr.clone(), filter, schema) .transpose() }) .try_collect::>()?; @@ -339,15 +343,8 @@ impl LogQueryPlanner { LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())), LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))), LogExpr::BinaryOp { left, op, right } => { - let left_expr = self.log_expr_to_df_expr(left, schema)?; - let right_expr = self.log_expr_to_df_expr(right, schema)?; - let df_op = Self::binary_operator_to_df_operator(op); - - Ok(Expr::BinaryExpr(BinaryExpr { - left: Box::new(left_expr), - op: df_op, - right: Box::new(right_expr), - })) + // For binary operations, always use type inference (matches original behavior) + self.build_binary_expr(left, op, right, schema) } LogExpr::ScalarFunc { name, args, alias } => { self.build_scalar_func(schema, name, args, alias) @@ -410,6 +407,77 @@ impl LogQueryPlanner { } } + /// Parse a string literal to the appropriate ScalarValue based on target DataType. + /// Falls back to UTF8 if parsing fails or type is not supported. + fn infer_literal_scalar_value(&self, literal: &str, target_type: &DataType) -> ScalarValue { + let utf8_literal = ScalarValue::Utf8(Some(literal.to_string())); + utf8_literal.cast_to(target_type).unwrap_or(utf8_literal) + } + + /// Build binary expression with type inference for literals. + /// Attempts to infer literal types from the non-literal operand's type. + fn build_binary_expr( + &self, + left: &LogExpr, + op: &BinaryOperator, + right: &LogExpr, + schema: &DFSchema, + ) -> Result { + // Convert both sides to DataFusion expressions first + let mut left_expr = self.log_expr_to_df_expr(left, schema)?; + let mut right_expr = self.log_expr_to_df_expr(right, schema)?; + + // Try to infer literal types based on the other operand + match (left, right) { + (LogExpr::Literal(_), LogExpr::Literal(_)) => { + // both are literal, do nothing + } + (LogExpr::Literal(literal), _) => { + // Left is literal, try to infer from right + if let Ok(right_type) = right_expr.get_type(schema) { + let inferred_scalar = self.infer_literal_scalar_value(literal, &right_type); + left_expr = lit(inferred_scalar); + } + } + (_, LogExpr::Literal(literal)) => { + // Right is literal, try to infer from left + if let Ok(left_type) = left_expr.get_type(schema) { + let inferred_scalar = self.infer_literal_scalar_value(literal, &left_type); + right_expr = lit(inferred_scalar); + } + } + _ => { + // Neither is a simple literal, no type inference needed + } + } + + let df_op = Self::binary_operator_to_df_operator(op); + Ok(Expr::BinaryExpr(BinaryExpr { + left: Box::new(left_expr), + op: df_op, + right: Box::new(right_expr), + })) + } + + /// Create a type-inferred literal based on the provided expression's type. + /// Falls back to UTF8 if type inference fails. + fn create_inferred_literal(&self, value: &str, expr: &Expr, schema: &DFSchema) -> Expr { + if let Ok(expr_type) = expr.get_type(schema) { + lit(self.infer_literal_scalar_value(value, &expr_type)) + } else { + lit(ScalarValue::Utf8(Some(value.to_string()))) + } + } + + fn create_eq_literal(value: EqualValue) -> Expr { + match value { + EqualValue::String(s) => lit(ScalarValue::Utf8(Some(s))), + EqualValue::Float(n) => lit(ScalarValue::Float64(Some(n))), + EqualValue::Int(n) => lit(ScalarValue::Int64(Some(n))), + EqualValue::Boolean(b) => lit(ScalarValue::Boolean(Some(b))), + } + } + /// Process LogExpr recursively. /// /// Return the [`LogicalPlanBuilder`] after modification and the resulting expression's names. @@ -532,15 +600,69 @@ mod tests { Arc::new(Schema::new(columns)) } + fn mock_schema_with_typed_columns() -> SchemaRef { + let columns = vec![ + ColumnSchema::new( + "message".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + "timestamp".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "host".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "is_active".to_string(), + ConcreteDataType::boolean_datatype(), + true, + ), + // Add more typed columns for comprehensive testing + ColumnSchema::new("age".to_string(), ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "score".to_string(), + ConcreteDataType::float64_datatype(), + true, + ), + ColumnSchema::new( + "count".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + ]; + + Arc::new(Schema::new(columns)) + } + /// Registers table under `greptime`, with `message`, `timestamp`, `host`, and `is_active` columns. async fn build_test_table_provider( table_name_tuples: &[(String, String)], + ) -> DfTableSourceProvider { + build_test_table_provider_with_schema(table_name_tuples, mock_schema()).await + } + + /// Registers table under `greptime`, with typed columns for type inference tests. + async fn build_test_table_provider_with_typed_columns( + table_name_tuples: &[(String, String)], + ) -> DfTableSourceProvider { + build_test_table_provider_with_schema(table_name_tuples, mock_schema_with_typed_columns()) + .await + } + + async fn build_test_table_provider_with_schema( + table_name_tuples: &[(String, String)], + schema: SchemaRef, ) -> DfTableSourceProvider { let catalog_list = MemoryCatalogManager::with_default_setup(); for (schema_name, table_name) in table_name_tuples { - let schema = mock_schema(); let table_meta = TableMetaBuilder::empty() - .schema(schema) + .schema(schema.clone()) .primary_key_indices(vec![2]) .value_indices(vec![0]) .next_column_id(1024) @@ -1198,8 +1320,232 @@ mod tests { assert!(expr_option.is_some()); let expr = expr_option.unwrap(); - let expected_expr_string = "character_length(message) > Utf8(\"100\") IS TRUE"; + let expected_expr_string = "character_length(message) > Int32(100) IS TRUE"; assert_eq!(format!("{}", expr), expected_expr_string); } + + #[tokio::test] + async fn test_type_inference_float_comparison() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + // Test Between with float column and string literals + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::NamedIdent("score".to_string())), + filters: vec![ContentFilter::Between { + start: "75.5".to_string(), + end: "100.0".to_string(), + start_inclusive: true, + end_inclusive: false, + }], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + // Should infer literals as Float64 since score is a float64 column + let expected_expr = col("score") + .gt_eq(lit(ScalarValue::Float64(Some(75.5)))) + .and(col("score").lt(lit(ScalarValue::Float64(Some(100.0))))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_type_inference_boolean_comparison() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + // Test In filter with boolean column and string literals + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::NamedIdent("is_active".to_string())), + filters: vec![ContentFilter::In(vec![ + "true".to_string(), + "1".to_string(), + "false".to_string(), + ])], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + // Should infer string literals as boolean values + let expected_expr = col("is_active").in_list( + vec![ + lit(ScalarValue::Boolean(Some(true))), + lit(ScalarValue::Boolean(Some(true))), + lit(ScalarValue::Boolean(Some(false))), + ], + false, + ); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_fallback_to_utf8_on_parse_failure() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + // Test with invalid number format - should fallback to UTF8 + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::NamedIdent("age".to_string())), + filters: vec![ContentFilter::GreatThan { + value: "not_a_number".to_string(), + inclusive: false, + }], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + // Should fallback to UTF8 since "not_a_number" can't be parsed as int32 + let expected_expr = col("age").gt(lit(ScalarValue::Utf8(Some("not_a_number".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_string_column_remains_utf8() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + // Test with string column - should remain UTF8 even if value looks like a number + let column_filter = ColumnFilters { + expr: Box::new(LogExpr::NamedIdent("message".to_string())), + filters: vec![ContentFilter::GreatThan { + value: "123".to_string(), + inclusive: false, + }], + }; + + let expr_option = planner + .build_column_filter(&column_filter, schema.arrow_schema()) + .unwrap(); + assert!(expr_option.is_some()); + + let expr = expr_option.unwrap(); + // Should remain UTF8 since message is a string column + let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("123".to_string())))); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + + #[tokio::test] + async fn test_all_binary_operators() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap(); + + // Test all comparison operators + let test_cases = vec![ + (BinaryOperator::Eq, Operator::Eq), + (BinaryOperator::Ne, Operator::NotEq), + (BinaryOperator::Lt, Operator::Lt), + (BinaryOperator::Le, Operator::LtEq), + (BinaryOperator::Gt, Operator::Gt), + (BinaryOperator::Ge, Operator::GtEq), + (BinaryOperator::Plus, Operator::Plus), + (BinaryOperator::Minus, Operator::Minus), + (BinaryOperator::Multiply, Operator::Multiply), + (BinaryOperator::Divide, Operator::Divide), + (BinaryOperator::Modulo, Operator::Modulo), + (BinaryOperator::And, Operator::And), + (BinaryOperator::Or, Operator::Or), + ]; + + for (binary_op, expected_df_op) in test_cases { + let binary_expr = LogExpr::BinaryOp { + left: Box::new(LogExpr::NamedIdent("age".to_string())), + op: binary_op, + right: Box::new(LogExpr::Literal("25".to_string())), + }; + + let expr = planner + .log_expr_to_df_expr(&binary_expr, &df_schema) + .unwrap(); + + let expected_expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(col("age")), + op: expected_df_op, + right: Box::new(lit(ScalarValue::Int32(Some(25)))), + }); + + assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); + } + } + + #[tokio::test] + async fn test_nested_binary_operations() { + let table_provider = build_test_table_provider_with_typed_columns(&[( + "public".to_string(), + "test_table".to_string(), + )]) + .await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); + let schema = mock_schema_with_typed_columns(); + + let df_schema = DFSchema::try_from(schema.arrow_schema().clone()).unwrap(); + + // Test nested binary operations: (age + 5) > 30 + let nested_binary_expr = LogExpr::BinaryOp { + left: Box::new(LogExpr::BinaryOp { + left: Box::new(LogExpr::NamedIdent("age".to_string())), + op: BinaryOperator::Plus, + right: Box::new(LogExpr::Literal("5".to_string())), + }), + op: BinaryOperator::Gt, + right: Box::new(LogExpr::Literal("30".to_string())), + }; + + let expr = planner + .log_expr_to_df_expr(&nested_binary_expr, &df_schema) + .unwrap(); + + // Verify the nested structure is properly created + let expected_expr_debug = "BinaryExpr(BinaryExpr { left: BinaryExpr(BinaryExpr { left: Column(Column { relation: None, name: \"age\" }), op: Plus, right: Literal(Int32(5)) }), op: Gt, right: Literal(Int32(30)) })"; + assert_eq!(format!("{:?}", expr), expected_expr_debug); + } }