feat: selective file scan

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-15 21:29:02 +08:00
parent 3e53a562cf
commit ca0a9a2d5d
10 changed files with 333 additions and 6 deletions

View File

@@ -39,7 +39,7 @@ use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::stats::RowGroupPruningStats;
pub(crate) fn build_scan_input_stats(
pub fn build_scan_input_stats(
input: &ScanInput,
metadata: &RegionMetadata,
) -> std::result::Result<RegionScanInputStats, BoxedError> {

View File

@@ -806,6 +806,7 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
}
/// Common input for different scanners.
#[derive(Clone)]
pub struct ScanInput {
/// Region SST access layer.
access_layer: AccessLayerRef,
@@ -929,6 +930,26 @@ impl ScanInput {
self
}
/// Excludes SST files by their original ordinal in this scan input.
#[must_use]
pub(crate) fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: &[usize]) -> Self {
if excluded_file_ordinals.is_empty() {
return self;
}
let excluded = excluded_file_ordinals
.iter()
.copied()
.collect::<HashSet<_>>();
self.files = self
.files
.into_iter()
.enumerate()
.filter_map(|(ordinal, file)| (!excluded.contains(&ordinal)).then_some(file))
.collect();
self
}
/// Sets cache for this query.
#[must_use]
pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
@@ -1916,6 +1937,12 @@ mod tests {
.with_files(vec![file])
}
fn new_test_file(file_id: store_api::storage::FileId) -> FileHandle {
let mut meta = crate::sst::file::FileMeta::default();
meta.file_id = file_id;
FileHandle::new(meta, Arc::new(crate::sst::file_purger::NoopFilePurger))
}
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
@@ -2122,6 +2149,27 @@ mod tests {
assert!(build_scan_fingerprint(&no_files).is_none());
}
#[tokio::test]
async fn test_scan_input_excludes_file_ordinals_only_from_ssts() {
use store_api::storage::FileId;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let input = new_scan_input(metadata, vec![]).await.with_files(vec![
new_test_file(FileId::random()),
new_test_file(FileId::random()),
new_test_file(FileId::random()),
]);
let remaining = input.clone().with_excluded_file_ordinals(&[1]);
assert_eq!(input.num_memtables(), remaining.num_memtables());
assert_eq!(remaining.num_files(), 2);
assert_eq!(
remaining.file_ids(),
vec![input.file_ids()[0], input.file_ids()[2]]
);
}
#[tokio::test]
async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
let base = metadata_with_primary_key(vec![0, 1], false);

View File

@@ -116,6 +116,18 @@ impl SeqScan {
Ok(Box::pin(aggr_stream))
}
fn rebuild_input(&mut self, input: ScanInput) {
// IMPORTANT: when `ScanInput.files` changes, all derived state must move together.
// `partition_ranges()` identifiers and `Pruner` internals are tied to this exact
// `StreamContext`, so reusing the old ones would make them point at stale file/range
// layouts.
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
self.properties.partitions = vec![stream_ctx.partition_ranges()];
let num_workers = common_stat::get_total_cpu_cores().max(1);
self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
self.stream_ctx = stream_ctx;
}
/// Scan [`Batch`] in all partitions one by one.
pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -655,6 +667,19 @@ impl RegionScanner for SeqScan {
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
request.validate()?;
if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
// IMPORTANT: exclusion changes the underlying file set, so rebuild the input-bound
// runtime state first, then apply ordinary property updates from `prepare()`.
let input = self
.stream_ctx
.input
.clone()
.with_excluded_file_ordinals(&excluded_file_ordinals);
self.rebuild_input(input);
}
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;

View File

@@ -310,6 +310,18 @@ impl SeriesScan {
Ok(())
}
fn rebuild_input(&mut self, input: ScanInput) {
// IMPORTANT: `SeriesScan` keeps extra runtime state (`receivers`) on top of the shared
// `StreamContext`/`Pruner` pair. Once the file set changes, all of them must be reset
// together so the distributor does not keep using channels from the old partition layout.
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
self.properties.partitions = vec![stream_ctx.partition_ranges()];
let num_workers = common_stat::get_total_cpu_cores().max(1);
self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
self.stream_ctx = stream_ctx;
self.receivers = Mutex::new(Vec::new());
}
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -350,6 +362,19 @@ impl RegionScanner for SeriesScan {
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
request.validate()?;
if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
// IMPORTANT: exclusion is effectively a new scan input for `SeriesScan`, so rebuild
// before applying the remaining prepare options.
let input = self
.stream_ctx
.input
.clone()
.with_excluded_file_ordinals(&excluded_file_ordinals);
self.rebuild_input(input);
}
self.properties.prepare(request);
self.check_scan_limit().map_err(BoxedError::new)?;

View File

@@ -452,6 +452,16 @@ impl UnorderedScan {
};
Ok(Box::pin(stream))
}
fn rebuild_input(&mut self, input: ScanInput) {
// IMPORTANT: file exclusion changes the range layout seen by this scanner, so the
// `StreamContext`, partition ranges, and `Pruner` must be rebuilt as one snapshot.
let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
self.properties.partitions = vec![stream_ctx.partition_ranges()];
let num_workers = common_stat::get_total_cpu_cores().max(1);
self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
self.stream_ctx = stream_ctx;
}
}
impl RegionScanner for UnorderedScan {
@@ -472,6 +482,19 @@ impl RegionScanner for UnorderedScan {
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
request.validate()?;
if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
// IMPORTANT: rebuild first so later property updates apply to the new input shape,
// not the stale one.
let input = self
.stream_ctx
.input
.clone()
.with_excluded_file_ordinals(&excluded_file_ordinals);
self.rebuild_input(input);
}
self.properties.prepare(request);
Ok(())

View File

@@ -97,6 +97,9 @@ impl AggregateStats {
return Ok(Transformed::no(plan));
}
// Subtask 03 only adds the scan-side exclusion plumbing. The optimizer must not
// exclude stats-covered files until subtask 04 also materializes their
// stats-derived partial state and merges it back into the aggregate result.
Ok(Transformed::no(plan))
})?
.data;

View File

@@ -76,6 +76,53 @@ pub(super) type FieldCountFileSplit = FileSplit<usize>;
pub(super) type TimeFileSplit = FileSplit<TimeBounds>;
pub(super) type FieldMinMaxFileSplit = FileSplit<ValueBounds>;
fn stats_file_ordinals(aggregate: &StatsAgg, scan_input_stats: &RegionScanStats) -> Vec<usize> {
match aggregate {
StatsAgg::CountStar => split_count_star_files(scan_input_stats).stats_file_ordinals,
StatsAgg::CountField { column_name, .. } => {
split_count_field_files(scan_input_stats, column_name).stats_file_ordinals
}
StatsAgg::CountTimeIndex { .. }
| StatsAgg::MinTimeIndex { .. }
| StatsAgg::MaxTimeIndex { .. } => split_time_files(scan_input_stats).stats_file_ordinals,
StatsAgg::MinField { column_name, .. } | StatsAgg::MaxField { column_name, .. } => {
split_min_max_field_files(scan_input_stats, column_name).stats_file_ordinals
}
}
}
/// Returns file ordinals that every aggregate in the list can answer from stats.
#[allow(dead_code)]
pub(super) fn common_stats_file_ordinals(
aggregates: &[StatsAgg],
scan_input_stats: &RegionScanStats,
) -> Vec<usize> {
let Some(first) = aggregates.first() else {
return Vec::new();
};
let mut common = stats_file_ordinals(first, scan_input_stats)
.into_iter()
.collect::<std::collections::BTreeSet<_>>();
for aggregate in &aggregates[1..] {
let ordinals = stats_file_ordinals(aggregate, scan_input_stats)
.into_iter()
.collect::<std::collections::BTreeSet<_>>();
common.retain(|ordinal| ordinals.contains(ordinal));
}
scan_input_stats
.files
.iter()
.filter_map(|file| {
common
.contains(&file.file_ordinal)
.then_some(file.file_ordinal)
})
.collect()
}
pub(super) trait StatsAggExt {
fn has_stats_files(&self, scan_input_stats: &RegionScanStats) -> bool;
}

View File

@@ -42,8 +42,9 @@ use table::test_util::EmptyTable;
use super::StatsAgg;
use super::check::{RejectReason, RewriteCheck, is_supported_aggregate_name};
use super::split::{
FileStatsRequirement, StatsAggExt, has_partition_expr_mismatch, partial_state_from_stats,
split_count_field_files, split_count_star_files, split_min_max_field_files, split_time_files,
FileStatsRequirement, StatsAggExt, common_stats_file_ordinals, has_partition_expr_mismatch,
partial_state_from_stats, split_count_field_files, split_count_star_files,
split_min_max_field_files, split_time_files,
};
use crate::parser::QueryLanguageParser;
use crate::tests::new_query_engine_with_table;
@@ -556,6 +557,81 @@ fn test_split_min_max_field_files() {
assert_eq!(split.stats.max, Some(Value::Int64(9)));
}
#[test]
fn test_common_stats_file_ordinals_intersects_supported_aggregates() {
let stats = RegionScanInputStats {
files: vec![
RegionScanFileInputStats {
file_ordinal: 0,
exact_num_rows: Some(3),
time_range: Some((test_timestamp(10), test_timestamp(20))),
field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))),
partition_expr_matches_region: true,
},
RegionScanFileInputStats {
file_ordinal: 1,
exact_num_rows: Some(4),
time_range: Some((test_timestamp(30), test_timestamp(40))),
field_stats: HashMap::new(),
partition_expr_matches_region: true,
},
RegionScanFileInputStats {
file_ordinal: 2,
exact_num_rows: Some(5),
time_range: Some((test_timestamp(50), test_timestamp(60))),
field_stats: field_stats(Some(4), Some(Value::Int64(1)), Some(Value::Int64(7))),
partition_expr_matches_region: true,
},
],
};
let aggregates = vec![
StatsAgg::CountStar,
StatsAgg::CountField {
column_name: "value".to_string(),
arg_type: DataType::Int64,
},
StatsAgg::MaxField {
column_name: "value".to_string(),
arg_type: DataType::Int64,
},
];
assert_eq!(common_stats_file_ordinals(&aggregates, &stats), vec![0, 2]);
}
#[test]
fn test_common_stats_file_ordinals_returns_only_shared_stats_eligible_files() {
let stats = RegionScanInputStats {
files: vec![
RegionScanFileInputStats {
file_ordinal: 0,
exact_num_rows: Some(3),
time_range: Some((test_timestamp(10), test_timestamp(20))),
field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))),
partition_expr_matches_region: true,
},
RegionScanFileInputStats {
file_ordinal: 1,
exact_num_rows: Some(4),
time_range: Some((test_timestamp(30), test_timestamp(40))),
field_stats: HashMap::new(),
partition_expr_matches_region: true,
},
],
};
let aggregates = vec![
StatsAgg::CountStar,
StatsAgg::CountField {
column_name: "value".to_string(),
arg_type: DataType::Int64,
},
];
assert_eq!(common_stats_file_ordinals(&aggregates, &stats), vec![0]);
}
#[test]
fn test_partial_state_from_stats_count_star() {
let aggregate = StatsAgg::CountStar;

View File

@@ -22,7 +22,8 @@ use std::sync::{Arc, Mutex};
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -388,6 +389,13 @@ pub struct PrepareRequest {
pub distinguish_partition_range: Option<bool>,
/// The expected number of target partitions.
pub target_partitions: Option<usize>,
/// SST file ordinals to exclude from the current scan input.
///
/// IMPORTANT: this is not a lightweight property toggle. Scanner implementations
/// may need to rebuild their `ScanInput`-derived runtime state (`StreamContext`,
/// `Pruner`, partition ranges, and similar caches) before applying other prepare
/// settings.
pub excluded_file_ordinals: Option<Vec<usize>>,
}
impl PrepareRequest {
@@ -408,6 +416,24 @@ impl PrepareRequest {
self.target_partitions = Some(target_partitions);
self
}
/// Sets SST file ordinals to exclude from the current scan input.
pub fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: Vec<usize>) -> Self {
self.excluded_file_ordinals = Some(excluded_file_ordinals);
self
}
pub fn validate(&self) -> Result<(), BoxedError> {
if self.ranges.is_some() && self.excluded_file_ordinals.is_some() {
return Err(BoxedError::new(PlainError::new(
"PrepareRequest does not allow mixing ranges with excluded_file_ordinals"
.to_string(),
StatusCode::InvalidArguments,
)));
}
Ok(())
}
}
/// Necessary context of the query for the scanner.
@@ -433,9 +459,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Returns the metadata of the region.
fn metadata(&self) -> RegionMetadataRef;
/// Prepares the scanner with the given partition ranges.
/// Prepares the scanner with planner-side overrides.
///
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
/// IMPORTANT: some requests only tweak properties, but others (such as
/// `excluded_file_ordinals`) require the scanner to rebuild any runtime state derived
/// from the current `ScanInput` before applying the rest of the request.
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
/// Scans the partition and returns a stream of record batches.
@@ -474,6 +502,32 @@ pub type RegionScannerRef = Box<dyn RegionScanner>;
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
#[cfg(test)]
mod tests {
use common_time::Timestamp;
use super::{PartitionRange, PrepareRequest};
#[test]
fn test_prepare_request_rejects_ranges_and_excluded_file_ordinals_together() {
let err = PrepareRequest::default()
.with_ranges(vec![vec![PartitionRange {
start: Timestamp::new_millisecond(0),
end: Timestamp::new_millisecond(1),
num_rows: 1,
identifier: 0,
}]])
.with_excluded_file_ordinals(vec![1])
.validate()
.unwrap_err();
assert!(
err.to_string()
.contains("does not allow mixing ranges with excluded_file_ordinals")
);
}
}
/// Represents the statistics of a region.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {

View File

@@ -304,6 +304,32 @@ impl RegionScanExec {
})
}
pub fn with_excluded_file_ordinals(
&self,
excluded_file_ordinals: Vec<usize>,
) -> Result<Self, BoxedError> {
{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(
PrepareRequest::default().with_excluded_file_ordinals(excluded_file_ordinals),
)?;
}
Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties: self.properties.clone(),
append_mode: self.append_mode,
total_rows: self.total_rows,
is_partition_set: self.is_partition_set,
distribution: self.distribution,
explain_verbose: self.explain_verbose,
query_memory_permit: self.query_memory_permit.clone(),
})
}
pub fn distribution(&self) -> Option<TimeSeriesDistribution> {
self.distribution
}