perf(mito): skip manifest-pruned file ranges (#8366)

* perf(mito): skip manifest-pruned file ranges

Signed-off-by: discord9 <discord9@163.com>

* test(mito): allow empty prune benchmark output

Signed-off-by: discord9 <discord9@163.com>

* fix(mito): avoid caching stale pruned builders

Signed-off-by: discord9 <discord9@163.com>

* chore(mito): address pruner clippy

Signed-off-by: discord9 <discord9@163.com>

* fix(mito): account worker pruner builder metrics

Signed-off-by: discord9 <discord9@163.com>

* test(mito): keep empty prune benchmark local

Signed-off-by: discord9 <discord9@163.com>

* refactor(mito): share manifest-pruned range skip

Signed-off-by: discord9 <discord9@163.com>

* chore(mito): shorten prune cache comment

Signed-off-by: discord9 <discord9@163.com>

* fix(mito): keep manifest prune state in pruner

Signed-off-by: discord9 <discord9@163.com>

* test(mito): cover manifest prune fast skip edge cases

Signed-off-by: discord9 <discord9@163.com>

* chore: fix typo in logical table alter

Signed-off-by: discord9 <discord9@163.com>

* chore(mito): address pruner review comments

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-07-02 14:25:16 +08:00
committed by GitHub
parent 55852a05b8
commit ba073045e2
4 changed files with 703 additions and 37 deletions

View File

@@ -15,7 +15,7 @@
//! Pruner for parallel file pruning across scanner partitions.
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
@@ -117,6 +117,35 @@ impl PartitionPruner {
Ok(ranges)
}
/// Checks whether the file range at `index` can be skipped because the
/// current predicate definitively prunes it at manifest level (no I/O).
///
/// Returns `true` if the range was skipped. When skipped, this method
/// balances the pruner's per-file reference count and merges the resulting
/// reader metrics into `part_metrics`.
///
/// Uses shared per-file state so repeated row groups can skip cheaply after
/// the first manifest-prune decision.
pub fn try_skip_manifest_pruned_file_range(
&self,
index: RowGroupIndex,
part_metrics: &PartitionMetrics,
) -> bool {
let Some(file_index) = self.file_index(index) else {
return false;
};
let mut reader_metrics = ReaderMetrics::default();
let pruned = self
.pruner
.inner
.try_mark_manifest_pruned(file_index, &mut reader_metrics);
if pruned {
self.pruner.skip_file_range(index, &mut reader_metrics);
part_metrics.merge_reader_metrics(&reader_metrics, None);
}
pruned
}
/// Pre-fetches upcoming files starting from the given position.
fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) {
let start = current_pos + 1;
@@ -139,6 +168,14 @@ impl PartitionPruner {
.copied()
.unwrap_or(PreFilterMode::SkipFields)
}
fn file_index(&self, index: RowGroupIndex) -> Option<usize> {
self.pruner
.inner
.stream_ctx
.is_file_range_index(index)
.then(|| index.index - self.pruner.inner.stream_ctx.input.num_memtables())
}
}
/// A pruner that prunes files for all partitions of a scanner.
@@ -155,6 +192,40 @@ struct PrunerInner {
file_entries: Vec<Mutex<FileBuilderEntry>>,
/// StreamContext containing all context needed for pruning.
stream_ctx: Arc<StreamContext>,
/// Positive manifest-prune cache shared across all scan partitions.
///
/// SAFETY: cached positives are valid because dynamic filters only tighten;
/// negative decisions are not cached. Reset by `add_partition_ranges()` for
/// each fresh batch of partition ranges.
manifest_pruned_files: Vec<AtomicBool>,
}
impl PrunerInner {
/// Checks whether manifest-level pruning proves this file is empty given the
/// current predicate. If true, CAS the shared cache from false→true and
/// record `files_time_range_pruned` in `reader_metrics`.
///
/// Returns `true` if already cached or newly proven pruned.
fn try_mark_manifest_pruned(
&self,
file_index: usize,
reader_metrics: &mut ReaderMetrics,
) -> bool {
if self.manifest_pruned_files[file_index].load(Ordering::Relaxed) {
return true;
}
let file = &self.stream_ctx.input.files[file_index];
if !self.stream_ctx.input.can_manifest_prune_file(file) {
return false;
}
if self.manifest_pruned_files[file_index]
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
reader_metrics.filter_metrics.files_time_range_pruned += 1;
}
true
}
}
/// Per-file state tracking.
@@ -196,6 +267,8 @@ impl Pruner {
})
})
.collect();
let manifest_pruned_files: Vec<AtomicBool> =
(0..num_files).map(|_| AtomicBool::new(false)).collect();
// Create channels and collect senders
let mut worker_senders = Vec::with_capacity(num_workers);
let mut receivers = Vec::with_capacity(num_workers);
@@ -209,6 +282,7 @@ impl Pruner {
num_workers,
file_entries,
stream_ctx,
manifest_pruned_files,
});
// Spawn worker tasks with their receivers
@@ -225,8 +299,14 @@ impl Pruner {
}
}
/// Adds reference counts for all partitions' ranges.
/// Adds reference counts for all partitions' ranges and resets the full
/// manifest-prune cache so that dynamic-filter updates are visible to the
/// fresh scan.
pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) {
for pruned in &self.inner.manifest_pruned_files {
pruned.store(false, Ordering::Relaxed);
}
// Add reference counts for each partition range
let num_memtables = self.inner.stream_ctx.input.num_memtables();
for part_range in partition_ranges {
@@ -277,6 +357,19 @@ impl Pruner {
Ok(ranges)
}
/// Skips a file range that has been pruned before entering the file pruner.
///
/// This keeps the pruner's per-file reference counts balanced with
/// `add_partition_ranges()`. It may also clear a cached builder when this was the
/// last remaining range for the file.
pub fn skip_file_range(&self, index: RowGroupIndex, reader_metrics: &mut ReaderMetrics) {
if !self.inner.stream_ctx.is_file_range_index(index) {
return;
}
let file_index = index.index - self.inner.stream_ctx.input.num_memtables();
self.decrement_and_maybe_clear(file_index, reader_metrics);
}
/// Gets or creates the FileRangeBuilder for a file.
async fn get_file_builder(
&self,
@@ -374,25 +467,33 @@ impl Pruner {
pre_filter_mode: PreFilterMode,
reader_metrics: &mut ReaderMetrics,
) -> Result<Arc<FileRangeBuilder>> {
// Check manifest-level prune first (shared cache, no I/O).
if self
.inner
.try_mark_manifest_pruned(file_index, reader_metrics)
{
let arc_builder = Arc::new(FileRangeBuilder::default());
// Do NOT cache an empty manifest-pruned builder; the cache flag
// already records the decision.
return Ok(arc_builder);
}
let file = &self.inner.stream_ctx.input.files[file_index];
let predicate = self.inner.stream_ctx.input.predicate_for_file(file);
let builder = self
.inner
.stream_ctx
.input
.prune_file(file, pre_filter_mode, reader_metrics)
.prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics)
.await?;
let arc_builder = Arc::new(builder);
// Caches the builder
// Caches the builder only if the file still has remaining ranges.
// `skip_file_range` may have already consumed all ranges for this file.
{
let mut entry = self.inner.file_entries[file_index].lock().unwrap();
if entry.builder.is_none() {
reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
reader_metrics.num_range_builders += 1;
entry.builder = Some(arc_builder.clone());
PRUNER_ACTIVE_BUILDERS.inc();
}
cache_builder_if_needed(&mut entry, &arc_builder, reader_metrics);
}
Ok(arc_builder)
@@ -444,7 +545,6 @@ impl Pruner {
}
worker_cache_miss += 1;
// Do the actual pruning (outside lock)
let file = &inner.stream_ctx.input.files[file_index];
pruned_files.push(file.file_id().file_id());
let explain_verbose = partition_metrics
@@ -455,24 +555,45 @@ impl Pruner {
filter_metrics: new_filter_metrics(explain_verbose),
..Default::default()
};
let result = inner
.stream_ctx
.input
.prune_file(file, pre_filter_mode, &mut metrics)
.await;
// Check manifest-level prune first (shared cache, no I/O).
let result = if inner.try_mark_manifest_pruned(file_index, &mut metrics) {
// Manifest-level pruning proved the file empty — produce a
// default builder without reading any parquet metadata.
Ok(FileRangeBuilder::default())
} else {
let predicate = inner.stream_ctx.input.predicate_for_file(file);
inner
.stream_ctx
.input
.prune_file_after_manifest_check(file, pre_filter_mode, predicate, &mut metrics)
.await
};
// Update state and notify waiters
let mut entry = inner.file_entries[file_index].lock().unwrap();
match result {
Ok(builder) => {
let arc_builder = Arc::new(builder);
entry.builder = Some(arc_builder.clone());
PRUNER_ACTIVE_BUILDERS.inc();
let is_background = response_tx.is_none();
// Only cache the builder if the file still has remaining ranges.
// If remaining_ranges == 0, a concurrent `skip_file_range` (e.g. from a
// dynamic filter tightening via manifest-prune fast-skip) already consumed
// all ranges and may have cleared a previously cached builder.
// Skip caching manifest-pruned empty builders; the cache flag is enough.
let did_cache =
if inner.manifest_pruned_files[file_index].load(Ordering::Relaxed) {
false
} else {
cache_builder_if_needed(&mut entry, &arc_builder, &mut metrics)
};
// Notify all waiters
for waiter in entry.waiters.drain(..) {
let _ = waiter.send(Ok(arc_builder.clone()));
}
// Always respond to foreground caller, even if we did not cache.
if let Some(response_tx) = response_tx {
let _ = response_tx.send(Ok(arc_builder));
}
@@ -485,8 +606,13 @@ impl Pruner {
metrics
);
// Merge metrics to partition if provided
if let Some(part_metrics) = &partition_metrics {
// Merge metrics if this is a foreground request, or if the builder
// was cached. Skip stale per-file metrics
// for background requests that completed after the file was already
// fully skipped.
if (!is_background || did_cache)
&& let Some(part_metrics) = &partition_metrics
{
let per_file_metrics = if part_metrics.explain_verbose() {
let file_id = file.file_id();
let mut map = HashMap::new();
@@ -525,3 +651,460 @@ impl Pruner {
);
}
}
#[cfg(test)]
impl Pruner {
/// Returns the remaining range count for a file (test-only).
fn test_remaining_ranges(&self, file_index: usize) -> usize {
self.inner.file_entries[file_index]
.lock()
.unwrap()
.remaining_ranges
}
/// Returns whether a cached builder exists for a file (test-only).
fn test_has_builder(&self, file_index: usize) -> bool {
self.inner.file_entries[file_index]
.lock()
.unwrap()
.builder
.is_some()
}
/// Returns the manifest-pruned flag for a file (test-only).
fn test_is_manifest_pruned(&self, file_index: usize) -> bool {
self.inner.manifest_pruned_files[file_index].load(Ordering::Relaxed)
}
/// Clears a cached builder for a file, simulating stale cleanup (test-only).
#[allow(dead_code)]
fn test_clear_builder(&self, file_index: usize) {
let mut entry = self.inner.file_entries[file_index].lock().unwrap();
if entry.builder.take().is_some() {
PRUNER_ACTIVE_BUILDERS.dec();
}
}
}
/// Returns true if a freshly pruned builder should be cached for this file.
fn should_cache_builder(entry: &FileBuilderEntry) -> bool {
entry.builder.is_none() && entry.remaining_ranges > 0
}
/// Caches a freshly pruned builder if the file still has remaining ranges, and
/// records the corresponding builder memory/count deltas for verbose metrics.
fn cache_builder_if_needed(
entry: &mut FileBuilderEntry,
builder: &Arc<FileRangeBuilder>,
reader_metrics: &mut ReaderMetrics,
) -> bool {
if should_cache_builder(entry) {
reader_metrics.metadata_mem_size += builder.memory_size() as isize;
reader_metrics.num_range_builders += 1;
entry.builder = Some(builder.clone());
PRUNER_ACTIVE_BUILDERS.inc();
true
} else {
false
}
}
#[cfg(test)]
mod tests {
use common_time::Timestamp;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, col, lit};
use store_api::region_engine::PartitionRange;
use store_api::storage::{FileId, RegionId};
use super::*;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::scan_util::PartitionMetrics;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::parquet::reader::ReaderMetrics;
use crate::test_util::memtable_util::metadata_with_primary_key;
use crate::test_util::new_noop_file_purger;
use crate::test_util::scheduler_util::SchedulerEnv;
async fn make_test_pruner(num_files: usize) -> (SchedulerEnv, Arc<Pruner>) {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let files: Vec<FileHandle> = (0..num_files)
.map(|_| {
let meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id: FileId::random(),
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
),
num_row_groups: 1,
num_rows: 1024,
level: 0,
..Default::default()
};
FileHandle::new(meta, new_noop_file_purger())
})
.collect();
let input = ScanInput::new(env.access_layer.clone(), mapper)
.with_files(files)
.with_append_mode(true);
let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
let pruner = Arc::new(Pruner::new(stream_ctx, 1));
(env, pruner)
}
/// Builds a minimal `PartitionRange` that references `file_index`.
/// `add_partition_ranges` will look up `stream_ctx.ranges[identifier]`
/// and find `row_group_indices[0] == RowGroupIndex { index: file_index,
/// row_group_index: 0 }` because `unordered_scan_ranges` with
/// `num_row_groups=1` produces one range per file.
fn file_partition_range(file_index: usize) -> PartitionRange {
PartitionRange {
start: Timestamp::new_millisecond(0),
end: Timestamp::new_millisecond(1001),
num_rows: 1024,
identifier: file_index,
}
}
async fn make_test_pruner_with_predicate(
num_files: usize,
row_groups_per_file: u64,
predicate_exprs: &[Expr],
) -> (SchedulerEnv, Arc<Pruner>) {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let predicate = PredicateGroup::new(&metadata, predicate_exprs).unwrap();
let files: Vec<FileHandle> = (0..num_files)
.map(|_| {
let meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id: FileId::random(),
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
),
num_row_groups: row_groups_per_file,
num_rows: row_groups_per_file * 1024,
level: 0,
..Default::default()
};
FileHandle::new(meta, new_noop_file_purger())
})
.collect();
let input = ScanInput::new(env.access_layer.clone(), mapper)
.with_files(files)
.with_predicate(predicate)
.with_append_mode(true);
let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
let pruner = Arc::new(Pruner::new(stream_ctx, 1));
(env, pruner)
}
fn make_partition_metrics() -> PartitionMetrics {
let metrics_set = ExecutionPlanMetricsSet::new();
PartitionMetrics::new(
RegionId::new(123, 456),
0,
"test",
Instant::now(),
false,
&metrics_set,
)
}
#[test]
fn should_cache_builder_when_ranges_remain() {
let entry = FileBuilderEntry {
builder: None,
remaining_ranges: 3,
waiters: Vec::new(),
};
assert!(should_cache_builder(&entry));
}
#[test]
fn should_not_cache_builder_when_no_ranges_remain() {
let entry = FileBuilderEntry {
builder: None,
remaining_ranges: 0,
waiters: Vec::new(),
};
assert!(!should_cache_builder(&entry));
}
#[test]
fn should_not_cache_builder_when_already_cached() {
let entry = FileBuilderEntry {
builder: Some(Arc::new(FileRangeBuilder::default())),
remaining_ranges: 1,
waiters: Vec::new(),
};
assert!(!should_cache_builder(&entry));
}
#[test]
fn cache_builder_records_metrics() {
let mut entry = FileBuilderEntry {
builder: None,
remaining_ranges: 1,
waiters: Vec::new(),
};
let builder = Arc::new(FileRangeBuilder::default());
let mut reader_metrics = ReaderMetrics::default();
assert!(cache_builder_if_needed(
&mut entry,
&builder,
&mut reader_metrics
));
assert!(entry.builder.is_some());
assert_eq!(
reader_metrics.metadata_mem_size,
builder.memory_size() as isize
);
assert_eq!(reader_metrics.num_range_builders, 1);
if entry.builder.take().is_some() {
PRUNER_ACTIVE_BUILDERS.dec();
}
}
#[tokio::test]
async fn skip_file_range_decrements_and_clears_builder() {
let (_env, pruner) = make_test_pruner(1).await;
// Simulate 3 partition ranges for file 0.
let ranges: Vec<PartitionRange> = (0..3).map(|_| file_partition_range(0)).collect();
pruner.add_partition_ranges(&ranges);
assert_eq!(pruner.test_remaining_ranges(0), 3);
// Manually set a cached builder (simulating a previous cache hit).
{
let mut entry = pruner.inner.file_entries[0].lock().unwrap();
entry.builder = Some(Arc::new(FileRangeBuilder::default()));
PRUNER_ACTIVE_BUILDERS.inc();
}
assert!(pruner.test_has_builder(0));
// Skip all 3 ranges; the third should clear the builder.
let mut reader_metrics = ReaderMetrics::default();
for i in 0..3 {
let index = RowGroupIndex {
index: 0,
row_group_index: i as i64,
};
pruner.skip_file_range(index, &mut reader_metrics);
}
assert_eq!(pruner.test_remaining_ranges(0), 0);
assert!(!pruner.test_has_builder(0));
}
#[tokio::test]
async fn worker_does_not_cache_after_skip_file_range_consumed_all() {
let (_env, pruner) = make_test_pruner(1).await;
// Simulate one range for file 0.
let ranges = vec![file_partition_range(0)];
pruner.add_partition_ranges(&ranges);
assert_eq!(pruner.test_remaining_ranges(0), 1);
// Simulate skip_file_range consuming the last range BEFORE the
// background worker finishes. This mirrors the race: a dynamic filter
// tightens and manifest-prune fast-skip zeros out remaining_ranges.
let mut reader_metrics = ReaderMetrics::default();
let index = RowGroupIndex {
index: 0,
row_group_index: 0,
};
pruner.skip_file_range(index, &mut reader_metrics);
assert_eq!(pruner.test_remaining_ranges(0), 0);
assert!(!pruner.test_has_builder(0));
// Now simulate the worker completing: check the caching guard.
let entry = pruner.inner.file_entries[0].lock().unwrap();
let should_cache = should_cache_builder(&entry);
drop(entry);
assert!(!should_cache);
// Ensure the gauge was not incremented for a stale builder.
// (skip_file_range already decremented it if there was one, but here
// there was none, so the gauge should be at baseline.)
}
#[tokio::test]
async fn worker_caches_when_ranges_remain() {
let (_env, pruner) = make_test_pruner(1).await;
// Simulate 2 ranges for file 0.
let ranges: Vec<PartitionRange> = (0..2).map(|_| file_partition_range(0)).collect();
pruner.add_partition_ranges(&ranges);
assert_eq!(pruner.test_remaining_ranges(0), 2);
// Consume only 1 range.
let mut reader_metrics = ReaderMetrics::default();
let index = RowGroupIndex {
index: 0,
row_group_index: 0,
};
pruner.skip_file_range(index, &mut reader_metrics);
assert_eq!(pruner.test_remaining_ranges(0), 1);
// The worker should still cache because remaining_ranges > 0.
let entry = pruner.inner.file_entries[0].lock().unwrap();
assert!(should_cache_builder(&entry));
}
// ── Corner case: fast-skip across multiple row groups ─────────────
/// 1 file × 3 row groups, predicate `ts > 10000ms` prunes the file at
/// manifest level. Fast-skipping each of the 3 row groups must return true
/// and decrement remaining_ranges to 0.
#[tokio::test]
async fn try_skip_manifest_pruned_file_range_multi_row_groups() {
let predicate_exprs: Vec<Expr> =
vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))];
let (_env, pruner) = make_test_pruner_with_predicate(1, 3, &predicate_exprs).await;
let ranges = pruner.inner.stream_ctx.partition_ranges();
assert_eq!(ranges.len(), 3);
pruner.add_partition_ranges(&ranges);
assert_eq!(pruner.test_remaining_ranges(0), 3);
let partition_pruner = Arc::new(PartitionPruner::new(pruner.clone(), &ranges));
let partition_metrics = make_partition_metrics();
// Fast-skip each of the 3 row groups.
for rg in 0..3 {
let index = RowGroupIndex {
index: 0, // file_index == 0, no memtables
row_group_index: rg,
};
let skipped =
partition_pruner.try_skip_manifest_pruned_file_range(index, &partition_metrics);
assert!(skipped, "row group {} should be skipped", rg);
}
// All refs consumed.
assert_eq!(pruner.test_remaining_ranges(0), 0);
// manifest_pruned_files is CAS'd exactly once (first call).
assert!(pruner.test_is_manifest_pruned(0));
}
/// A file whose manifest time range may contain matching rows must not be
/// fast-skipped. This protects query correctness over metrics precision.
#[tokio::test]
async fn try_skip_manifest_pruned_file_range_keeps_overlapping_file() {
let predicate_exprs: Vec<Expr> =
vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(500), None)))];
let (_env, pruner) = make_test_pruner_with_predicate(1, 2, &predicate_exprs).await;
let ranges = pruner.inner.stream_ctx.partition_ranges();
assert_eq!(ranges.len(), 2);
pruner.add_partition_ranges(&ranges);
assert_eq!(pruner.test_remaining_ranges(0), 2);
let partition_pruner = Arc::new(PartitionPruner::new(pruner.clone(), &ranges));
let partition_metrics = make_partition_metrics();
let range_meta = &pruner.inner.stream_ctx.ranges[ranges[0].identifier];
let index = range_meta.row_group_indices[0];
let skipped =
partition_pruner.try_skip_manifest_pruned_file_range(index, &partition_metrics);
assert!(!skipped);
assert_eq!(pruner.test_remaining_ranges(0), 2);
assert!(!pruner.test_is_manifest_pruned(0));
}
// ── Corner case: add_partition_ranges resets the manifest-pruned flag ──
#[tokio::test]
async fn add_partition_ranges_resets_manifest_pruned_flag() {
let predicate_exprs: Vec<Expr> =
vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))];
let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await;
// Mark file 0 as manifest-pruned.
let mut reader_metrics = ReaderMetrics::default();
let marked = pruner
.inner
.try_mark_manifest_pruned(0, &mut reader_metrics);
assert!(marked);
assert!(pruner.test_is_manifest_pruned(0));
assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1);
// Calling add_partition_ranges must reset the flag.
let ranges = vec![file_partition_range(0)];
pruner.add_partition_ranges(&ranges);
assert!(!pruner.test_is_manifest_pruned(0));
// remaining_ranges was also incremented.
assert_eq!(pruner.test_remaining_ranges(0), 1);
}
// ── Corner case: prune_file_directly short-circuits via manifest prune ──
#[tokio::test]
async fn prune_file_directly_manifest_pruned_returns_empty_builder() {
let predicate_exprs: Vec<Expr> =
vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))];
let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await;
// Ensure there is no cached builder yet.
assert!(!pruner.test_has_builder(0));
let mut reader_metrics = ReaderMetrics::default();
let builder = pruner
.prune_file_directly(0, PreFilterMode::SkipFields, &mut reader_metrics)
.await
.unwrap();
// Should be the default (empty) builder.
assert_eq!(
builder.memory_size(),
FileRangeBuilder::default().memory_size()
);
// builder must NOT be cached — the manifest-pruned flag is enough.
assert!(!pruner.test_has_builder(0));
// files_time_range_pruned was recorded.
assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1);
}
// ── Corner case: try_mark_manifest_pruned does not double-count ───
#[tokio::test]
async fn try_mark_manifest_pruned_only_counts_first_cas() {
let predicate_exprs: Vec<Expr> =
vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))];
let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await;
// First call: CAS succeeds, metric incremented.
let mut reader_metrics = ReaderMetrics::default();
let marked = pruner
.inner
.try_mark_manifest_pruned(0, &mut reader_metrics);
assert!(marked);
assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1);
assert!(pruner.test_is_manifest_pruned(0));
// Second call: already true, no metric delta.
let mut reader_metrics2 = ReaderMetrics::default();
let marked2 = pruner
.inner
.try_mark_manifest_pruned(0, &mut reader_metrics2);
assert!(marked2);
assert_eq!(reader_metrics2.filter_metrics.files_time_range_pruned, 0);
}
}

View File

@@ -1056,7 +1056,7 @@ impl ScanInput {
ranges
}
fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
pub(crate) fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
if self.should_skip_region_partition(file) {
self.predicate.predicate_without_region().cloned()
} else {
@@ -1095,7 +1095,35 @@ impl ScanInput {
})
}
/// Checks whether a file can be definitively pruned using only its manifest-level
/// time range and the current predicate, without reading any parquet metadata.
///
/// Returns `true` if [PruningStatistics] proves the file cannot contain matching rows.
#[inline]
pub(crate) fn can_manifest_prune_file(&self, file: &FileHandle) -> bool {
let predicate = self.predicate_for_file(file);
self.manifest_prunes_file(file, predicate.as_ref())
}
fn manifest_prunes_file(&self, file: &FileHandle, predicate: Option<&Predicate>) -> bool {
if let Some(pred) = predicate
&& !pred.is_empty()
&& let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
{
let pruning_results = pred.prune_with_stats(
&file_level_stats,
self.mapper.metadata().schema.arrow_schema(),
);
pruning_results.first() == Some(&false)
} else {
false
}
}
/// Prunes a file to scan and returns the builder to build readers.
///
/// This is the public entry point used by direct tests and non-pruner callers.
/// It performs its own manifest-level pruning check internally.
#[tracing::instrument(
skip_all,
fields(
@@ -1112,22 +1140,29 @@ impl ScanInput {
let predicate = self.predicate_for_file(file);
// Early file-level pruning using manifest time range before any parquet metadata access.
// This avoids I/O for files that definitely can't match the current predicate snapshot,
// especially after TopK dynamic filters have established a timestamp threshold.
if let Some(ref pred) = predicate
&& !pred.is_empty()
&& let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
{
let pruning_results = pred.prune_with_stats(
&file_level_stats,
self.mapper.metadata().schema.arrow_schema(),
);
if pruning_results.first() == Some(&false) {
reader_metrics.filter_metrics.files_time_range_pruned += 1;
return Ok(FileRangeBuilder::default());
}
if self.manifest_prunes_file(file, predicate.as_ref()) {
reader_metrics.filter_metrics.files_time_range_pruned += 1;
return Ok(FileRangeBuilder::default());
}
self.prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics)
.await
}
/// Second half of `prune_file` — performs the actual parquet metadata /
/// reader setup. Callers that already performed manifest-level pruning
/// (e.g. the `Pruner` via its shared `manifest_pruned_files` cache) should
/// call this directly to avoid a redundant manifest check.
///
/// `predicate` is the result of `self.predicate_for_file(file)` computed
/// externally so the caller can reuse it if needed.
pub(crate) async fn prune_file_after_manifest_check(
&self,
file: &FileHandle,
pre_filter_mode: PreFilterMode,
predicate: Option<Predicate>,
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let may_build_selective_row_selection = predicate.is_some();
let decode_pk_values = !self.compaction
&& self
@@ -1955,9 +1990,11 @@ mod tests {
use std::sync::Arc;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::physical_plan::expressions::lit as physical_lit;
use datafusion::physical_plan::expressions::{
binary as physical_binary, col as physical_col, lit as physical_lit,
};
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};
use datafusion_expr::{Operator, col, lit};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
};
@@ -2419,6 +2456,39 @@ mod tests {
assert!(ranges.is_empty());
}
#[tokio::test]
async fn test_manifest_pruning_observes_dynamic_filter_update() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
let arrow_schema = metadata.schema.arrow_schema();
let ts_expr = physical_col("ts", arrow_schema.as_ref()).unwrap();
let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![ts_expr.clone()],
physical_lit(true),
));
predicate_group.add_dyn_filters(vec![dyn_filter.clone()]);
let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
.with_predicate(predicate_group);
let file = file_handle_with_time_range(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
);
assert!(!input.can_manifest_prune_file(&file));
let updated = physical_binary(
ts_expr,
Operator::Gt,
physical_lit(ScalarValue::TimestampMillisecond(Some(1000), None)),
arrow_schema.as_ref(),
)
.unwrap();
dyn_filter.update(updated).unwrap();
assert!(input.can_manifest_prune_file(&file));
}
#[tokio::test]
async fn test_range_pre_filter_mode() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));

View File

@@ -665,6 +665,13 @@ pub(crate) async fn build_flat_sources(
);
ordered_sources[position] = Some(Box::pin(stream) as _);
} else if stream_ctx.is_file_range_index(*index) {
// Common manifest-level fast-skip shared by SeqScan and UnorderedScan.
// Compaction should keep reading its selected input ranges completely.
if !compaction
&& partition_pruner.try_skip_manifest_pruned_file_range(*index, part_metrics)
{
continue;
}
if let Some(semaphore_ref) = semaphore.as_ref() {
// run in parallel, controlled by semaphore
let stream_ctx = stream_ctx.clone();

View File

@@ -133,6 +133,12 @@ impl UnorderedScan {
yield record_batch?;
}
} else if stream_ctx.is_file_range_index(*index) {
// Common manifest-level fast-skip shared by UnorderedScan and SeqScan.
if partition_pruner
.try_skip_manifest_pruned_file_range(*index, &part_metrics)
{
continue;
}
let stream = scan_flat_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),