fix: honor tighter filter fetch under global limit

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-07-03 20:41:24 +08:00
parent c7273efb36
commit d6be665b96

View File

@@ -57,7 +57,7 @@ impl EnsureGlobalLimitForFetch {
let required_input_ordering = plan.required_input_ordering();
let maintains_input_order = plan.maintains_input_order();
let child_parent = ParentContext {
has_global_fetch: provides_global_fetch(&plan),
global_fetch: provided_global_fetch(&plan),
required_ordering: None,
};
let children = children
@@ -90,7 +90,9 @@ impl EnsureGlobalLimitForFetch {
return Ok(plan);
};
if parent.has_global_fetch
if parent
.global_fetch
.is_some_and(|parent_fetch| parent_fetch <= fetch)
|| !plan.as_any().is::<FilterExec>()
|| plan.output_partitioning().partition_count() <= 1
{
@@ -103,18 +105,16 @@ impl EnsureGlobalLimitForFetch {
#[derive(Clone, Default)]
struct ParentContext {
has_global_fetch: bool,
global_fetch: Option<usize>,
required_ordering: Option<OrderingRequirements>,
}
fn provides_global_fetch(plan: &Arc<dyn ExecutionPlan>) -> bool {
if plan.fetch().is_none() {
return false;
}
plan.as_any().is::<GlobalLimitExec>()
fn provided_global_fetch(plan: &Arc<dyn ExecutionPlan>) -> Option<usize> {
let fetch = plan.fetch()?;
(plan.as_any().is::<GlobalLimitExec>()
|| plan.as_any().is::<CoalescePartitionsExec>()
|| plan.as_any().is::<SortPreservingMergeExec>()
|| plan.as_any().is::<SortPreservingMergeExec>())
.then_some(fetch)
}
fn add_global_fetch(
@@ -195,6 +195,22 @@ mod tests {
assert!(child.as_any().is::<FilterExec>());
}
#[test]
fn adds_tighter_global_fetch_under_looser_parent_fetch() {
let (input, ordering) = ordered_input();
let filter = filter_fetch(input, 5);
let merge = Arc::new(SortPreservingMergeExec::new(ordering, filter).with_fetch(Some(10)))
as Arc<dyn ExecutionPlan>;
let optimized =
EnsureGlobalLimitForFetch::optimize_plan(merge, ParentContext::default()).unwrap();
let child = optimized.children()[0];
assert!(optimized.as_any().is::<SortPreservingMergeExec>());
assert!(child.as_any().is::<SortPreservingMergeExec>());
assert_eq!(child.fetch(), Some(5));
}
#[test]
fn preserves_parent_ordering_requirement() {
let (input, ordering) = ordered_input();