feat: even more guard

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-23 18:30:50 +08:00
parent e116c2870d
commit cc7fa65793

View File

@@ -252,6 +252,21 @@ fn is_time_index_column(
return is_time_index_expr(projection.input().clone(), projection_expr.expr.clone());
}
if let Some(filter) = plan.as_any().downcast_ref::<FilterExec>() {
let child_column_expr = filter
.projection()
.as_ref()
.and_then(|projection| projection.get(column_expr.index()).copied())
.map(|input_index| {
PhysicalColumn::new(
filter.input().schema().field(input_index).name(),
input_index,
)
})
.unwrap_or_else(|| column_expr.clone());
return is_time_index_expr(filter.input().clone(), Arc::new(child_column_expr));
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
let schema = plan.schema();
let column_field = schema.field(column_expr.index());
@@ -268,17 +283,21 @@ fn is_time_index_column(
}
fn passthrough_child(plan: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
if plan.as_any().is::<FilterExec>()
|| plan.as_any().is::<CoalescePartitionsExec>()
if plan.as_any().is::<CoalescePartitionsExec>()
|| plan.as_any().is::<RepartitionExec>()
|| plan.as_any().is::<CooperativeExec>()
{
return plan.children().first().cloned().cloned();
return schema_preserving_child(plan);
}
None
}
fn schema_preserving_child(plan: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
let child = plan.children().first().cloned().cloned()?;
(plan.schema().as_ref() == child.schema().as_ref()).then_some(child)
}
fn is_supported_time_index_wrapper(expr: &ScalarFunctionExpr) -> bool {
matches!(
expr.name(),
@@ -319,8 +338,10 @@ mod test {
use arrow_schema::{Field, TimeUnit};
use common_recordbatch::RecordBatches;
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion_common::ScalarValue;
use datafusion_functions::datetime::to_timestamp_millis;
use datafusion_physical_expr::expressions::CastExpr;
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
@@ -478,6 +499,95 @@ mod test {
assert!(!is_time_index_expr(scan, Arc::new(non_timestamp_cast)).unwrap());
}
#[test]
fn test_is_time_index_expr_tracks_time_index_through_filter() {
let scan = new_region_scan();
let filter = Arc::new(
FilterExec::try_new(
Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
scan,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
assert!(is_time_index_expr(filter, Arc::new(PhysicalColumn::new("ts", 1))).unwrap());
}
#[test]
fn test_is_time_index_expr_tracks_time_index_through_passthrough_wrapper_and_filter_projection()
{
let scan = new_region_scan();
let projected_filter = Arc::new(
FilterExecBuilder::new(
Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
scan,
)
.apply_projection(Some(vec![1]))
.unwrap()
.build()
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let cooperative =
Arc::new(CooperativeExec::new(projected_filter)) as Arc<dyn ExecutionPlan>;
assert!(is_time_index_expr(cooperative, Arc::new(PhysicalColumn::new("ts", 0))).unwrap());
}
#[test]
fn test_schema_preserving_child_rejects_schema_changing_projection() {
let scan = new_region_scan();
let projection = ProjectionExec::try_new(
vec![(
Arc::new(PhysicalColumn::new("ts", 1)) as Arc<dyn PhysicalExpr>,
"ts".to_string(),
)],
scan,
)
.unwrap();
assert!(schema_preserving_child(&projection).is_none());
}
#[test]
fn test_cooperative_exec_satisfies_passthrough_schema_contract() {
let child = new_region_scan();
let plan = Arc::new(CooperativeExec::new(child.clone())) as Arc<dyn ExecutionPlan>;
assert_passthrough_schema_contract(plan, child);
}
#[test]
fn test_repartition_exec_satisfies_passthrough_schema_contract() {
let child = new_region_scan();
let plan = Arc::new(
RepartitionExec::try_new(
child.clone(),
datafusion_physical_expr::Partitioning::RoundRobinBatch(2),
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
assert_passthrough_schema_contract(plan, child);
}
#[test]
fn test_coalesce_partitions_exec_satisfies_passthrough_schema_contract() {
let child = new_region_scan();
let plan = Arc::new(CoalescePartitionsExec::new(child.clone())) as Arc<dyn ExecutionPlan>;
assert_passthrough_schema_contract(plan, child);
}
fn assert_passthrough_schema_contract(
plan: Arc<dyn ExecutionPlan>,
child: Arc<dyn ExecutionPlan>,
) {
assert_eq!(plan.schema().as_ref(), child.schema().as_ref());
let passthrough = passthrough_child(plan.as_ref()).expect("wrapper should preserve schema");
assert_eq!(passthrough.schema().as_ref(), child.schema().as_ref());
}
fn new_region_scan() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false),