From ba073045e208528f47aeda8e5c0c3fd2199aae87 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 2 Jul 2026 14:25:16 +0800 Subject: [PATCH] perf(mito): skip manifest-pruned file ranges (#8366) * perf(mito): skip manifest-pruned file ranges Signed-off-by: discord9 * test(mito): allow empty prune benchmark output Signed-off-by: discord9 * fix(mito): avoid caching stale pruned builders Signed-off-by: discord9 * chore(mito): address pruner clippy Signed-off-by: discord9 * fix(mito): account worker pruner builder metrics Signed-off-by: discord9 * test(mito): keep empty prune benchmark local Signed-off-by: discord9 * refactor(mito): share manifest-pruned range skip Signed-off-by: discord9 * chore(mito): shorten prune cache comment Signed-off-by: discord9 * fix(mito): keep manifest prune state in pruner Signed-off-by: discord9 * test(mito): cover manifest prune fast skip edge cases Signed-off-by: discord9 * chore: fix typo in logical table alter Signed-off-by: discord9 * chore(mito): address pruner review comments Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/read/pruner.rs | 623 ++++++++++++++++++++++++++- src/mito2/src/read/scan_region.rs | 104 ++++- src/mito2/src/read/seq_scan.rs | 7 + src/mito2/src/read/unordered_scan.rs | 6 + 4 files changed, 703 insertions(+), 37 deletions(-) diff --git a/src/mito2/src/read/pruner.rs b/src/mito2/src/read/pruner.rs index b4f25e5da1..497329826c 100644 --- a/src/mito2/src/read/pruner.rs +++ b/src/mito2/src/read/pruner.rs @@ -15,7 +15,7 @@ //! Pruner for parallel file pruning across scanner partitions. use std::collections::{HashMap, HashSet}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -117,6 +117,35 @@ impl PartitionPruner { Ok(ranges) } + /// Checks whether the file range at `index` can be skipped because the + /// current predicate definitively prunes it at manifest level (no I/O). + /// + /// Returns `true` if the range was skipped. When skipped, this method + /// balances the pruner's per-file reference count and merges the resulting + /// reader metrics into `part_metrics`. + /// + /// Uses shared per-file state so repeated row groups can skip cheaply after + /// the first manifest-prune decision. + pub fn try_skip_manifest_pruned_file_range( + &self, + index: RowGroupIndex, + part_metrics: &PartitionMetrics, + ) -> bool { + let Some(file_index) = self.file_index(index) else { + return false; + }; + let mut reader_metrics = ReaderMetrics::default(); + let pruned = self + .pruner + .inner + .try_mark_manifest_pruned(file_index, &mut reader_metrics); + if pruned { + self.pruner.skip_file_range(index, &mut reader_metrics); + part_metrics.merge_reader_metrics(&reader_metrics, None); + } + pruned + } + /// Pre-fetches upcoming files starting from the given position. fn prefetch_upcoming_files(&self, current_pos: usize, partition_metrics: &PartitionMetrics) { let start = current_pos + 1; @@ -139,6 +168,14 @@ impl PartitionPruner { .copied() .unwrap_or(PreFilterMode::SkipFields) } + + fn file_index(&self, index: RowGroupIndex) -> Option { + self.pruner + .inner + .stream_ctx + .is_file_range_index(index) + .then(|| index.index - self.pruner.inner.stream_ctx.input.num_memtables()) + } } /// A pruner that prunes files for all partitions of a scanner. @@ -155,6 +192,40 @@ struct PrunerInner { file_entries: Vec>, /// StreamContext containing all context needed for pruning. stream_ctx: Arc, + /// Positive manifest-prune cache shared across all scan partitions. + /// + /// SAFETY: cached positives are valid because dynamic filters only tighten; + /// negative decisions are not cached. Reset by `add_partition_ranges()` for + /// each fresh batch of partition ranges. + manifest_pruned_files: Vec, +} + +impl PrunerInner { + /// Checks whether manifest-level pruning proves this file is empty given the + /// current predicate. If true, CAS the shared cache from false→true and + /// record `files_time_range_pruned` in `reader_metrics`. + /// + /// Returns `true` if already cached or newly proven pruned. + fn try_mark_manifest_pruned( + &self, + file_index: usize, + reader_metrics: &mut ReaderMetrics, + ) -> bool { + if self.manifest_pruned_files[file_index].load(Ordering::Relaxed) { + return true; + } + let file = &self.stream_ctx.input.files[file_index]; + if !self.stream_ctx.input.can_manifest_prune_file(file) { + return false; + } + if self.manifest_pruned_files[file_index] + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + reader_metrics.filter_metrics.files_time_range_pruned += 1; + } + true + } } /// Per-file state tracking. @@ -196,6 +267,8 @@ impl Pruner { }) }) .collect(); + let manifest_pruned_files: Vec = + (0..num_files).map(|_| AtomicBool::new(false)).collect(); // Create channels and collect senders let mut worker_senders = Vec::with_capacity(num_workers); let mut receivers = Vec::with_capacity(num_workers); @@ -209,6 +282,7 @@ impl Pruner { num_workers, file_entries, stream_ctx, + manifest_pruned_files, }); // Spawn worker tasks with their receivers @@ -225,8 +299,14 @@ impl Pruner { } } - /// Adds reference counts for all partitions' ranges. + /// Adds reference counts for all partitions' ranges and resets the full + /// manifest-prune cache so that dynamic-filter updates are visible to the + /// fresh scan. pub fn add_partition_ranges(&self, partition_ranges: &[PartitionRange]) { + for pruned in &self.inner.manifest_pruned_files { + pruned.store(false, Ordering::Relaxed); + } + // Add reference counts for each partition range let num_memtables = self.inner.stream_ctx.input.num_memtables(); for part_range in partition_ranges { @@ -277,6 +357,19 @@ impl Pruner { Ok(ranges) } + /// Skips a file range that has been pruned before entering the file pruner. + /// + /// This keeps the pruner's per-file reference counts balanced with + /// `add_partition_ranges()`. It may also clear a cached builder when this was the + /// last remaining range for the file. + pub fn skip_file_range(&self, index: RowGroupIndex, reader_metrics: &mut ReaderMetrics) { + if !self.inner.stream_ctx.is_file_range_index(index) { + return; + } + let file_index = index.index - self.inner.stream_ctx.input.num_memtables(); + self.decrement_and_maybe_clear(file_index, reader_metrics); + } + /// Gets or creates the FileRangeBuilder for a file. async fn get_file_builder( &self, @@ -374,25 +467,33 @@ impl Pruner { pre_filter_mode: PreFilterMode, reader_metrics: &mut ReaderMetrics, ) -> Result> { + // Check manifest-level prune first (shared cache, no I/O). + if self + .inner + .try_mark_manifest_pruned(file_index, reader_metrics) + { + let arc_builder = Arc::new(FileRangeBuilder::default()); + // Do NOT cache an empty manifest-pruned builder; the cache flag + // already records the decision. + return Ok(arc_builder); + } + let file = &self.inner.stream_ctx.input.files[file_index]; + let predicate = self.inner.stream_ctx.input.predicate_for_file(file); let builder = self .inner .stream_ctx .input - .prune_file(file, pre_filter_mode, reader_metrics) + .prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics) .await?; let arc_builder = Arc::new(builder); - // Caches the builder + // Caches the builder only if the file still has remaining ranges. + // `skip_file_range` may have already consumed all ranges for this file. { let mut entry = self.inner.file_entries[file_index].lock().unwrap(); - if entry.builder.is_none() { - reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize; - reader_metrics.num_range_builders += 1; - entry.builder = Some(arc_builder.clone()); - PRUNER_ACTIVE_BUILDERS.inc(); - } + cache_builder_if_needed(&mut entry, &arc_builder, reader_metrics); } Ok(arc_builder) @@ -444,7 +545,6 @@ impl Pruner { } worker_cache_miss += 1; - // Do the actual pruning (outside lock) let file = &inner.stream_ctx.input.files[file_index]; pruned_files.push(file.file_id().file_id()); let explain_verbose = partition_metrics @@ -455,24 +555,45 @@ impl Pruner { filter_metrics: new_filter_metrics(explain_verbose), ..Default::default() }; - let result = inner - .stream_ctx - .input - .prune_file(file, pre_filter_mode, &mut metrics) - .await; + + // Check manifest-level prune first (shared cache, no I/O). + let result = if inner.try_mark_manifest_pruned(file_index, &mut metrics) { + // Manifest-level pruning proved the file empty — produce a + // default builder without reading any parquet metadata. + Ok(FileRangeBuilder::default()) + } else { + let predicate = inner.stream_ctx.input.predicate_for_file(file); + inner + .stream_ctx + .input + .prune_file_after_manifest_check(file, pre_filter_mode, predicate, &mut metrics) + .await + }; // Update state and notify waiters let mut entry = inner.file_entries[file_index].lock().unwrap(); match result { Ok(builder) => { let arc_builder = Arc::new(builder); - entry.builder = Some(arc_builder.clone()); - PRUNER_ACTIVE_BUILDERS.inc(); + let is_background = response_tx.is_none(); + + // Only cache the builder if the file still has remaining ranges. + // If remaining_ranges == 0, a concurrent `skip_file_range` (e.g. from a + // dynamic filter tightening via manifest-prune fast-skip) already consumed + // all ranges and may have cleared a previously cached builder. + // Skip caching manifest-pruned empty builders; the cache flag is enough. + let did_cache = + if inner.manifest_pruned_files[file_index].load(Ordering::Relaxed) { + false + } else { + cache_builder_if_needed(&mut entry, &arc_builder, &mut metrics) + }; // Notify all waiters for waiter in entry.waiters.drain(..) { let _ = waiter.send(Ok(arc_builder.clone())); } + // Always respond to foreground caller, even if we did not cache. if let Some(response_tx) = response_tx { let _ = response_tx.send(Ok(arc_builder)); } @@ -485,8 +606,13 @@ impl Pruner { metrics ); - // Merge metrics to partition if provided - if let Some(part_metrics) = &partition_metrics { + // Merge metrics if this is a foreground request, or if the builder + // was cached. Skip stale per-file metrics + // for background requests that completed after the file was already + // fully skipped. + if (!is_background || did_cache) + && let Some(part_metrics) = &partition_metrics + { let per_file_metrics = if part_metrics.explain_verbose() { let file_id = file.file_id(); let mut map = HashMap::new(); @@ -525,3 +651,460 @@ impl Pruner { ); } } + +#[cfg(test)] +impl Pruner { + /// Returns the remaining range count for a file (test-only). + fn test_remaining_ranges(&self, file_index: usize) -> usize { + self.inner.file_entries[file_index] + .lock() + .unwrap() + .remaining_ranges + } + + /// Returns whether a cached builder exists for a file (test-only). + fn test_has_builder(&self, file_index: usize) -> bool { + self.inner.file_entries[file_index] + .lock() + .unwrap() + .builder + .is_some() + } + + /// Returns the manifest-pruned flag for a file (test-only). + fn test_is_manifest_pruned(&self, file_index: usize) -> bool { + self.inner.manifest_pruned_files[file_index].load(Ordering::Relaxed) + } + + /// Clears a cached builder for a file, simulating stale cleanup (test-only). + #[allow(dead_code)] + fn test_clear_builder(&self, file_index: usize) { + let mut entry = self.inner.file_entries[file_index].lock().unwrap(); + if entry.builder.take().is_some() { + PRUNER_ACTIVE_BUILDERS.dec(); + } + } +} + +/// Returns true if a freshly pruned builder should be cached for this file. +fn should_cache_builder(entry: &FileBuilderEntry) -> bool { + entry.builder.is_none() && entry.remaining_ranges > 0 +} + +/// Caches a freshly pruned builder if the file still has remaining ranges, and +/// records the corresponding builder memory/count deltas for verbose metrics. +fn cache_builder_if_needed( + entry: &mut FileBuilderEntry, + builder: &Arc, + reader_metrics: &mut ReaderMetrics, +) -> bool { + if should_cache_builder(entry) { + reader_metrics.metadata_mem_size += builder.memory_size() as isize; + reader_metrics.num_range_builders += 1; + entry.builder = Some(builder.clone()); + PRUNER_ACTIVE_BUILDERS.inc(); + true + } else { + false + } +} + +#[cfg(test)] +mod tests { + use common_time::Timestamp; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + use datafusion_common::ScalarValue; + use datafusion_expr::{Expr, col, lit}; + use store_api::region_engine::PartitionRange; + use store_api::storage::{FileId, RegionId}; + + use super::*; + use crate::read::flat_projection::FlatProjectionMapper; + use crate::read::range::RowGroupIndex; + use crate::read::scan_region::{PredicateGroup, ScanInput}; + use crate::read::scan_util::PartitionMetrics; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::sst::parquet::reader::ReaderMetrics; + use crate::test_util::memtable_util::metadata_with_primary_key; + use crate::test_util::new_noop_file_purger; + use crate::test_util::scheduler_util::SchedulerEnv; + + async fn make_test_pruner(num_files: usize) -> (SchedulerEnv, Arc) { + let env = SchedulerEnv::new().await; + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); + + let files: Vec = (0..num_files) + .map(|_| { + let meta = FileMeta { + region_id: RegionId::new(123, 456), + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + ), + num_row_groups: 1, + num_rows: 1024, + level: 0, + ..Default::default() + }; + FileHandle::new(meta, new_noop_file_purger()) + }) + .collect(); + + let input = ScanInput::new(env.access_layer.clone(), mapper) + .with_files(files) + .with_append_mode(true); + let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input)); + let pruner = Arc::new(Pruner::new(stream_ctx, 1)); + (env, pruner) + } + + /// Builds a minimal `PartitionRange` that references `file_index`. + /// `add_partition_ranges` will look up `stream_ctx.ranges[identifier]` + /// and find `row_group_indices[0] == RowGroupIndex { index: file_index, + /// row_group_index: 0 }` because `unordered_scan_ranges` with + /// `num_row_groups=1` produces one range per file. + fn file_partition_range(file_index: usize) -> PartitionRange { + PartitionRange { + start: Timestamp::new_millisecond(0), + end: Timestamp::new_millisecond(1001), + num_rows: 1024, + identifier: file_index, + } + } + + async fn make_test_pruner_with_predicate( + num_files: usize, + row_groups_per_file: u64, + predicate_exprs: &[Expr], + ) -> (SchedulerEnv, Arc) { + let env = SchedulerEnv::new().await; + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); + let predicate = PredicateGroup::new(&metadata, predicate_exprs).unwrap(); + + let files: Vec = (0..num_files) + .map(|_| { + let meta = FileMeta { + region_id: RegionId::new(123, 456), + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + ), + num_row_groups: row_groups_per_file, + num_rows: row_groups_per_file * 1024, + level: 0, + ..Default::default() + }; + FileHandle::new(meta, new_noop_file_purger()) + }) + .collect(); + + let input = ScanInput::new(env.access_layer.clone(), mapper) + .with_files(files) + .with_predicate(predicate) + .with_append_mode(true); + let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input)); + let pruner = Arc::new(Pruner::new(stream_ctx, 1)); + (env, pruner) + } + + fn make_partition_metrics() -> PartitionMetrics { + let metrics_set = ExecutionPlanMetricsSet::new(); + PartitionMetrics::new( + RegionId::new(123, 456), + 0, + "test", + Instant::now(), + false, + &metrics_set, + ) + } + + #[test] + fn should_cache_builder_when_ranges_remain() { + let entry = FileBuilderEntry { + builder: None, + remaining_ranges: 3, + waiters: Vec::new(), + }; + assert!(should_cache_builder(&entry)); + } + + #[test] + fn should_not_cache_builder_when_no_ranges_remain() { + let entry = FileBuilderEntry { + builder: None, + remaining_ranges: 0, + waiters: Vec::new(), + }; + assert!(!should_cache_builder(&entry)); + } + + #[test] + fn should_not_cache_builder_when_already_cached() { + let entry = FileBuilderEntry { + builder: Some(Arc::new(FileRangeBuilder::default())), + remaining_ranges: 1, + waiters: Vec::new(), + }; + assert!(!should_cache_builder(&entry)); + } + + #[test] + fn cache_builder_records_metrics() { + let mut entry = FileBuilderEntry { + builder: None, + remaining_ranges: 1, + waiters: Vec::new(), + }; + let builder = Arc::new(FileRangeBuilder::default()); + let mut reader_metrics = ReaderMetrics::default(); + + assert!(cache_builder_if_needed( + &mut entry, + &builder, + &mut reader_metrics + )); + assert!(entry.builder.is_some()); + assert_eq!( + reader_metrics.metadata_mem_size, + builder.memory_size() as isize + ); + assert_eq!(reader_metrics.num_range_builders, 1); + + if entry.builder.take().is_some() { + PRUNER_ACTIVE_BUILDERS.dec(); + } + } + + #[tokio::test] + async fn skip_file_range_decrements_and_clears_builder() { + let (_env, pruner) = make_test_pruner(1).await; + + // Simulate 3 partition ranges for file 0. + let ranges: Vec = (0..3).map(|_| file_partition_range(0)).collect(); + pruner.add_partition_ranges(&ranges); + assert_eq!(pruner.test_remaining_ranges(0), 3); + + // Manually set a cached builder (simulating a previous cache hit). + { + let mut entry = pruner.inner.file_entries[0].lock().unwrap(); + entry.builder = Some(Arc::new(FileRangeBuilder::default())); + PRUNER_ACTIVE_BUILDERS.inc(); + } + assert!(pruner.test_has_builder(0)); + + // Skip all 3 ranges; the third should clear the builder. + let mut reader_metrics = ReaderMetrics::default(); + for i in 0..3 { + let index = RowGroupIndex { + index: 0, + row_group_index: i as i64, + }; + pruner.skip_file_range(index, &mut reader_metrics); + } + + assert_eq!(pruner.test_remaining_ranges(0), 0); + assert!(!pruner.test_has_builder(0)); + } + + #[tokio::test] + async fn worker_does_not_cache_after_skip_file_range_consumed_all() { + let (_env, pruner) = make_test_pruner(1).await; + + // Simulate one range for file 0. + let ranges = vec![file_partition_range(0)]; + pruner.add_partition_ranges(&ranges); + assert_eq!(pruner.test_remaining_ranges(0), 1); + + // Simulate skip_file_range consuming the last range BEFORE the + // background worker finishes. This mirrors the race: a dynamic filter + // tightens and manifest-prune fast-skip zeros out remaining_ranges. + let mut reader_metrics = ReaderMetrics::default(); + let index = RowGroupIndex { + index: 0, + row_group_index: 0, + }; + pruner.skip_file_range(index, &mut reader_metrics); + assert_eq!(pruner.test_remaining_ranges(0), 0); + assert!(!pruner.test_has_builder(0)); + + // Now simulate the worker completing: check the caching guard. + let entry = pruner.inner.file_entries[0].lock().unwrap(); + let should_cache = should_cache_builder(&entry); + drop(entry); + + assert!(!should_cache); + + // Ensure the gauge was not incremented for a stale builder. + // (skip_file_range already decremented it if there was one, but here + // there was none, so the gauge should be at baseline.) + } + + #[tokio::test] + async fn worker_caches_when_ranges_remain() { + let (_env, pruner) = make_test_pruner(1).await; + + // Simulate 2 ranges for file 0. + let ranges: Vec = (0..2).map(|_| file_partition_range(0)).collect(); + pruner.add_partition_ranges(&ranges); + assert_eq!(pruner.test_remaining_ranges(0), 2); + + // Consume only 1 range. + let mut reader_metrics = ReaderMetrics::default(); + let index = RowGroupIndex { + index: 0, + row_group_index: 0, + }; + pruner.skip_file_range(index, &mut reader_metrics); + assert_eq!(pruner.test_remaining_ranges(0), 1); + + // The worker should still cache because remaining_ranges > 0. + let entry = pruner.inner.file_entries[0].lock().unwrap(); + assert!(should_cache_builder(&entry)); + } + + // ── Corner case: fast-skip across multiple row groups ───────────── + + /// 1 file × 3 row groups, predicate `ts > 10000ms` prunes the file at + /// manifest level. Fast-skipping each of the 3 row groups must return true + /// and decrement remaining_ranges to 0. + #[tokio::test] + async fn try_skip_manifest_pruned_file_range_multi_row_groups() { + let predicate_exprs: Vec = + vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))]; + let (_env, pruner) = make_test_pruner_with_predicate(1, 3, &predicate_exprs).await; + + let ranges = pruner.inner.stream_ctx.partition_ranges(); + assert_eq!(ranges.len(), 3); + pruner.add_partition_ranges(&ranges); + assert_eq!(pruner.test_remaining_ranges(0), 3); + + let partition_pruner = Arc::new(PartitionPruner::new(pruner.clone(), &ranges)); + let partition_metrics = make_partition_metrics(); + + // Fast-skip each of the 3 row groups. + for rg in 0..3 { + let index = RowGroupIndex { + index: 0, // file_index == 0, no memtables + row_group_index: rg, + }; + let skipped = + partition_pruner.try_skip_manifest_pruned_file_range(index, &partition_metrics); + assert!(skipped, "row group {} should be skipped", rg); + } + + // All refs consumed. + assert_eq!(pruner.test_remaining_ranges(0), 0); + // manifest_pruned_files is CAS'd exactly once (first call). + assert!(pruner.test_is_manifest_pruned(0)); + } + + /// A file whose manifest time range may contain matching rows must not be + /// fast-skipped. This protects query correctness over metrics precision. + #[tokio::test] + async fn try_skip_manifest_pruned_file_range_keeps_overlapping_file() { + let predicate_exprs: Vec = + vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(500), None)))]; + let (_env, pruner) = make_test_pruner_with_predicate(1, 2, &predicate_exprs).await; + + let ranges = pruner.inner.stream_ctx.partition_ranges(); + assert_eq!(ranges.len(), 2); + pruner.add_partition_ranges(&ranges); + assert_eq!(pruner.test_remaining_ranges(0), 2); + + let partition_pruner = Arc::new(PartitionPruner::new(pruner.clone(), &ranges)); + let partition_metrics = make_partition_metrics(); + let range_meta = &pruner.inner.stream_ctx.ranges[ranges[0].identifier]; + let index = range_meta.row_group_indices[0]; + + let skipped = + partition_pruner.try_skip_manifest_pruned_file_range(index, &partition_metrics); + + assert!(!skipped); + assert_eq!(pruner.test_remaining_ranges(0), 2); + assert!(!pruner.test_is_manifest_pruned(0)); + } + + // ── Corner case: add_partition_ranges resets the manifest-pruned flag ── + + #[tokio::test] + async fn add_partition_ranges_resets_manifest_pruned_flag() { + let predicate_exprs: Vec = + vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))]; + let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await; + + // Mark file 0 as manifest-pruned. + let mut reader_metrics = ReaderMetrics::default(); + let marked = pruner + .inner + .try_mark_manifest_pruned(0, &mut reader_metrics); + assert!(marked); + assert!(pruner.test_is_manifest_pruned(0)); + assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1); + + // Calling add_partition_ranges must reset the flag. + let ranges = vec![file_partition_range(0)]; + pruner.add_partition_ranges(&ranges); + assert!(!pruner.test_is_manifest_pruned(0)); + // remaining_ranges was also incremented. + assert_eq!(pruner.test_remaining_ranges(0), 1); + } + + // ── Corner case: prune_file_directly short-circuits via manifest prune ── + + #[tokio::test] + async fn prune_file_directly_manifest_pruned_returns_empty_builder() { + let predicate_exprs: Vec = + vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))]; + let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await; + + // Ensure there is no cached builder yet. + assert!(!pruner.test_has_builder(0)); + + let mut reader_metrics = ReaderMetrics::default(); + let builder = pruner + .prune_file_directly(0, PreFilterMode::SkipFields, &mut reader_metrics) + .await + .unwrap(); + + // Should be the default (empty) builder. + assert_eq!( + builder.memory_size(), + FileRangeBuilder::default().memory_size() + ); + // builder must NOT be cached — the manifest-pruned flag is enough. + assert!(!pruner.test_has_builder(0)); + // files_time_range_pruned was recorded. + assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1); + } + + // ── Corner case: try_mark_manifest_pruned does not double-count ─── + + #[tokio::test] + async fn try_mark_manifest_pruned_only_counts_first_cas() { + let predicate_exprs: Vec = + vec![col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(10_000), None)))]; + let (_env, pruner) = make_test_pruner_with_predicate(1, 1, &predicate_exprs).await; + + // First call: CAS succeeds, metric incremented. + let mut reader_metrics = ReaderMetrics::default(); + let marked = pruner + .inner + .try_mark_manifest_pruned(0, &mut reader_metrics); + assert!(marked); + assert_eq!(reader_metrics.filter_metrics.files_time_range_pruned, 1); + assert!(pruner.test_is_manifest_pruned(0)); + + // Second call: already true, no metric delta. + let mut reader_metrics2 = ReaderMetrics::default(); + let marked2 = pruner + .inner + .try_mark_manifest_pruned(0, &mut reader_metrics2); + assert!(marked2); + assert_eq!(reader_metrics2.filter_metrics.files_time_range_pruned, 0); + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 0ca5439170..0101b529d1 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1056,7 +1056,7 @@ impl ScanInput { ranges } - fn predicate_for_file(&self, file: &FileHandle) -> Option { + pub(crate) fn predicate_for_file(&self, file: &FileHandle) -> Option { if self.should_skip_region_partition(file) { self.predicate.predicate_without_region().cloned() } else { @@ -1095,7 +1095,35 @@ impl ScanInput { }) } + /// Checks whether a file can be definitively pruned using only its manifest-level + /// time range and the current predicate, without reading any parquet metadata. + /// + /// Returns `true` if [PruningStatistics] proves the file cannot contain matching rows. + #[inline] + pub(crate) fn can_manifest_prune_file(&self, file: &FileHandle) -> bool { + let predicate = self.predicate_for_file(file); + self.manifest_prunes_file(file, predicate.as_ref()) + } + + fn manifest_prunes_file(&self, file: &FileHandle, predicate: Option<&Predicate>) -> bool { + if let Some(pred) = predicate + && !pred.is_empty() + && let Some(file_level_stats) = self.try_file_level_pruning_stats(file) + { + let pruning_results = pred.prune_with_stats( + &file_level_stats, + self.mapper.metadata().schema.arrow_schema(), + ); + pruning_results.first() == Some(&false) + } else { + false + } + } + /// Prunes a file to scan and returns the builder to build readers. + /// + /// This is the public entry point used by direct tests and non-pruner callers. + /// It performs its own manifest-level pruning check internally. #[tracing::instrument( skip_all, fields( @@ -1112,22 +1140,29 @@ impl ScanInput { let predicate = self.predicate_for_file(file); // Early file-level pruning using manifest time range before any parquet metadata access. - // This avoids I/O for files that definitely can't match the current predicate snapshot, - // especially after TopK dynamic filters have established a timestamp threshold. - if let Some(ref pred) = predicate - && !pred.is_empty() - && let Some(file_level_stats) = self.try_file_level_pruning_stats(file) - { - let pruning_results = pred.prune_with_stats( - &file_level_stats, - self.mapper.metadata().schema.arrow_schema(), - ); - if pruning_results.first() == Some(&false) { - reader_metrics.filter_metrics.files_time_range_pruned += 1; - return Ok(FileRangeBuilder::default()); - } + if self.manifest_prunes_file(file, predicate.as_ref()) { + reader_metrics.filter_metrics.files_time_range_pruned += 1; + return Ok(FileRangeBuilder::default()); } + self.prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics) + .await + } + + /// Second half of `prune_file` — performs the actual parquet metadata / + /// reader setup. Callers that already performed manifest-level pruning + /// (e.g. the `Pruner` via its shared `manifest_pruned_files` cache) should + /// call this directly to avoid a redundant manifest check. + /// + /// `predicate` is the result of `self.predicate_for_file(file)` computed + /// externally so the caller can reuse it if needed. + pub(crate) async fn prune_file_after_manifest_check( + &self, + file: &FileHandle, + pre_filter_mode: PreFilterMode, + predicate: Option, + reader_metrics: &mut ReaderMetrics, + ) -> Result { let may_build_selective_row_selection = predicate.is_some(); let decode_pk_values = !self.compaction && self @@ -1955,9 +1990,11 @@ mod tests { use std::sync::Arc; use common_time::timestamp::{TimeUnit, Timestamp}; - use datafusion::physical_plan::expressions::lit as physical_lit; + use datafusion::physical_plan::expressions::{ + binary as physical_binary, col as physical_col, lit as physical_lit, + }; use datafusion_common::ScalarValue; - use datafusion_expr::{col, lit}; + use datafusion_expr::{Operator, col, lit}; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit, }; @@ -2419,6 +2456,39 @@ mod tests { assert!(ranges.is_empty()); } + #[tokio::test] + async fn test_manifest_pruning_observes_dynamic_filter_update() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); + let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap(); + let arrow_schema = metadata.schema.arrow_schema(); + let ts_expr = physical_col("ts", arrow_schema.as_ref()).unwrap(); + let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![ts_expr.clone()], + physical_lit(true), + )); + predicate_group.add_dyn_filters(vec![dyn_filter.clone()]); + let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper) + .with_predicate(predicate_group); + let file = file_handle_with_time_range( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + ); + + assert!(!input.can_manifest_prune_file(&file)); + + let updated = physical_binary( + ts_expr, + Operator::Gt, + physical_lit(ScalarValue::TimestampMillisecond(Some(1000), None)), + arrow_schema.as_ref(), + ) + .unwrap(); + dyn_filter.update(updated).unwrap(); + + assert!(input.can_manifest_prune_file(&file)); + } + #[tokio::test] async fn test_range_pre_filter_mode() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 88349fd63b..aa4087abde 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -665,6 +665,13 @@ pub(crate) async fn build_flat_sources( ); ordered_sources[position] = Some(Box::pin(stream) as _); } else if stream_ctx.is_file_range_index(*index) { + // Common manifest-level fast-skip shared by SeqScan and UnorderedScan. + // Compaction should keep reading its selected input ranges completely. + if !compaction + && partition_pruner.try_skip_manifest_pruned_file_range(*index, part_metrics) + { + continue; + } if let Some(semaphore_ref) = semaphore.as_ref() { // run in parallel, controlled by semaphore let stream_ctx = stream_ctx.clone(); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 0c54084bff..bb2af12a35 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -133,6 +133,12 @@ impl UnorderedScan { yield record_batch?; } } else if stream_ctx.is_file_range_index(*index) { + // Common manifest-level fast-skip shared by UnorderedScan and SeqScan. + if partition_pruner + .try_skip_manifest_pruned_file_range(*index, &part_metrics) + { + continue; + } let stream = scan_flat_file_ranges( stream_ctx.clone(), part_metrics.clone(),