From 1d30b95573c828e7ebcb57cac4e307f23a2302c6 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 24 Apr 2026 17:14:02 +0800 Subject: [PATCH] fix: case insensitive Signed-off-by: discord9 --- src/query/src/optimizer/windowed_sort.rs | 94 ++++++++++++++---------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 40e4ec1b56..2480613ec6 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -96,7 +96,7 @@ impl WindowedSortPhysicalRule { input_schema.field(column_expr.index()).data_type(), DataType::Timestamp(_, _) ) - && is_time_index_expr(sort_input.clone(), first_sort_expr.expr.clone())? + && is_time_index_expr(&sort_input, &first_sort_expr.expr)? && sort_exec.fetch().is_none() // skip if there is a limit, as dyn filter along is good enough in this case { @@ -213,8 +213,8 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult, - expr: Arc, + plan: &Arc, + expr: &Arc, ) -> DataFusionResult { if let Some(column_expr) = expr.as_any().downcast_ref::() { return is_time_index_column(plan, column_expr); @@ -222,7 +222,7 @@ fn is_time_index_expr( if let Some(cast_expr) = expr.as_any().downcast_ref::() { return if matches!(cast_expr.cast_type(), DataType::Timestamp(_, _)) { - is_time_index_expr(plan, cast_expr.expr().clone()) + is_time_index_expr(plan, cast_expr.expr()) } else { Ok(false) }; @@ -232,7 +232,7 @@ fn is_time_index_expr( return if is_supported_time_index_wrapper(scalar_function_expr) && scalar_function_expr.args().len() == 1 { - is_time_index_expr(plan, scalar_function_expr.args()[0].clone()) + is_time_index_expr(plan, &scalar_function_expr.args()[0]) } else { Ok(false) }; @@ -242,14 +242,14 @@ fn is_time_index_expr( } fn is_time_index_column( - plan: Arc, + plan: &Arc, column_expr: &PhysicalColumn, ) -> DataFusionResult { if let Some(projection) = plan.as_any().downcast_ref::() { let Some(projection_expr) = projection.expr().get(column_expr.index()) else { return Ok(false); }; - return is_time_index_expr(projection.input().clone(), projection_expr.expr.clone()); + return is_time_index_expr(projection.input(), &projection_expr.expr); } if let Some(filter) = plan.as_any().downcast_ref::() { @@ -264,7 +264,8 @@ fn is_time_index_column( ) }) .unwrap_or_else(|| column_expr.clone()); - return is_time_index_expr(filter.input().clone(), Arc::new(child_column_expr)); + let child_expr = Arc::new(child_column_expr) as Arc; + return is_time_index_expr(filter.input(), &child_expr); } if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { @@ -279,7 +280,8 @@ fn is_time_index_column( let Some(child) = passthrough_child(plan.as_ref()) else { return Ok(false); }; - is_time_index_expr(child, Arc::new(column_expr.clone())) + let child_expr = Arc::new(column_expr.clone()) as Arc; + is_time_index_expr(&child, &child_expr) } fn passthrough_child(plan: &dyn ExecutionPlan) -> Option> { @@ -299,14 +301,12 @@ fn schema_preserving_child(plan: &dyn ExecutionPlan) -> Option bool { - matches!( - expr.name(), - "to_timestamp" - | "to_timestamp_seconds" - | "to_timestamp_millis" - | "to_timestamp_micros" - | "to_timestamp_nanos" - ) && matches!(expr.return_type(), DataType::Timestamp(_, _)) + (expr.name().eq_ignore_ascii_case("to_timestamp") + || expr.name().eq_ignore_ascii_case("to_timestamp_seconds") + || expr.name().eq_ignore_ascii_case("to_timestamp_millis") + || expr.name().eq_ignore_ascii_case("to_timestamp_micros") + || expr.name().eq_ignore_ascii_case("to_timestamp_nanos")) + && matches!(expr.return_type(), DataType::Timestamp(_, _)) } /// Removes the repartition plan between the filter and region scan. @@ -363,10 +363,9 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("alias_ts", 0)) as Arc; - assert!( - is_time_index_expr(projection, Arc::new(PhysicalColumn::new("alias_ts", 0))).unwrap() - ); + assert!(is_time_index_expr(&projection, &expr).unwrap()); } #[test] @@ -392,14 +391,9 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("alias_2", 0)) as Arc; - assert!( - is_time_index_expr( - second_projection, - Arc::new(PhysicalColumn::new("alias_2", 0)) - ) - .unwrap() - ); + assert!(is_time_index_expr(&second_projection, &expr).unwrap()); } #[test] @@ -427,8 +421,9 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; - assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + assert!(is_time_index_expr(&projection, &expr).unwrap()); } #[test] @@ -448,8 +443,9 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts_ms", 0)) as Arc; - assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts_ms", 0))).unwrap()); + assert!(is_time_index_expr(&projection, &expr).unwrap()); } #[test] @@ -477,26 +473,46 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; - assert!(!is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + assert!(!is_time_index_expr(&projection, &expr).unwrap()); + } + + #[test] + fn test_is_supported_time_index_wrapper_ignores_function_name_case() { + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let expr = ScalarFunctionExpr::new( + "To_Timestamp_Millis", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + ); + + assert!(is_supported_time_index_wrapper(&expr)); } #[test] fn test_is_time_index_expr_rejects_non_timestamp_casts() { let scan = new_region_scan(); - let cast_expr = CastExpr::new( + let cast_expr = Arc::new(CastExpr::new( Arc::new(PhysicalColumn::new("ts", 1)), DataType::Timestamp(TimeUnit::Millisecond, None), None, - ); - assert!(is_time_index_expr(scan.clone(), Arc::new(cast_expr)).unwrap()); + )) as Arc; + assert!(is_time_index_expr(&scan, &cast_expr).unwrap()); - let non_timestamp_cast = CastExpr::new( + let non_timestamp_cast = Arc::new(CastExpr::new( Arc::new(PhysicalColumn::new("ts", 1)), DataType::Int64, None, - ); - assert!(!is_time_index_expr(scan, Arc::new(non_timestamp_cast)).unwrap()); + )) as Arc; + assert!(!is_time_index_expr(&scan, &non_timestamp_cast).unwrap()); } #[test] @@ -509,8 +525,9 @@ mod test { ) .unwrap(), ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 1)) as Arc; - assert!(is_time_index_expr(filter, Arc::new(PhysicalColumn::new("ts", 1))).unwrap()); + assert!(is_time_index_expr(&filter, &expr).unwrap()); } #[test] @@ -529,8 +546,9 @@ mod test { ) as Arc; let cooperative = Arc::new(CooperativeExec::new(projected_filter)) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; - assert!(is_time_index_expr(cooperative, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + assert!(is_time_index_expr(&cooperative, &expr).unwrap()); } #[test]