diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 4a8e184055..40e4ec1b56 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -252,6 +252,21 @@ fn is_time_index_column( return is_time_index_expr(projection.input().clone(), projection_expr.expr.clone()); } + if let Some(filter) = plan.as_any().downcast_ref::() { + let child_column_expr = filter + .projection() + .as_ref() + .and_then(|projection| projection.get(column_expr.index()).copied()) + .map(|input_index| { + PhysicalColumn::new( + filter.input().schema().field(input_index).name(), + input_index, + ) + }) + .unwrap_or_else(|| column_expr.clone()); + return is_time_index_expr(filter.input().clone(), Arc::new(child_column_expr)); + } + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { let schema = plan.schema(); let column_field = schema.field(column_expr.index()); @@ -268,17 +283,21 @@ fn is_time_index_column( } fn passthrough_child(plan: &dyn ExecutionPlan) -> Option> { - if plan.as_any().is::() - || plan.as_any().is::() + if plan.as_any().is::() || plan.as_any().is::() || plan.as_any().is::() { - return plan.children().first().cloned().cloned(); + return schema_preserving_child(plan); } None } +fn schema_preserving_child(plan: &dyn ExecutionPlan) -> Option> { + let child = plan.children().first().cloned().cloned()?; + (plan.schema().as_ref() == child.schema().as_ref()).then_some(child) +} + fn is_supported_time_index_wrapper(expr: &ScalarFunctionExpr) -> bool { matches!( expr.name(), @@ -319,8 +338,10 @@ mod test { use arrow_schema::{Field, TimeUnit}; use common_recordbatch::RecordBatches; use datafusion::config::ConfigOptions; + use datafusion::physical_plan::filter::FilterExecBuilder; + use datafusion_common::ScalarValue; use datafusion_functions::datetime::to_timestamp_millis; - use datafusion_physical_expr::expressions::CastExpr; + use datafusion_physical_expr::expressions::{CastExpr, Literal}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; @@ -478,6 +499,95 @@ mod test { assert!(!is_time_index_expr(scan, Arc::new(non_timestamp_cast)).unwrap()); } + #[test] + fn test_is_time_index_expr_tracks_time_index_through_filter() { + let scan = new_region_scan(); + let filter = Arc::new( + FilterExec::try_new( + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + scan, + ) + .unwrap(), + ) as Arc; + + assert!(is_time_index_expr(filter, Arc::new(PhysicalColumn::new("ts", 1))).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_time_index_through_passthrough_wrapper_and_filter_projection() + { + let scan = new_region_scan(); + let projected_filter = Arc::new( + FilterExecBuilder::new( + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + scan, + ) + .apply_projection(Some(vec![1])) + .unwrap() + .build() + .unwrap(), + ) as Arc; + let cooperative = + Arc::new(CooperativeExec::new(projected_filter)) as Arc; + + assert!(is_time_index_expr(cooperative, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + } + + #[test] + fn test_schema_preserving_child_rejects_schema_changing_projection() { + let scan = new_region_scan(); + let projection = ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(); + + assert!(schema_preserving_child(&projection).is_none()); + } + + #[test] + fn test_cooperative_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new(CooperativeExec::new(child.clone())) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + #[test] + fn test_repartition_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new( + RepartitionExec::try_new( + child.clone(), + datafusion_physical_expr::Partitioning::RoundRobinBatch(2), + ) + .unwrap(), + ) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + #[test] + fn test_coalesce_partitions_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new(CoalescePartitionsExec::new(child.clone())) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + fn assert_passthrough_schema_contract( + plan: Arc, + child: Arc, + ) { + assert_eq!(plan.schema().as_ref(), child.schema().as_ref()); + + let passthrough = passthrough_child(plan.as_ref()).expect("wrapper should preserve schema"); + assert_eq!(passthrough.schema().as_ref(), child.schema().as_ref()); + } + fn new_region_scan() -> Arc { let schema = Arc::new(Schema::new(vec![ ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),