diff --git a/src/mito2/src/read/scan_input_stats.rs b/src/mito2/src/read/scan_input_stats.rs index 4e43d0cc4d..46cef7ff27 100644 --- a/src/mito2/src/read/scan_input_stats.rs +++ b/src/mito2/src/read/scan_input_stats.rs @@ -39,7 +39,7 @@ use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::stats::RowGroupPruningStats; -pub(crate) fn build_scan_input_stats( +pub fn build_scan_input_stats( input: &ScanInput, metadata: &RegionMetadata, ) -> std::result::Result { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5cb2d75e25..227a1a55e4 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -806,6 +806,7 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { } /// Common input for different scanners. +#[derive(Clone)] pub struct ScanInput { /// Region SST access layer. access_layer: AccessLayerRef, @@ -929,6 +930,26 @@ impl ScanInput { self } + /// Excludes SST files by their original ordinal in this scan input. + #[must_use] + pub(crate) fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: &[usize]) -> Self { + if excluded_file_ordinals.is_empty() { + return self; + } + + let excluded = excluded_file_ordinals + .iter() + .copied() + .collect::>(); + self.files = self + .files + .into_iter() + .enumerate() + .filter_map(|(ordinal, file)| (!excluded.contains(&ordinal)).then_some(file)) + .collect(); + self + } + /// Sets cache for this query. #[must_use] pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self { @@ -1916,6 +1937,12 @@ mod tests { .with_files(vec![file]) } + fn new_test_file(file_id: store_api::storage::FileId) -> FileHandle { + let mut meta = crate::sst::file::FileMeta::default(); + meta.file_id = file_id; + FileHandle::new(meta, Arc::new(crate::sst::file_purger::NoopFilePurger)) + } + #[tokio::test] async fn test_build_read_column_ids_includes_filters() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); @@ -2122,6 +2149,27 @@ mod tests { assert!(build_scan_fingerprint(&no_files).is_none()); } + #[tokio::test] + async fn test_scan_input_excludes_file_ordinals_only_from_ssts() { + use store_api::storage::FileId; + + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let input = new_scan_input(metadata, vec![]).await.with_files(vec![ + new_test_file(FileId::random()), + new_test_file(FileId::random()), + new_test_file(FileId::random()), + ]); + + let remaining = input.clone().with_excluded_file_ordinals(&[1]); + + assert_eq!(input.num_memtables(), remaining.num_memtables()); + assert_eq!(remaining.num_files(), 2); + assert_eq!( + remaining.file_ids(), + vec![input.file_ids()[0], input.file_ids()[2]] + ); + } + #[tokio::test] async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() { let base = metadata_with_primary_key(vec![0, 1], false); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index b9a5fe85ed..04b271c879 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -116,6 +116,18 @@ impl SeqScan { Ok(Box::pin(aggr_stream)) } + fn rebuild_input(&mut self, input: ScanInput) { + // IMPORTANT: when `ScanInput.files` changes, all derived state must move together. + // `partition_ranges()` identifiers and `Pruner` internals are tied to this exact + // `StreamContext`, so reusing the old ones would make them point at stale file/range + // layouts. + let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input)); + self.properties.partitions = vec![stream_ctx.partition_ranges()]; + let num_workers = common_stat::get_total_cpu_cores().max(1); + self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers)); + self.stream_ctx = stream_ctx; + } + /// Scan [`Batch`] in all partitions one by one. pub(crate) fn scan_all_partitions(&self) -> Result { let metrics_set = ExecutionPlanMetricsSet::new(); @@ -655,6 +667,19 @@ impl RegionScanner for SeqScan { } fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + request.validate()?; + + if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() { + // IMPORTANT: exclusion changes the underlying file set, so rebuild the input-bound + // runtime state first, then apply ordinary property updates from `prepare()`. + let input = self + .stream_ctx + .input + .clone() + .with_excluded_file_ordinals(&excluded_file_ordinals); + self.rebuild_input(input); + } + self.properties.prepare(request); self.check_scan_limit().map_err(BoxedError::new)?; diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 39764183e6..d3996e7961 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -310,6 +310,18 @@ impl SeriesScan { Ok(()) } + + fn rebuild_input(&mut self, input: ScanInput) { + // IMPORTANT: `SeriesScan` keeps extra runtime state (`receivers`) on top of the shared + // `StreamContext`/`Pruner` pair. Once the file set changes, all of them must be reset + // together so the distributor does not keep using channels from the old partition layout. + let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input)); + self.properties.partitions = vec![stream_ctx.partition_ranges()]; + let num_workers = common_stat::get_total_cpu_cores().max(1); + self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers)); + self.stream_ctx = stream_ctx; + self.receivers = Mutex::new(Vec::new()); + } } fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) { @@ -350,6 +362,19 @@ impl RegionScanner for SeriesScan { } fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + request.validate()?; + + if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() { + // IMPORTANT: exclusion is effectively a new scan input for `SeriesScan`, so rebuild + // before applying the remaining prepare options. + let input = self + .stream_ctx + .input + .clone() + .with_excluded_file_ordinals(&excluded_file_ordinals); + self.rebuild_input(input); + } + self.properties.prepare(request); self.check_scan_limit().map_err(BoxedError::new)?; diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index ea2a86e09d..276f01f717 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -452,6 +452,16 @@ impl UnorderedScan { }; Ok(Box::pin(stream)) } + + fn rebuild_input(&mut self, input: ScanInput) { + // IMPORTANT: file exclusion changes the range layout seen by this scanner, so the + // `StreamContext`, partition ranges, and `Pruner` must be rebuilt as one snapshot. + let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input)); + self.properties.partitions = vec![stream_ctx.partition_ranges()]; + let num_workers = common_stat::get_total_cpu_cores().max(1); + self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers)); + self.stream_ctx = stream_ctx; + } } impl RegionScanner for UnorderedScan { @@ -472,6 +482,19 @@ impl RegionScanner for UnorderedScan { } fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + request.validate()?; + + if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() { + // IMPORTANT: rebuild first so later property updates apply to the new input shape, + // not the stale one. + let input = self + .stream_ctx + .input + .clone() + .with_excluded_file_ordinals(&excluded_file_ordinals); + self.rebuild_input(input); + } + self.properties.prepare(request); Ok(()) diff --git a/src/query/src/optimizer/aggr_stats.rs b/src/query/src/optimizer/aggr_stats.rs index 31af3469eb..0ab4432c47 100644 --- a/src/query/src/optimizer/aggr_stats.rs +++ b/src/query/src/optimizer/aggr_stats.rs @@ -97,6 +97,9 @@ impl AggregateStats { return Ok(Transformed::no(plan)); } + // Subtask 03 only adds the scan-side exclusion plumbing. The optimizer must not + // exclude stats-covered files until subtask 04 also materializes their + // stats-derived partial state and merges it back into the aggregate result. Ok(Transformed::no(plan)) })? .data; diff --git a/src/query/src/optimizer/aggr_stats/split.rs b/src/query/src/optimizer/aggr_stats/split.rs index e49183d96b..cd2c44ed84 100644 --- a/src/query/src/optimizer/aggr_stats/split.rs +++ b/src/query/src/optimizer/aggr_stats/split.rs @@ -76,6 +76,53 @@ pub(super) type FieldCountFileSplit = FileSplit; pub(super) type TimeFileSplit = FileSplit; pub(super) type FieldMinMaxFileSplit = FileSplit; +fn stats_file_ordinals(aggregate: &StatsAgg, scan_input_stats: &RegionScanStats) -> Vec { + match aggregate { + StatsAgg::CountStar => split_count_star_files(scan_input_stats).stats_file_ordinals, + StatsAgg::CountField { column_name, .. } => { + split_count_field_files(scan_input_stats, column_name).stats_file_ordinals + } + StatsAgg::CountTimeIndex { .. } + | StatsAgg::MinTimeIndex { .. } + | StatsAgg::MaxTimeIndex { .. } => split_time_files(scan_input_stats).stats_file_ordinals, + StatsAgg::MinField { column_name, .. } | StatsAgg::MaxField { column_name, .. } => { + split_min_max_field_files(scan_input_stats, column_name).stats_file_ordinals + } + } +} + +/// Returns file ordinals that every aggregate in the list can answer from stats. +#[allow(dead_code)] +pub(super) fn common_stats_file_ordinals( + aggregates: &[StatsAgg], + scan_input_stats: &RegionScanStats, +) -> Vec { + let Some(first) = aggregates.first() else { + return Vec::new(); + }; + + let mut common = stats_file_ordinals(first, scan_input_stats) + .into_iter() + .collect::>(); + + for aggregate in &aggregates[1..] { + let ordinals = stats_file_ordinals(aggregate, scan_input_stats) + .into_iter() + .collect::>(); + common.retain(|ordinal| ordinals.contains(ordinal)); + } + + scan_input_stats + .files + .iter() + .filter_map(|file| { + common + .contains(&file.file_ordinal) + .then_some(file.file_ordinal) + }) + .collect() +} + pub(super) trait StatsAggExt { fn has_stats_files(&self, scan_input_stats: &RegionScanStats) -> bool; } diff --git a/src/query/src/optimizer/aggr_stats/tests.rs b/src/query/src/optimizer/aggr_stats/tests.rs index c98e6e5fee..7fbbac1197 100644 --- a/src/query/src/optimizer/aggr_stats/tests.rs +++ b/src/query/src/optimizer/aggr_stats/tests.rs @@ -42,8 +42,9 @@ use table::test_util::EmptyTable; use super::StatsAgg; use super::check::{RejectReason, RewriteCheck, is_supported_aggregate_name}; use super::split::{ - FileStatsRequirement, StatsAggExt, has_partition_expr_mismatch, partial_state_from_stats, - split_count_field_files, split_count_star_files, split_min_max_field_files, split_time_files, + FileStatsRequirement, StatsAggExt, common_stats_file_ordinals, has_partition_expr_mismatch, + partial_state_from_stats, split_count_field_files, split_count_star_files, + split_min_max_field_files, split_time_files, }; use crate::parser::QueryLanguageParser; use crate::tests::new_query_engine_with_table; @@ -556,6 +557,81 @@ fn test_split_min_max_field_files() { assert_eq!(split.stats.max, Some(Value::Int64(9))); } +#[test] +fn test_common_stats_file_ordinals_intersects_supported_aggregates() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + exact_num_rows: Some(5), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: field_stats(Some(4), Some(Value::Int64(1)), Some(Value::Int64(7))), + partition_expr_matches_region: true, + }, + ], + }; + + let aggregates = vec![ + StatsAgg::CountStar, + StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + StatsAgg::MaxField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + ]; + + assert_eq!(common_stats_file_ordinals(&aggregates, &stats), vec![0, 2]); +} + +#[test] +fn test_common_stats_file_ordinals_returns_only_shared_stats_eligible_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + ], + }; + + let aggregates = vec![ + StatsAgg::CountStar, + StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + ]; + + assert_eq!(common_stats_file_ordinals(&aggregates, &stats), vec![0]); +} + #[test] fn test_partial_state_from_stats_count_star() { let aggregate = StatsAgg::CountStar; diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 54507c58f3..3a79eba033 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -22,7 +22,8 @@ use std::sync::{Arc, Mutex}; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use api::region::RegionResponse; use async_trait::async_trait; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream}; use common_time::Timestamp; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -388,6 +389,13 @@ pub struct PrepareRequest { pub distinguish_partition_range: Option, /// The expected number of target partitions. pub target_partitions: Option, + /// SST file ordinals to exclude from the current scan input. + /// + /// IMPORTANT: this is not a lightweight property toggle. Scanner implementations + /// may need to rebuild their `ScanInput`-derived runtime state (`StreamContext`, + /// `Pruner`, partition ranges, and similar caches) before applying other prepare + /// settings. + pub excluded_file_ordinals: Option>, } impl PrepareRequest { @@ -408,6 +416,24 @@ impl PrepareRequest { self.target_partitions = Some(target_partitions); self } + + /// Sets SST file ordinals to exclude from the current scan input. + pub fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: Vec) -> Self { + self.excluded_file_ordinals = Some(excluded_file_ordinals); + self + } + + pub fn validate(&self) -> Result<(), BoxedError> { + if self.ranges.is_some() && self.excluded_file_ordinals.is_some() { + return Err(BoxedError::new(PlainError::new( + "PrepareRequest does not allow mixing ranges with excluded_file_ordinals" + .to_string(), + StatusCode::InvalidArguments, + ))); + } + + Ok(()) + } } /// Necessary context of the query for the scanner. @@ -433,9 +459,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Returns the metadata of the region. fn metadata(&self) -> RegionMetadataRef; - /// Prepares the scanner with the given partition ranges. + /// Prepares the scanner with planner-side overrides. /// - /// This method is for the planner to adjust the scanner's behavior based on the partition ranges. + /// IMPORTANT: some requests only tweak properties, but others (such as + /// `excluded_file_ordinals`) require the scanner to rebuild any runtime state derived + /// from the current `ScanInput` before applying the rest of the request. fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>; /// Scans the partition and returns a stream of record batches. @@ -474,6 +502,32 @@ pub type RegionScannerRef = Box; pub type BatchResponses = Vec<(RegionId, Result)>; +#[cfg(test)] +mod tests { + use common_time::Timestamp; + + use super::{PartitionRange, PrepareRequest}; + + #[test] + fn test_prepare_request_rejects_ranges_and_excluded_file_ordinals_together() { + let err = PrepareRequest::default() + .with_ranges(vec![vec![PartitionRange { + start: Timestamp::new_millisecond(0), + end: Timestamp::new_millisecond(1), + num_rows: 1, + identifier: 0, + }]]) + .with_excluded_file_ordinals(vec![1]) + .validate() + .unwrap_err(); + + assert!( + err.to_string() + .contains("does not allow mixing ranges with excluded_file_ordinals") + ); + } +} + /// Represents the statistics of a region. #[derive(Debug, Deserialize, Serialize, Default)] pub struct RegionStatistic { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 94f35a8167..f20f3133a1 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -304,6 +304,32 @@ impl RegionScanExec { }) } + pub fn with_excluded_file_ordinals( + &self, + excluded_file_ordinals: Vec, + ) -> Result { + { + let mut scanner = self.scanner.lock().unwrap(); + scanner.prepare( + PrepareRequest::default().with_excluded_file_ordinals(excluded_file_ordinals), + )?; + } + + Ok(Self { + scanner: self.scanner.clone(), + arrow_schema: self.arrow_schema.clone(), + output_ordering: self.output_ordering.clone(), + metric: self.metric.clone(), + properties: self.properties.clone(), + append_mode: self.append_mode, + total_rows: self.total_rows, + is_partition_set: self.is_partition_set, + distribution: self.distribution, + explain_verbose: self.explain_verbose, + query_memory_permit: self.query_memory_permit.clone(), + }) + } + pub fn distribution(&self) -> Option { self.distribution }