From 08c66ab00b3268760e97cec255f36b18cd2b5c99 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 24 Apr 2026 16:34:30 +0800 Subject: [PATCH] feat: region scan skip&rewrite Signed-off-by: discord9 --- src/common/query/src/aggr_stats.rs | 64 +-- src/mito2/src/engine/partition_filter_test.rs | 92 +++- src/mito2/src/read/pruner.rs | 135 +++++- src/mito2/src/read/scan_region.rs | 242 +++++++++- src/mito2/src/read/seq_scan.rs | 11 +- src/mito2/src/read/series_scan.rs | 6 +- src/mito2/src/read/unordered_scan.rs | 6 +- src/query/src/optimizer/aggr_stats.rs | 432 +++++++++++++++++- .../src/optimizer/aggr_stats/stat_scan.rs | 93 +++- .../src/optimizer/aggr_stats/support_aggr.rs | 43 +- src/store-api/src/region_engine.rs | 38 +- src/table/src/table/scan.rs | 101 +++- 12 files changed, 1131 insertions(+), 132 deletions(-) diff --git a/src/common/query/src/aggr_stats.rs b/src/common/query/src/aggr_stats.rs index e7e07a21bc..07f0f071a2 100644 --- a/src/common/query/src/aggr_stats.rs +++ b/src/common/query/src/aggr_stats.rs @@ -18,67 +18,9 @@ use std::collections::{HashMap, HashSet}; use datafusion::parquet::file::statistics::Statistics as ParquetStats; use datafusion::scalar::ScalarValue; use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::utils::COUNT_STAR_EXPANSION; -use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_expr::aggregate::AggregateFunctionExpr; -use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal}; use datatypes::schema::SchemaRef as RegionSchemaRef; use datatypes::value::Value; -use store_api::region_engine::FileStatsItem; - -/// Runtime requirement that has already been approved by optimizer rewrite checks. 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum SupportStatAggr { - CountRows, - CountNonNull { column_name: String }, - MinValue { column_name: String }, - MaxValue { column_name: String }, -} - -impl SupportStatAggr { - pub fn is_count_supported_expr(inputs: &[std::sync::Arc]) -> bool { - match inputs { - [] => true, - [arg] if let Some(lit) = arg.as_any().downcast_ref::() => { - lit.value() == &COUNT_STAR_EXPANSION - } - [arg] => arg.as_any().downcast_ref::().is_some(), - _ => false, - } - } - - pub fn is_min_max_supported_expr(inputs: &[std::sync::Arc]) -> bool { - match inputs { - [arg] => arg.as_any().downcast_ref::().is_some(), - _ => false, - } - } - - pub fn from_aggr_expr(aggr: &AggregateFunctionExpr) -> Option { - match (aggr.fun().name(), aggr.expressions().as_slice()) { - ("count", []) => Some(Self::CountRows), - ("count", [arg]) if arg.as_any().downcast_ref::().is_some() => { - Some(Self::CountRows) - } - ("count", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { - Some(Self::CountNonNull { - column_name: col.name().to_string(), - }) - } - ("min", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { - Some(Self::MinValue { - column_name: col.name().to_string(), - }) - } - ("max", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { - Some(Self::MaxValue { - column_name: col.name().to_string(), - }) - } - _ => None, - } - } -} +use store_api::region_engine::{FileStatsItem, SupportStatAggr}; #[derive(Debug, Clone, Default, PartialEq)] pub struct FileColumnStats { @@ -219,6 +161,10 @@ fn collect_one_column_stats( } fn sum_null_counts(file_stats: &FileStatsItem, column_index: usize) -> Result> { + if file_stats.row_groups.is_empty() { + return Ok(None); + } + let mut total = 0_u64; for row_group in &file_stats.row_groups { let Some(stats) = row_group.metadata.column(column_index).statistics() else { diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs index 
ff247d0a21..cbf1f42740 100644 --- a/src/mito2/src/engine/partition_filter_test.rs +++ b/src/mito2/src/engine/partition_filter_test.rs @@ -18,7 +18,7 @@ use api::v1::Rows; use common_recordbatch::RecordBatches; use datatypes::value::Value; use partition::expr::col; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, SupportStatAggr}; use store_api::region_request::{ EnterStagingRequest, RegionFlushRequest, RegionRequest, StagingPartitionDirective, }; @@ -172,3 +172,93 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) { flat_format ); } + +#[tokio::test] +async fn test_stats_aware_skip_requirements_skip_eligible_sst() { + test_stats_aware_skip_requirements_skip_eligible_sst_with_format(false).await; + test_stats_aware_skip_requirements_skip_eligible_sst_with_format(true).await; +} + +async fn test_stats_aware_skip_requirements_skip_eligible_sst_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1025, 0); + let partition_expr = range_expr_string("field_0", 0., 99.); + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(partition_expr)) + .build(); + let column_schemas = rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let flushed_rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, flushed_rows).await; + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + let memtable_rows = Rows { + schema: column_schemas, + rows: build_rows(3, 5), + }; + put_rows(&engine, region_id, memtable_rows).await; + + let request = ScanRequest { + projection_input: 
Some(vec![1].into()), + ..Default::default() + }; + + let baseline_scanner = engine.scanner(region_id, request.clone()).await.unwrap(); + let baseline_batches = RecordBatches::try_collect(baseline_scanner.scan().await.unwrap()) + .await + .unwrap(); + assert_eq!( + r#"+---------+ +| field_0 | ++---------+ +| 0.0 | +| 1.0 | +| 2.0 | +| 3.0 | +| 4.0 | ++---------+"#, + baseline_batches.pretty_print().unwrap() + ); + + let mut skip_scanner = engine.scanner(region_id, request).await.unwrap(); + skip_scanner.set_stats_aware_skip_requirements(vec![SupportStatAggr::CountRows]); + let skipped_batches = RecordBatches::try_collect(skip_scanner.scan().await.unwrap()) + .await + .unwrap(); + + assert_eq!( + r#"+---------+ +| field_0 | ++---------+ +| 3.0 | +| 4.0 | ++---------+"#, + skipped_batches.pretty_print().unwrap(), + "stats-aware skip should drop the flushed SST rows and only leave memtable rows" + ); +} diff --git a/src/mito2/src/read/pruner.rs b/src/mito2/src/read/pruner.rs index 320144484d..2dfb41a482 100644 --- a/src/mito2/src/read/pruner.rs +++ b/src/mito2/src/read/pruner.rs @@ -22,7 +22,7 @@ use std::time::Instant; use common_telemetry::debug; use smallvec::SmallVec; use snafu::ResultExt; -use store_api::region_engine::PartitionRange; +use store_api::region_engine::{PartitionRange, ScannerProperties, SupportStatAggr}; use store_api::storage::FileId; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; @@ -38,6 +38,33 @@ use crate::sst::parquet::reader::ReaderMetrics; /// Number of files to pre-fetch ahead of the current position. 
const PREFETCH_COUNT: usize = 8; +#[derive(Clone)] +pub(crate) struct StatsAwareSkipConfig { + requirements: Arc<[SupportStatAggr]>, +} + +impl StatsAwareSkipConfig { + fn new(requirements: Vec) -> Option { + if requirements.is_empty() { + None + } else { + Some(Self { + requirements: requirements.into(), + }) + } + } + + pub(crate) fn requirements(&self) -> &[SupportStatAggr] { + &self.requirements + } +} + +pub(crate) fn stats_aware_skip_config( + properties: &ScannerProperties, +) -> Option { + StatsAwareSkipConfig::new(properties.stats_aware_skip_requirements().to_vec()) +} + /// Local pruner in a partition that supports prefetching files to prune. pub struct PartitionPruner { pruner: Arc, @@ -47,11 +74,16 @@ pub struct PartitionPruner { pre_filter_modes: Vec, /// Current position for tracking pre-fetch progress. current_position: AtomicUsize, + stats_aware_skip: Option, } impl PartitionPruner { /// Creates a new `PartitionPruner` for the given partition ranges. - pub fn new(pruner: Arc, partition_ranges: &[PartitionRange]) -> Self { + pub fn new( + pruner: Arc, + partition_ranges: &[PartitionRange], + stats_aware_skip: Option, + ) -> Self { let num_files = pruner.inner.stream_ctx.input.num_files(); let mut file_indices = Vec::with_capacity(num_files); let mut pre_filter_modes = vec![PreFilterMode::SkipFields; num_files]; @@ -84,6 +116,7 @@ impl PartitionPruner { file_indices, pre_filter_modes, current_position: AtomicUsize::new(0), + stats_aware_skip, } } @@ -103,7 +136,13 @@ impl PartitionPruner { // Delegate to underlying Pruner let ranges = self .pruner - .build_file_ranges(index, pre_filter_mode, partition_metrics, reader_metrics) + .build_file_ranges( + index, + pre_filter_mode, + partition_metrics, + reader_metrics, + self.stats_aware_skip.as_ref(), + ) .await?; // Find position and trigger pre-fetch for upcoming files @@ -129,6 +168,7 @@ impl PartitionPruner { file_index, pre_filter_mode, Some(partition_metrics.clone()), + self.stats_aware_skip.clone(), 
); } } @@ -161,6 +201,8 @@ struct PrunerInner { struct FileBuilderEntry { /// Cached builder after pruning. None if not yet built or already cleared. builder: Option>, + /// Stats-aware skip requirements used to build the cached builder. + stats_aware_skip_requirements: Option>, /// Number of remaining ranges to scan for this file. /// When this reaches 0, the builder is dropped for memory cleanup. remaining_ranges: usize, @@ -168,6 +210,27 @@ struct FileBuilderEntry { waiters: Vec>>>, } +impl FileBuilderEntry { + fn clear_builder_if_skip_requirements_changed( + &mut self, + stats_aware_skip: Option<&StatsAwareSkipConfig>, + ) { + if self.builder.is_some() && !self.matches_skip_requirements(stats_aware_skip) { + self.builder = None; + self.stats_aware_skip_requirements = None; + PRUNER_ACTIVE_BUILDERS.dec(); + } + } + + fn matches_skip_requirements(&self, stats_aware_skip: Option<&StatsAwareSkipConfig>) -> bool { + match (&self.stats_aware_skip_requirements, stats_aware_skip) { + (None, None) => true, + (Some(cached), Some(current)) => cached.as_ref() == current.requirements(), + _ => false, + } + } +} + /// Request to prune a file. struct PruneRequest { /// Index of the file in ScanInput.files. @@ -178,6 +241,8 @@ struct PruneRequest { response_tx: Option>>>, /// Partition metrics for merging reader metrics. partition_metrics: Option, + /// Optional stats-aware skip config for this request. 
+ stats_aware_skip: Option, } impl Pruner { @@ -191,6 +256,7 @@ impl Pruner { .map(|_| { Mutex::new(FileBuilderEntry { builder: None, + stats_aware_skip_requirements: None, remaining_ranges: 0, waiters: Vec::new(), }) @@ -254,6 +320,7 @@ impl Pruner { pre_filter_mode: PreFilterMode, partition_metrics: &PartitionMetrics, reader_metrics: &mut ReaderMetrics, + stats_aware_skip: Option<&StatsAwareSkipConfig>, ) -> Result> { let file_index = index.index - self.inner.stream_ctx.input.num_memtables(); @@ -264,6 +331,7 @@ impl Pruner { pre_filter_mode, partition_metrics, reader_metrics, + stats_aware_skip, ) .await?; @@ -284,10 +352,12 @@ impl Pruner { pre_filter_mode: PreFilterMode, partition_metrics: &PartitionMetrics, reader_metrics: &mut ReaderMetrics, + stats_aware_skip: Option<&StatsAwareSkipConfig>, ) -> Result> { // Fast path: checks cache { - let entry = self.inner.file_entries[file_index].lock().unwrap(); + let mut entry = self.inner.file_entries[file_index].lock().unwrap(); + entry.clear_builder_if_skip_requirements_changed(stats_aware_skip); if let Some(builder) = &entry.builder { reader_metrics.filter_metrics.pruner_cache_hit += 1; return Ok(builder.clone()); @@ -306,13 +376,19 @@ impl Pruner { pre_filter_mode, response_tx: Some(response_tx), partition_metrics: Some(partition_metrics.clone()), + stats_aware_skip: stats_aware_skip.cloned(), }; let result = if self.worker_senders[worker_idx].send(request).await.is_err() { common_telemetry::warn!("Worker channel closed, falling back to direct pruning"); // Worker channel closed, falls back to direct pruning - self.prune_file_directly(file_index, pre_filter_mode, reader_metrics) - .await + self.prune_file_directly( + file_index, + pre_filter_mode, + reader_metrics, + stats_aware_skip, + ) + .await } else { // Waits for response match response_rx.await { @@ -322,8 +398,13 @@ impl Pruner { "Response channel closed, falling back to direct pruning" ); // Channel closed, falls back to direct pruning - 
self.prune_file_directly(file_index, pre_filter_mode, reader_metrics) - .await + self.prune_file_directly( + file_index, + pre_filter_mode, + reader_metrics, + stats_aware_skip, + ) + .await } } }; @@ -337,10 +418,12 @@ impl Pruner { file_index: usize, pre_filter_mode: PreFilterMode, partition_metrics: Option, + stats_aware_skip: Option, ) { // Fast path: checks cache { - let entry = self.inner.file_entries[file_index].lock().unwrap(); + let mut entry = self.inner.file_entries[file_index].lock().unwrap(); + entry.clear_builder_if_skip_requirements_changed(stats_aware_skip.as_ref()); if entry.builder.is_some() { return; } @@ -355,6 +438,7 @@ impl Pruner { pre_filter_mode, response_tx: None, partition_metrics, + stats_aware_skip, }; // Sends request to worker @@ -373,13 +457,19 @@ impl Pruner { file_index: usize, pre_filter_mode: PreFilterMode, reader_metrics: &mut ReaderMetrics, + stats_aware_skip: Option<&StatsAwareSkipConfig>, ) -> Result> { let file = &self.inner.stream_ctx.input.files[file_index]; let builder = self .inner .stream_ctx .input - .prune_file(file, pre_filter_mode, reader_metrics) + .prune_file( + file, + pre_filter_mode, + reader_metrics, + stats_aware_skip.map(StatsAwareSkipConfig::requirements), + ) .await?; let arc_builder = Arc::new(builder); @@ -391,6 +481,8 @@ impl Pruner { reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize; reader_metrics.num_range_builders += 1; entry.builder = Some(arc_builder.clone()); + entry.stats_aware_skip_requirements = + stats_aware_skip.map(|config| config.requirements.clone()); PRUNER_ACTIVE_BUILDERS.inc(); } } @@ -406,6 +498,7 @@ impl Pruner { if entry.remaining_ranges == 0 && let Some(builder) = entry.builder.take() { + entry.stats_aware_skip_requirements = None; PRUNER_ACTIVE_BUILDERS.dec(); reader_metrics.metadata_mem_size -= builder.memory_size() as isize; reader_metrics.num_range_builders -= 1; @@ -428,11 +521,13 @@ impl Pruner { pre_filter_mode, response_tx, partition_metrics, + 
stats_aware_skip, } = request; // Check if already cached or in-progress { - let entry = inner.file_entries[file_index].lock().unwrap(); + let mut entry = inner.file_entries[file_index].lock().unwrap(); + entry.clear_builder_if_skip_requirements_changed(stats_aware_skip.as_ref()); if let Some(builder) = &entry.builder { // Cache hit - send immediately if let Some(response_tx) = response_tx { @@ -451,7 +546,14 @@ impl Pruner { let result = inner .stream_ctx .input - .prune_file(file, pre_filter_mode, &mut metrics) + .prune_file( + file, + pre_filter_mode, + &mut metrics, + stats_aware_skip + .as_ref() + .map(StatsAwareSkipConfig::requirements), + ) .await; // Update state and notify waiters @@ -459,8 +561,13 @@ impl Pruner { match result { Ok(builder) => { let arc_builder = Arc::new(builder); - entry.builder = Some(arc_builder.clone()); - PRUNER_ACTIVE_BUILDERS.inc(); + if entry.builder.is_none() { + entry.builder = Some(arc_builder.clone()); + entry.stats_aware_skip_requirements = stats_aware_skip + .as_ref() + .map(|config| config.requirements.clone()); + PRUNER_ACTIVE_BUILDERS.inc(); + } // Notify all waiters for waiter in entry.waiters.drain(..) 
{ diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 107a36eb26..213f01c323 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -22,7 +22,9 @@ use std::time::Instant; use api::v1::SemanticType; use async_stream::try_stream; -use common_error::ext::BoxedError; +use common_error::ext::{BoxedError, PlainError}; +use common_error::status_code::StatusCode; +use common_query::aggr_stats::StatsCandidateFile; use common_recordbatch::SendableRecordBatchStream; use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::tracing::Instrument; @@ -33,13 +35,14 @@ use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; use futures::StreamExt; +use parquet::file::metadata::ParquetMetaData; use partition::expr::PartitionExpr; use smallvec::SmallVec; use snafu::{OptionExt as _, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{ FileStatsItem, PartitionRange, QueryScanContext, RegionScannerRef, RowGroupStatsItem, - SendableFileStatsStream, + SendableFileStatsStream, SupportStatAggr, }; use store_api::storage::{ ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, @@ -52,7 +55,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::CacheStrategy; use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES; -use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; +use crate::error::{ExternalSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, Result}; #[cfg(feature = "enterprise")] use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; use crate::memtable::{MemtableRange, RangesOptions}; @@ -171,6 +174,17 @@ impl Scanner { Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(), } } + + pub(crate) fn set_stats_aware_skip_requirements(&mut self, requirements: 
Vec) { + use store_api::region_engine::{PrepareRequest, RegionScanner}; + + let request = PrepareRequest::default().with_stats_aware_skip_requirements(requirements); + match self { + Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(), + Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(), + Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -1088,6 +1102,29 @@ impl ScanInput { } } + fn should_skip_file_by_stats( + &self, + file: &FileHandle, + parquet_meta: &ParquetMetaData, + requirements: &[SupportStatAggr], + ) -> Result { + let file_stats = build_file_stats_item(file, parquet_meta)?; + let region_partition_expr = self.region_metadata().partition_expr.clone(); + let candidate = StatsCandidateFile::from_file_stats( + &file_stats, + region_partition_expr.as_deref(), + requirements, + &self.region_metadata().schema, + ) + .map_err(|error| { + BoxedError::new(PlainError::new(error.to_string(), StatusCode::Unexpected)) + }) + .context(ExternalSnafu { + context: "failed to classify file stats for stats-aware skip mode", + })?; + Ok(candidate.is_some()) + } + /// Prunes a file to scan and returns the builder to build readers. #[tracing::instrument( skip_all, @@ -1101,7 +1138,23 @@ impl ScanInput { file: &FileHandle, pre_filter_mode: PreFilterMode, reader_metrics: &mut ReaderMetrics, + stats_aware_skip_requirements: Option<&[SupportStatAggr]>, ) -> Result { + if let Some(requirements) = + stats_aware_skip_requirements.filter(|requirements| !requirements.is_empty()) + { + let sst_meta = self + .access_layer + .read_sst(file.clone()) + .cache(self.cache_strategy.clone()) + .expected_metadata(Some(self.region_metadata().clone())) + .read_sst_meta() + .await?; + if self.should_skip_file_by_stats(file, &sst_meta.parquet_metadata(), requirements)? 
{ + return Ok(FileRangeBuilder::default()); + } + } + let predicate = self.predicate_for_file(file); let decode_pk_values = !self.compaction && self @@ -1454,6 +1507,40 @@ pub struct StreamContext { pub(crate) query_start: Instant, } +fn build_file_stats_item( + file: &FileHandle, + parquet_meta: &ParquetMetaData, +) -> Result { + let row_groups = parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(row_group_index, metadata)| RowGroupStatsItem { + row_group_index, + metadata: Arc::new(metadata.clone()), + }) + .collect(); + + let file_partition_expr = file + .meta_ref() + .partition_expr + .as_ref() + .map(|expr| expr.as_json_str()) + .transpose() + .map_err(|error| { + BoxedError::new(PlainError::new(error.to_string(), StatusCode::Unexpected)) + }) + .context(ExternalSnafu { + context: "failed to serialize file partition expr for stats-aware skip mode", + })?; + + Ok(FileStatsItem { + num_rows: Some(parquet_meta.file_metadata().num_rows() as u64), + file_partition_expr, + row_groups, + }) +} + pub(crate) fn scan_input_stats( input: &ScanInput, ctx: &QueryScanContext, @@ -1481,22 +1568,8 @@ pub(crate) fn scan_input_stats( .read_sst_meta() .await .map_err(BoxedError::new)?; - let parquet_meta = sst_meta.parquet_metadata(); - let row_groups = parquet_meta - .row_groups() - .iter() - .enumerate() - .map(|(row_group_index, metadata)| RowGroupStatsItem { - row_group_index, - metadata: Arc::new(metadata.clone()), - }) - .collect(); - - yield FileStatsItem { - num_rows: Some(parquet_meta.file_metadata().num_rows() as u64), - file_partition_expr: file.meta_ref().partition_expr.as_ref().map(ToString::to_string), - row_groups, - }; + yield build_file_stats_item(&file, &sst_meta.parquet_metadata()) + .map_err(BoxedError::new)?; } }) } @@ -1842,11 +1915,22 @@ impl PredicateGroup { mod tests { use std::sync::Arc; + use api::v1::SemanticType; + use bytes::Bytes; + use datafusion::parquet::arrow::ArrowWriter; + use 
datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use datafusion::parquet::file::metadata::ParquetMetaData; + use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_plan::expressions::lit as physical_lit; use datafusion_common::ScalarValue; use datafusion_expr::{col, lit}; + use datatypes::arrow::array::StringArray; + use datatypes::arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use datatypes::arrow::record_batch::RecordBatch as ArrowRecordBatch; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; use datatypes::value::Value; - use partition::expr::col as partition_col; + use partition::expr::{PartitionExpr, col as partition_col}; use store_api::metadata::RegionMetadataBuilder; use store_api::storage::{ ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector, @@ -1857,6 +1941,8 @@ mod tests { use crate::memtable::time_partition::TimePartitions; use crate::read::range_cache::ScanRequestFingerprintBuilder; use crate::region::version::VersionBuilder; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::sst::file_purger::NoopFilePurger; use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key}; use crate::test_util::scheduler_util::SchedulerEnv; @@ -1889,6 +1975,62 @@ mod tests { .with_files(vec![file]) } + fn metadata_with_partition_expr(partition_expr_json: String) -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(store_api::storage::RegionId::new(1, 1)); + builder + .push_column_metadata(store_api::metadata::ColumnMetadata { + column_schema: ColumnSchema::new("host", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(store_api::metadata::ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: 
SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![1]) + .partition_expr_json(Some(partition_expr_json)); + Arc::new(builder.build_without_validation().unwrap()) + } + + fn file_with_partition_expr(partition_expr_json: &str) -> FileHandle { + let mut file_meta = FileMeta::default(); + file_meta.partition_expr = PartitionExpr::from_json_str(partition_expr_json).unwrap(); + file_meta.num_row_groups = 1; + FileHandle::new(file_meta, Arc::new(NoopFilePurger)) + } + + fn parquet_metadata_for_hosts(hosts: &[Option<&str>]) -> Arc { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "host", + DataType::Utf8, + true, + )])); + let mut buffer = std::io::Cursor::new(Vec::new()); + let mut writer = ArrowWriter::try_new( + &mut buffer, + arrow_schema.clone(), + Some(WriterProperties::builder().build()), + ) + .unwrap(); + let batch = ArrowRecordBatch::try_new( + arrow_schema, + vec![Arc::new(StringArray::from(hosts.to_vec()))], + ) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer.into_inner())) + .unwrap() + .metadata() + .clone() + } + #[tokio::test] async fn test_build_read_column_ids_includes_filters() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); @@ -2133,4 +2275,64 @@ mod tests { assert_eq!(expected_mode, input.range_pre_filter_mode(source_count)); } } + + #[tokio::test] + async fn test_should_skip_file_by_stats_when_shared_classifier_accepts_file() { + let partition_expr_json = partition_col("host") + .eq(Value::String("foo".into())) + .as_json_str() + .unwrap(); + let metadata = metadata_with_partition_expr(partition_expr_json.clone()); + let env = SchedulerEnv::new().await; + let mapper = FlatProjectionMapper::new(&metadata, [0, 1].into_iter()).unwrap(); + let predicate = + PredicateGroup::new(metadata.as_ref(), &[col("host").eq(lit("foo"))]).unwrap(); + let file = file_with_partition_expr(&partition_expr_json); + let 
input = ScanInput::new(env.access_layer.clone(), mapper) + .with_predicate(predicate) + .with_files(vec![file.clone()]); + let parquet_meta = parquet_metadata_for_hosts(&[Some("bar"), Some("foo")]); + + let should_skip = input + .should_skip_file_by_stats( + &file, + &parquet_meta, + &[SupportStatAggr::MaxValue { + column_name: "host".to_string(), + }], + ) + .unwrap(); + + assert!(should_skip); + } + + #[tokio::test] + async fn test_should_not_skip_file_by_stats_when_required_stats_are_missing() { + let partition_expr_json = partition_col("host") + .eq(Value::String("foo".into())) + .as_json_str() + .unwrap(); + let metadata = metadata_with_partition_expr(partition_expr_json.clone()); + let env = SchedulerEnv::new().await; + let mapper = FlatProjectionMapper::new(&metadata, [0, 1].into_iter()).unwrap(); + let predicate = + PredicateGroup::new(metadata.as_ref(), &[col("host").eq(lit("foo"))]).unwrap(); + let file = file_with_partition_expr(&partition_expr_json); + let input = ScanInput::new(env.access_layer.clone(), mapper) + .with_predicate(predicate) + .with_files(vec![file.clone()]); + let parquet_meta = parquet_metadata_for_hosts(&[None, None]); + + let should_skip = input + .should_skip_file_by_stats( + &file, + &parquet_meta, + &[SupportStatAggr::MaxValue { + column_name: "host".to_string(), + }], + ) + .unwrap(); + + assert!(!should_skip); + } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 1b082afd7d..417399b840 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -40,7 +40,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu}; use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeReader; use crate::read::last_row::FlatLastRowReader; -use crate::read::pruner::{PartitionPruner, Pruner}; +use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config}; use crate::read::range::RangeMeta; use 
crate::read::range_cache::{ build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream, @@ -158,7 +158,7 @@ impl SeqScan { pruner: Arc, ) -> Result { pruner.add_partition_ranges(partition_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges)); + let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges, None)); let mut sources = Vec::new(); for part_range in partition_ranges { @@ -385,12 +385,17 @@ impl SeqScan { let compaction = self.stream_ctx.input.compaction; let file_scan_semaphore = if compaction { None } else { semaphore.clone() }; let pruner = self.pruner.clone(); + let stats_aware_skip = stats_aware_skip_config(&self.properties); // Initializes ref counts for the pruner. // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream, // then the ref count won't be decremented. // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache. pruner.add_partition_ranges(&partition_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges)); + let partition_pruner = Arc::new(PartitionPruner::new( + pruner, + &partition_ranges, + stats_aware_skip, + )); let stream = try_stream! 
{ part_metrics.on_first_poll(); diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 1135ecd055..81ceef6a66 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -46,7 +46,7 @@ use crate::error::{ ScanSeriesSnafu, TooManyFilesToReadSnafu, }; use crate::read::ScannerMetrics; -use crate::read::pruner::{PartitionPruner, Pruner}; +use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config}; use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size, @@ -232,6 +232,7 @@ impl SeriesScan { final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))), partitions: self.properties.partitions.clone(), pruner: self.pruner.clone(), + stats_aware_skip: stats_aware_skip_config(&self.properties), senders, metrics_set: metrics_set.clone(), metrics_list: metrics_list.clone(), @@ -441,6 +442,8 @@ struct SeriesDistributor { partitions: Vec>, /// Shared pruner for file range building. pruner: Arc, + /// Optional stats-aware skip config for aggregate-stats runtime execution. + stats_aware_skip: Option, /// Senders of all partitions. senders: SenderList, /// Metrics set to report. 
@@ -484,6 +487,7 @@ impl SeriesDistributor { let partition_pruner = Arc::new(PartitionPruner::new( self.pruner.clone(), &all_partition_ranges, + self.stats_aware_skip.clone(), )); let part_metrics = new_partition_metrics( diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index fe09a525e5..f9992ef751 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -34,7 +34,7 @@ use store_api::region_engine::{ }; use crate::error::{PartitionOutOfRangeSnafu, Result}; -use crate::read::pruner::{PartitionPruner, Pruner}; +use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config}; use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges, @@ -249,12 +249,14 @@ impl UnorderedScan { let stream_ctx = self.stream_ctx.clone(); let part_ranges = self.properties.partitions[partition].clone(); let pruner = self.pruner.clone(); + let stats_aware_skip = stats_aware_skip_config(&self.properties); // Initializes ref counts for the pruner. // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream, // then the ref count won't be decremented. // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache. pruner.add_partition_ranges(&part_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges)); + let partition_pruner = + Arc::new(PartitionPruner::new(pruner, &part_ranges, stats_aware_skip)); let stream = try_stream! 
{ part_metrics.on_first_poll(); diff --git a/src/query/src/optimizer/aggr_stats.rs b/src/query/src/optimizer/aggr_stats.rs index 7558584b7a..6560ecb4a6 100644 --- a/src/query/src/optimizer/aggr_stats.rs +++ b/src/query/src/optimizer/aggr_stats.rs @@ -19,11 +19,15 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion_common::Result; +use datafusion::physical_plan::union::UnionExec; use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{DataFusionError, Result}; use table::table::scan::RegionScanExec; -use crate::optimizer::aggr_stats::support_aggr::SupportStatAggr; +use crate::optimizer::aggr_stats::stat_scan::StatsScanExec; +use crate::optimizer::aggr_stats::support_aggr::{ + SupportStatAggr, support_stat_aggr_from_aggr_expr, +}; pub(crate) mod stat_scan; pub(crate) mod support_aggr; @@ -56,11 +60,10 @@ impl PhysicalOptimizerRule for AggrStatsPhysicalRule { impl AggrStatsPhysicalRule { fn rewrite_plan_shape(plan: Arc) -> Result> { plan.transform_down(|plan| { - let Some(_rewrite_target) = RewriteTarget::extract(&plan) else { + let Some(rewrite_target) = RewriteTarget::extract(&plan) else { return Ok(Transformed::no(plan)); }; - // impl rewrite in RewriteTarget - Ok(Transformed::no(plan)) + rewrite_target.rewrite().map(Transformed::yes) }) .map(|res| res.data) } @@ -134,7 +137,9 @@ impl<'a> RewriteTarget<'a> { let aggr_exprs = aggregate_exec .aggr_expr() .iter() - .map(|aggr_expr| SupportStatAggr::from_aggr_expr(aggr_expr)) + .map(|aggr_expr| { + support_stat_aggr_from_aggr_expr(aggr_expr.as_ref(), &region_scan.time_index()) + }) .try_collect()?; let zelf = Self::FinalOverPartial { final_exec: aggregate_exec, @@ -150,6 +155,58 @@ impl<'a> RewriteTarget<'a> { } } + fn rewrite(&self) -> Result> { + match self { + 
Self::FinalOverPartial { + final_exec, + partial_exec, + region_scan, + keep_coalesce, + aggr_exprs, + } => { + let requirements = aggr_exprs.clone(); + let stats_scan = Arc::new(StatsScanExec::new( + partial_exec.schema(), + requirements.clone(), + region_scan.scanner(), + )); + + let fallback_scan = Arc::new( + region_scan + .with_stats_aware_skip_requirements(requirements) + .map_err(|error| DataFusionError::External(error.into()))?, + ); + let fallback_partial = Arc::new(AggregateExec::try_new( + *partial_exec.mode(), + Arc::new(partial_exec.group_expr().clone()), + partial_exec.aggr_expr().to_vec(), + partial_exec.filter_expr().to_vec(), + fallback_scan, + partial_exec.input_schema(), + )?); + + let union = UnionExec::try_new(vec![stats_scan, fallback_partial])?; + let merge_input: Arc = + if *keep_coalesce || union.properties().partitioning.partition_count() > 1 { + Arc::new(CoalescePartitionsExec::new(union)) + } else { + union + }; + + let final_aggregate = AggregateExec::try_new( + *final_exec.mode(), + Arc::new(final_exec.group_expr().clone()), + final_exec.aggr_expr().to_vec(), + final_exec.filter_expr().to_vec(), + merge_input, + final_exec.input_schema(), + )?; + + Ok(Arc::new(final_aggregate)) + } + } + } + fn first_stage_aggregate(&self) -> &'a AggregateExec { match self { RewriteTarget::FinalOverPartial { partial_exec, .. 
} => partial_exec, @@ -162,3 +219,366 @@ impl<'a> RewriteTarget<'a> { } } } + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use common_recordbatch::EmptyRecordBatchStream; + use datafusion::functions_aggregate::average::avg_udaf; + use datafusion::functions_aggregate::count::count_udaf; + use datafusion::physical_plan::aggregates::PhysicalGroupBy; + use datafusion::scalar::ScalarValue; + use datafusion_expr::utils::COUNT_STAR_EXPANSION; + use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; + use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal}; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::region_engine::SinglePartitionScanner; + use store_api::storage::{RegionId, ScanRequest}; + + use super::*; + + fn build_count_expr(schema: arrow_schema::SchemaRef) -> Arc { + Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![Arc::new(PhysicalColumn::new("v0", 0))]) + .schema(schema) + .alias("count(v0)") + .build() + .unwrap(), + ) + } + + fn build_count_time_index_expr(schema: arrow_schema::SchemaRef) -> Arc { + Arc::new( + AggregateExprBuilder::new(count_udaf(), vec![Arc::new(PhysicalColumn::new("ts", 1))]) + .schema(schema) + .alias("count(ts)") + .build() + .unwrap(), + ) + } + + fn build_count_star_expr(schema: arrow_schema::SchemaRef) -> Arc { + Arc::new( + AggregateExprBuilder::new( + count_udaf(), + vec![Arc::new(Literal::new(COUNT_STAR_EXPANSION.clone()))], + ) + .schema(schema) + .alias("count(*)") + .build() + .unwrap(), + ) + } + + fn build_count_null_expr(schema: arrow_schema::SchemaRef) -> Arc { + Arc::new( + AggregateExprBuilder::new( + count_udaf(), + vec![Arc::new(Literal::new(ScalarValue::Null))], + ) + .schema(schema) + .alias("count(NULL)") + .build() + .unwrap(), + ) + } + + fn build_avg_expr(schema: arrow_schema::SchemaRef) -> Arc { + Arc::new( + 
AggregateExprBuilder::new(avg_udaf(), vec![Arc::new(PhysicalColumn::new("v0", 0))]) + .schema(schema) + .alias("avg(v0)") + .build() + .unwrap(), + ) + } + + fn group_by_v0() -> PhysicalGroupBy { + PhysicalGroupBy::new_single(vec![( + Arc::new(PhysicalColumn::new("v0", 0)), + "v0".to_string(), + )]) + } + + fn build_region_scan(append_mode: bool) -> Arc { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ])); + let stream = Box::pin(EmptyRecordBatchStream::new(schema.clone())); + + let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + metadata_builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![]); + let metadata = Arc::new(metadata_builder.build().unwrap()); + + let scanner = Box::new(SinglePartitionScanner::new( + stream, + append_mode, + metadata, + None, + )); + Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap()) + } + + fn build_final_over_partial_plan() -> Arc { + build_final_over_partial_plan_with(build_region_scan(true), build_count_expr, None) + } + + fn build_final_over_partial_plan_with( + region_scan: Arc, + build_aggr_expr: fn(arrow_schema::SchemaRef) -> Arc, + group_by: Option, + ) -> Arc { + let input_schema = region_scan.schema(); + let aggr_expr = build_aggr_expr(input_schema.clone()); + let group_by = group_by.unwrap_or_default(); + + let partial = Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + group_by.clone(), + 
vec![aggr_expr.clone()], + vec![None], + region_scan, + input_schema.clone(), + ) + .unwrap(), + ); + + let coalesce = Arc::new(CoalescePartitionsExec::new(partial)); + Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + vec![aggr_expr], + vec![None], + coalesce, + input_schema, + ) + .unwrap(), + ) + } + + #[test] + fn rewrite_builds_stats_scan_union_with_stats_aware_fallback() { + let plan = build_final_over_partial_plan(); + let optimized = AggrStatsPhysicalRule + .optimize(plan, &ConfigOptions::default()) + .unwrap(); + + let final_exec = optimized.as_any().downcast_ref::().unwrap(); + assert!(matches!(final_exec.mode(), AggregateMode::Final)); + + let coalesce = final_exec + .input() + .as_any() + .downcast_ref::() + .unwrap(); + let union = coalesce + .input() + .as_any() + .downcast_ref::() + .unwrap(); + let union_children = union.children(); + assert_eq!(union_children.len(), 2); + + let stats_scan = union_children[0] + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + stats_scan.requirements(), + &[SupportStatAggr::CountNonNull { + column_name: "v0".to_string(), + }] + ); + + let fallback_partial = union_children[1] + .as_any() + .downcast_ref::() + .unwrap(); + assert!(matches!(fallback_partial.mode(), AggregateMode::Partial)); + + let fallback_scan = fallback_partial + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + fallback_scan.stats_aware_skip_requirements(), + &[SupportStatAggr::CountNonNull { + column_name: "v0".to_string(), + }] + ); + } + + #[test] + fn rewrite_ignores_unsupported_avg_aggregate() { + let plan = + build_final_over_partial_plan_with(build_region_scan(true), build_avg_expr, None); + let optimized = AggrStatsPhysicalRule + .optimize(plan, &ConfigOptions::default()) + .unwrap(); + + assert_final_over_partial_without_union(&optimized); + } + + #[test] + fn rewrite_ignores_grouped_aggregate() { + let plan = build_final_over_partial_plan_with( + build_region_scan(true), + 
build_count_expr, + Some(group_by_v0()), + ); + let optimized = AggrStatsPhysicalRule + .optimize(plan, &ConfigOptions::default()) + .unwrap(); + + assert_final_over_partial_without_union(&optimized); + } + + #[test] + fn rewrite_ignores_non_append_region_scan() { + let plan = + build_final_over_partial_plan_with(build_region_scan(false), build_count_expr, None); + let optimized = AggrStatsPhysicalRule + .optimize(plan, &ConfigOptions::default()) + .unwrap(); + + assert_final_over_partial_without_union(&optimized); + } + + #[test] + fn rewrite_maps_count_star_to_count_rows() { + let optimized = AggrStatsPhysicalRule + .optimize( + build_final_over_partial_plan_with( + build_region_scan(true), + build_count_star_expr, + None, + ), + &ConfigOptions::default(), + ) + .unwrap(); + + assert_rewritten_stats_requirement(&optimized, &[SupportStatAggr::CountRows]); + } + + #[test] + fn rewrite_maps_count_time_index_to_count_rows() { + let optimized = AggrStatsPhysicalRule + .optimize( + build_final_over_partial_plan_with( + build_region_scan(true), + build_count_time_index_expr, + None, + ), + &ConfigOptions::default(), + ) + .unwrap(); + + assert_rewritten_stats_requirement(&optimized, &[SupportStatAggr::CountRows]); + } + + #[test] + fn rewrite_ignores_count_null_literal() { + let plan = build_final_over_partial_plan_with( + build_region_scan(true), + build_count_null_expr, + None, + ); + let optimized = AggrStatsPhysicalRule + .optimize(plan, &ConfigOptions::default()) + .unwrap(); + + assert_final_over_partial_without_union(&optimized); + } + + fn assert_rewritten_stats_requirement( + plan: &Arc, + expected: &[SupportStatAggr], + ) { + let final_exec = plan.as_any().downcast_ref::().unwrap(); + let coalesce = final_exec + .input() + .as_any() + .downcast_ref::() + .unwrap(); + let union = coalesce + .input() + .as_any() + .downcast_ref::() + .unwrap(); + let union_children = union.children(); + let stats_scan = union_children[0] + .as_any() + .downcast_ref::() + 
.unwrap(); + assert_eq!(stats_scan.requirements(), expected); + + let fallback_partial = union_children[1] + .as_any() + .downcast_ref::() + .unwrap(); + let fallback_scan = fallback_partial + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(fallback_scan.stats_aware_skip_requirements(), expected); + } + + fn assert_final_over_partial_without_union(plan: &Arc) { + let final_exec = plan.as_any().downcast_ref::().unwrap(); + assert!(matches!(final_exec.mode(), AggregateMode::Final)); + + let coalesce = final_exec + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert!( + coalesce + .input() + .as_any() + .downcast_ref::() + .is_none() + ); + + let partial_exec = coalesce + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(matches!(partial_exec.mode(), AggregateMode::Partial)); + + let region_scan = partial_exec + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(region_scan.stats_aware_skip_requirements().is_empty()); + } +} diff --git a/src/query/src/optimizer/aggr_stats/stat_scan.rs b/src/query/src/optimizer/aggr_stats/stat_scan.rs index 5676fb9487..51f24ebe91 100644 --- a/src/query/src/optimizer/aggr_stats/stat_scan.rs +++ b/src/query/src/optimizer/aggr_stats/stat_scan.rs @@ -320,7 +320,6 @@ mod tests { use api::v1::SemanticType; use bytes::Bytes; - use common_query::aggr_stats::SupportStatAggr; use datafusion::functions_aggregate::average::avg_udaf; use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf}; @@ -335,6 +334,7 @@ mod tests { use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::region_engine::{ FileStatsItem, RowGroupStatsItem, ScannerProperties, SendableFileStatsStream, + SupportStatAggr, }; use store_api::storage::RegionId; @@ -802,4 +802,95 @@ mod tests { .unwrap(); assert_eq!(max_values.value(0), 9); } + + #[tokio::test] + async fn 
stats_scan_exec_emits_no_batches_when_all_files_fallback() { + let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field( + "count_state", + "count[count]", + DataType::Int64, + false, + )])); + let region_metadata = build_region_metadata(Some("host = 'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![ + FileStatsItem { + num_rows: Some(5), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: vec![], + }, + FileStatsItem { + num_rows: Some(5), + file_partition_expr: Some("host = 'b'".to_string()), + row_groups: build_row_groups(&[vec![Some(1), Some(2), Some(3)]]), + }, + ], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::CountNonNull { + column_name: "value".to_string(), + }], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + + let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); + let batches = stream.map(|batch| batch.unwrap()).collect::>().await; + + assert!(batches.is_empty()); + } + + #[tokio::test] + async fn stats_scan_exec_count_rows_uses_file_num_rows_without_row_groups() { + let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field( + "count_state", + "count[count]", + DataType::Int64, + false, + )])); + let region_metadata = build_region_metadata(Some("host = 'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![ + FileStatsItem { + num_rows: Some(7), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: vec![], + }, + FileStatsItem { + num_rows: Some(3), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: vec![], + }, + ], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::CountRows], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + 
+ let batch = collect_single_batch(&exec).await; + assert_eq!(batch.num_rows(), 2); + + let count_state = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let count_values = count_state + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(count_values.value(0), 7); + assert_eq!(count_values.value(1), 3); + } } diff --git a/src/query/src/optimizer/aggr_stats/support_aggr.rs b/src/query/src/optimizer/aggr_stats/support_aggr.rs index 32b542847b..5cdd4660b7 100644 --- a/src/query/src/optimizer/aggr_stats/support_aggr.rs +++ b/src/query/src/optimizer/aggr_stats/support_aggr.rs @@ -12,4 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use common_query::aggr_stats::SupportStatAggr; +use datafusion_expr::utils::COUNT_STAR_EXPANSION; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; +use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal}; +pub use store_api::region_engine::SupportStatAggr; + +pub fn support_stat_aggr_from_aggr_expr( + aggr: &AggregateFunctionExpr, + time_index_column: &str, +) -> Option { + match (aggr.fun().name(), aggr.expressions().as_slice()) { + ("count", []) => Some(SupportStatAggr::CountRows), + ("count", [arg]) + if arg + .as_any() + .downcast_ref::() + .is_some_and(|lit| lit.value() == &COUNT_STAR_EXPANSION) => + { + Some(SupportStatAggr::CountRows) + } + ("count", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + if col.name() == time_index_column { + Some(SupportStatAggr::CountRows) + } else { + Some(SupportStatAggr::CountNonNull { + column_name: col.name().to_string(), + }) + } + } + ("min", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + Some(SupportStatAggr::MinValue { + column_name: col.name().to_string(), + }) + } + ("max", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + Some(SupportStatAggr::MaxValue { + 
column_name: col.name().to_string(), + }) + } + _ => None, + } +} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 2063a6882d..5c70a21a3a 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -317,8 +317,17 @@ pub struct ScannerProperties { /// Whether the scanner is scanning a logical region. logical_region: bool, - /// Whether stats-aware skip mode is enabled for aggregate-stats runtime execution. - stats_aware_skip_mode: bool, + /// Optimizer-approved aggregate-stats requirements used by stats-aware skip. + stats_aware_skip_requirements: Vec, +} + +/// Aggregate-stats requirement forwarded to scanner prepare / scan paths. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SupportStatAggr { + CountRows, + CountNonNull { column_name: String }, + MinValue { column_name: String }, + MaxValue { column_name: String }, } impl ScannerProperties { @@ -343,7 +352,7 @@ impl ScannerProperties { distinguish_partition_range: false, target_partitions: 0, logical_region: false, - stats_aware_skip_mode: false, + stats_aware_skip_requirements: Vec::new(), } } @@ -358,8 +367,8 @@ impl ScannerProperties { if let Some(target_partitions) = request.target_partitions { self.target_partitions = target_partitions; } - if let Some(stats_aware_skip_mode) = request.stats_aware_skip_mode { - self.stats_aware_skip_mode = stats_aware_skip_mode; + if let Some(stats_aware_skip_requirements) = request.stats_aware_skip_requirements { + self.stats_aware_skip_requirements = stats_aware_skip_requirements; } } @@ -376,9 +385,9 @@ impl ScannerProperties { self.total_rows } - /// Returns whether stats-aware skip mode is enabled. - pub fn stats_aware_skip_mode(&self) -> bool { - self.stats_aware_skip_mode + /// Returns aggregate-stats requirements attached to stats-aware skip. 
+ pub fn stats_aware_skip_requirements(&self) -> &[SupportStatAggr] { + &self.stats_aware_skip_requirements } /// Returns whether the scanner is scanning a logical region. @@ -410,8 +419,8 @@ pub struct PrepareRequest { pub distinguish_partition_range: Option, /// The expected number of target partitions. pub target_partitions: Option, - /// Whether to enable stats-aware skip mode on the scanner. - pub stats_aware_skip_mode: Option, + /// Optimizer-approved aggregate-stats requirements for stats-aware skip. + pub stats_aware_skip_requirements: Option>, } impl PrepareRequest { @@ -433,9 +442,12 @@ impl PrepareRequest { self } - /// Sets the stats-aware skip mode flag. - pub fn with_stats_aware_skip_mode(mut self, stats_aware_skip_mode: bool) -> Self { - self.stats_aware_skip_mode = Some(stats_aware_skip_mode); + /// Sets optimizer-approved aggregate-stats requirements for stats-aware skip. + pub fn with_stats_aware_skip_requirements( + mut self, + stats_aware_skip_requirements: Vec, + ) -> Self { + self.stats_aware_skip_requirements = Some(stats_aware_skip_requirements); self } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index b6cbc8069b..85ed2a9c56 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -46,7 +46,7 @@ use datatypes::compute::SortOptions; use futures::{Stream, StreamExt}; use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME; use store_api::region_engine::{ - PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef, + PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef, SupportStatAggr, }; use store_api::storage::{ScanRequest, TimeSeriesDistribution}; @@ -66,7 +66,7 @@ pub struct RegionScanExec { is_partition_set: bool, // TODO(ruihang): handle TimeWindowed dist via this parameter distribution: Option, - stats_aware_skip_mode: bool, + stats_aware_skip_requirements: Vec, explain_verbose: bool, query_memory_tracker: Option, } @@ -83,7 +83,10 @@ impl std::fmt::Debug 
for RegionScanExec { .field("total_rows", &self.total_rows) .field("is_partition_set", &self.is_partition_set) .field("distribution", &self.distribution) - .field("stats_aware_skip_mode", &self.stats_aware_skip_mode) + .field( + "stats_aware_skip_requirements", + &self.stats_aware_skip_requirements, + ) .field("explain_verbose", &self.explain_verbose) .finish() } @@ -227,7 +230,7 @@ impl RegionScanExec { total_rows, is_partition_set: false, distribution: request.distribution, - stats_aware_skip_mode: false, + stats_aware_skip_requirements: Vec::new(), explain_verbose: false, query_memory_tracker, }) @@ -305,20 +308,21 @@ impl RegionScanExec { total_rows: self.total_rows, is_partition_set: true, distribution: self.distribution, - stats_aware_skip_mode: self.stats_aware_skip_mode, + stats_aware_skip_requirements: self.stats_aware_skip_requirements.clone(), explain_verbose: self.explain_verbose, query_memory_tracker: self.query_memory_tracker.clone(), }) } - pub fn with_stats_aware_skip_mode( + pub fn with_stats_aware_skip_requirements( &self, - stats_aware_skip_mode: bool, + stats_aware_skip_requirements: Vec, ) -> Result { { let mut scanner = self.scanner.lock().unwrap(); scanner.prepare( - PrepareRequest::default().with_stats_aware_skip_mode(stats_aware_skip_mode), + PrepareRequest::default() + .with_stats_aware_skip_requirements(stats_aware_skip_requirements.clone()), )?; } @@ -332,14 +336,14 @@ impl RegionScanExec { total_rows: self.total_rows, is_partition_set: self.is_partition_set, distribution: self.distribution, - stats_aware_skip_mode, + stats_aware_skip_requirements, explain_verbose: self.explain_verbose, query_memory_tracker: self.query_memory_tracker.clone(), }) } - pub fn stats_aware_skip_mode(&self) -> bool { - self.stats_aware_skip_mode + pub fn stats_aware_skip_requirements(&self) -> &[SupportStatAggr] { + &self.stats_aware_skip_requirements } pub fn append_mode(&self) -> bool { @@ -667,4 +671,79 @@ mod test { let result = plan.execute(0, 
ctx.task_ctx()); assert!(result.is_ok()); } + + #[tokio::test] + async fn test_region_scan_exec_records_stats_aware_skip_requirements() { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new( + "b", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ])); + let batch = RecordBatch::new( + schema.clone(), + vec![ + Arc::new(Int32Vector::from_slice([1, 2])) as _, + Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _, + ], + ) + .unwrap(); + let stream = RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream(); + + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "b", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![1]); + let region_metadata = Arc::new(builder.build().unwrap()); + + let scanner = Box::new(SinglePartitionScanner::new( + stream, + false, + region_metadata, + None, + )); + let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap(); + let requirements = vec![SupportStatAggr::CountNonNull { + column_name: "a".to_string(), + }]; + + let plan = plan + .with_stats_aware_skip_requirements(requirements.clone()) + .unwrap(); + + assert_eq!( + plan.stats_aware_skip_requirements(), + requirements.as_slice() + ); + + { + let scanner = plan.scanner(); + let scanner = scanner.lock().unwrap(); + assert_eq!( + scanner.properties().stats_aware_skip_requirements(), + requirements.as_slice() + ); + } + + let result = plan.execute(0, ctx.task_ctx()); + assert!(result.is_ok()); + } }