fix: case insensitive

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-24 17:14:02 +08:00
parent cc7fa65793
commit 1d30b95573

View File

@@ -96,7 +96,7 @@ impl WindowedSortPhysicalRule {
input_schema.field(column_expr.index()).data_type(),
DataType::Timestamp(_, _)
)
&& is_time_index_expr(sort_input.clone(), first_sort_expr.expr.clone())?
&& is_time_index_expr(&sort_input, &first_sort_expr.expr)?
&& sort_exec.fetch().is_none()
// skip if there is a limit, as dyn filter along is good enough in this case
{
@@ -213,8 +213,8 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
}
fn is_time_index_expr(
plan: Arc<dyn ExecutionPlan>,
expr: Arc<dyn PhysicalExpr>,
plan: &Arc<dyn ExecutionPlan>,
expr: &Arc<dyn PhysicalExpr>,
) -> DataFusionResult<bool> {
if let Some(column_expr) = expr.as_any().downcast_ref::<PhysicalColumn>() {
return is_time_index_column(plan, column_expr);
@@ -222,7 +222,7 @@ fn is_time_index_expr(
if let Some(cast_expr) = expr.as_any().downcast_ref::<CastExpr>() {
return if matches!(cast_expr.cast_type(), DataType::Timestamp(_, _)) {
is_time_index_expr(plan, cast_expr.expr().clone())
is_time_index_expr(plan, cast_expr.expr())
} else {
Ok(false)
};
@@ -232,7 +232,7 @@ fn is_time_index_expr(
return if is_supported_time_index_wrapper(scalar_function_expr)
&& scalar_function_expr.args().len() == 1
{
is_time_index_expr(plan, scalar_function_expr.args()[0].clone())
is_time_index_expr(plan, &scalar_function_expr.args()[0])
} else {
Ok(false)
};
@@ -242,14 +242,14 @@ fn is_time_index_expr(
}
fn is_time_index_column(
plan: Arc<dyn ExecutionPlan>,
plan: &Arc<dyn ExecutionPlan>,
column_expr: &PhysicalColumn,
) -> DataFusionResult<bool> {
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
let Some(projection_expr) = projection.expr().get(column_expr.index()) else {
return Ok(false);
};
return is_time_index_expr(projection.input().clone(), projection_expr.expr.clone());
return is_time_index_expr(projection.input(), &projection_expr.expr);
}
if let Some(filter) = plan.as_any().downcast_ref::<FilterExec>() {
@@ -264,7 +264,8 @@ fn is_time_index_column(
)
})
.unwrap_or_else(|| column_expr.clone());
return is_time_index_expr(filter.input().clone(), Arc::new(child_column_expr));
let child_expr = Arc::new(child_column_expr) as Arc<dyn PhysicalExpr>;
return is_time_index_expr(filter.input(), &child_expr);
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
@@ -279,7 +280,8 @@ fn is_time_index_column(
let Some(child) = passthrough_child(plan.as_ref()) else {
return Ok(false);
};
is_time_index_expr(child, Arc::new(column_expr.clone()))
let child_expr = Arc::new(column_expr.clone()) as Arc<dyn PhysicalExpr>;
is_time_index_expr(&child, &child_expr)
}
fn passthrough_child(plan: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
@@ -299,14 +301,12 @@ fn schema_preserving_child(plan: &dyn ExecutionPlan) -> Option<Arc<dyn Execution
}
fn is_supported_time_index_wrapper(expr: &ScalarFunctionExpr) -> bool {
matches!(
expr.name(),
"to_timestamp"
| "to_timestamp_seconds"
| "to_timestamp_millis"
| "to_timestamp_micros"
| "to_timestamp_nanos"
) && matches!(expr.return_type(), DataType::Timestamp(_, _))
(expr.name().eq_ignore_ascii_case("to_timestamp")
|| expr.name().eq_ignore_ascii_case("to_timestamp_seconds")
|| expr.name().eq_ignore_ascii_case("to_timestamp_millis")
|| expr.name().eq_ignore_ascii_case("to_timestamp_micros")
|| expr.name().eq_ignore_ascii_case("to_timestamp_nanos"))
&& matches!(expr.return_type(), DataType::Timestamp(_, _))
}
/// Removes the repartition plan between the filter and region scan.
@@ -363,10 +363,9 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("alias_ts", 0)) as Arc<dyn PhysicalExpr>;
assert!(
is_time_index_expr(projection, Arc::new(PhysicalColumn::new("alias_ts", 0))).unwrap()
);
assert!(is_time_index_expr(&projection, &expr).unwrap());
}
#[test]
@@ -392,14 +391,9 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("alias_2", 0)) as Arc<dyn PhysicalExpr>;
assert!(
is_time_index_expr(
second_projection,
Arc::new(PhysicalColumn::new("alias_2", 0))
)
.unwrap()
);
assert!(is_time_index_expr(&second_projection, &expr).unwrap());
}
#[test]
@@ -427,8 +421,9 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc<dyn PhysicalExpr>;
assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap());
assert!(is_time_index_expr(&projection, &expr).unwrap());
}
#[test]
@@ -448,8 +443,9 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("ts_ms", 0)) as Arc<dyn PhysicalExpr>;
assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts_ms", 0))).unwrap());
assert!(is_time_index_expr(&projection, &expr).unwrap());
}
#[test]
@@ -477,26 +473,46 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc<dyn PhysicalExpr>;
assert!(!is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap());
assert!(!is_time_index_expr(&projection, &expr).unwrap());
}
#[test]
fn test_is_supported_time_index_wrapper_ignores_function_name_case() {
let config = Arc::new(ConfigOptions::default());
let return_field = Arc::new(Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
));
let expr = ScalarFunctionExpr::new(
"To_Timestamp_Millis",
to_timestamp_millis(config.as_ref()),
vec![Arc::new(PhysicalColumn::new("ts", 1))],
return_field,
config,
);
assert!(is_supported_time_index_wrapper(&expr));
}
#[test]
fn test_is_time_index_expr_rejects_non_timestamp_casts() {
let scan = new_region_scan();
let cast_expr = CastExpr::new(
let cast_expr = Arc::new(CastExpr::new(
Arc::new(PhysicalColumn::new("ts", 1)),
DataType::Timestamp(TimeUnit::Millisecond, None),
None,
);
assert!(is_time_index_expr(scan.clone(), Arc::new(cast_expr)).unwrap());
)) as Arc<dyn PhysicalExpr>;
assert!(is_time_index_expr(&scan, &cast_expr).unwrap());
let non_timestamp_cast = CastExpr::new(
let non_timestamp_cast = Arc::new(CastExpr::new(
Arc::new(PhysicalColumn::new("ts", 1)),
DataType::Int64,
None,
);
assert!(!is_time_index_expr(scan, Arc::new(non_timestamp_cast)).unwrap());
)) as Arc<dyn PhysicalExpr>;
assert!(!is_time_index_expr(&scan, &non_timestamp_cast).unwrap());
}
#[test]
@@ -509,8 +525,9 @@ mod test {
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("ts", 1)) as Arc<dyn PhysicalExpr>;
assert!(is_time_index_expr(filter, Arc::new(PhysicalColumn::new("ts", 1))).unwrap());
assert!(is_time_index_expr(&filter, &expr).unwrap());
}
#[test]
@@ -529,8 +546,9 @@ mod test {
) as Arc<dyn ExecutionPlan>;
let cooperative =
Arc::new(CooperativeExec::new(projected_filter)) as Arc<dyn ExecutionPlan>;
let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc<dyn PhysicalExpr>;
assert!(is_time_index_expr(cooperative, Arc::new(PhysicalColumn::new("ts", 0))).unwrap());
assert!(is_time_index_expr(&cooperative, &expr).unwrap());
}
#[test]