diff --git a/Cargo.lock b/Cargo.lock index a40d5b7cdd..7404460abb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2633,6 +2633,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr", "datatypes", "futures-util", "once_cell", @@ -12870,6 +12871,7 @@ dependencies = [ "itertools 0.14.0", "lazy_static", "num_enum 0.7.4", + "parquet", "prometheus 0.14.0", "prost 0.14.1", "serde", diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index 48328ea612..0e2afcf512 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -22,6 +22,7 @@ common-time.workspace = true datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true +datafusion-physical-expr.workspace = true datatypes.workspace = true once_cell.workspace = true serde.workspace = true diff --git a/src/common/query/src/aggr_stats.rs b/src/common/query/src/aggr_stats.rs new file mode 100644 index 0000000000..e7e07a21bc --- /dev/null +++ b/src/common/query/src/aggr_stats.rs @@ -0,0 +1,310 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; + +use datafusion::parquet::file::statistics::Statistics as ParquetStats; +use datafusion::scalar::ScalarValue; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::utils::COUNT_STAR_EXPANSION; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; +use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal}; +use datatypes::schema::SchemaRef as RegionSchemaRef; +use datatypes::value::Value; +use store_api::region_engine::FileStatsItem; + +/// Runtime requirement that has already been approved by optimizer rewrite checks. 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SupportStatAggr { + CountRows, + CountNonNull { column_name: String }, + MinValue { column_name: String }, + MaxValue { column_name: String }, +} + +impl SupportStatAggr { + pub fn is_count_supported_expr(inputs: &[std::sync::Arc]) -> bool { + match inputs { + [] => true, + [arg] if let Some(lit) = arg.as_any().downcast_ref::() => { + lit.value() == &COUNT_STAR_EXPANSION + } + [arg] => arg.as_any().downcast_ref::().is_some(), + _ => false, + } + } + + pub fn is_min_max_supported_expr(inputs: &[std::sync::Arc]) -> bool { + match inputs { + [arg] => arg.as_any().downcast_ref::().is_some(), + _ => false, + } + } + + pub fn from_aggr_expr(aggr: &AggregateFunctionExpr) -> Option { + match (aggr.fun().name(), aggr.expressions().as_slice()) { + ("count", []) => Some(Self::CountRows), + ("count", [arg]) if arg.as_any().downcast_ref::().is_some() => { + Some(Self::CountRows) + } + ("count", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + Some(Self::CountNonNull { + column_name: col.name().to_string(), + }) + } + ("min", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + Some(Self::MinValue { + column_name: col.name().to_string(), + }) + } + ("max", [arg]) if let Some(col) = arg.as_any().downcast_ref::() => { + Some(Self::MaxValue { + column_name: col.name().to_string(), + }) + } + _ => None, + } + } +} + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct FileColumnStats { + pub null_count: Option, + pub min_value: Option, + pub max_value: Option, +} + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct StatsCandidateFile { + pub num_rows: Option, + pub column_stats: HashMap, +} + +impl StatsCandidateFile { + pub fn from_file_stats( + file_stats: &FileStatsItem, + region_partition_expr: Option<&str>, + requirements: &[SupportStatAggr], + region_schema: &RegionSchemaRef, + ) -> Result> { + let column_names = required_columns(requirements); + let column_stats = collect_column_stats(file_stats, region_schema, &column_names)?; + if !matches_partition_expr( + file_stats.file_partition_expr.as_deref(), + region_partition_expr, + ) { + return Ok(None); + } + + let candidate = Self { + num_rows: file_stats.num_rows, + column_stats, + }; + for requirement in requirements { + if candidate.stat_value(requirement)?.is_none() { + return Ok(None); + } + } + Ok(Some(candidate)) + } + + pub fn stat_value(&self, requirement: &SupportStatAggr) -> Result> { + match requirement { + SupportStatAggr::CountRows => self.num_rows.map(count_value).transpose(), + SupportStatAggr::CountNonNull { column_name } => { + let Some(column_stats) = self.column_stats.get(column_name) else { + return Ok(None); + }; + let Some(num_rows) = self.num_rows else { + return Ok(None); + }; + let Some(null_count) = column_stats.null_count else { + return Ok(None); + }; + let Some(non_null_count) = num_rows.checked_sub(null_count) else { + return Err(DataFusionError::Internal(format!( + "StatsScanExec found null_count > num_rows for column {}", + column_name + ))); + }; + count_value(non_null_count).map(Some) + } + SupportStatAggr::MinValue { column_name } => Ok(self + .column_stats + .get(column_name) + .and_then(|stats| stats.min_value.clone())), + SupportStatAggr::MaxValue { column_name } => Ok(self + .column_stats + .get(column_name) + .and_then(|stats| stats.max_value.clone())), + } + } +} + +fn count_value(value: u64) -> Result { + let value = i64::try_from(value).map_err(|_| { + DataFusionError::Internal(format!( + "StatsScanExec count state 
exceeds Int64 range: {}", + value + )) + })?; + Ok(Value::Int64(value)) +} + +fn matches_partition_expr( + file_partition_expr: Option<&str>, + region_partition_expr: Option<&str>, +) -> bool { + matches!( + (file_partition_expr, region_partition_expr), + (Some(file_expr), Some(region_expr)) if file_expr == region_expr + ) +} + +fn required_columns(requirements: &[SupportStatAggr]) -> HashSet { + requirements + .iter() + .filter_map(|requirement| match requirement { + SupportStatAggr::CountRows => None, + SupportStatAggr::CountNonNull { column_name } + | SupportStatAggr::MinValue { column_name } + | SupportStatAggr::MaxValue { column_name } => Some(column_name.clone()), + }) + .collect() +} + +fn collect_column_stats( + file_stats: &FileStatsItem, + region_schema: &RegionSchemaRef, + column_names: &HashSet, +) -> Result> { + column_names + .iter() + .map(|column_name| { + Ok(( + column_name.clone(), + collect_one_column_stats(file_stats, region_schema, column_name)?, + )) + }) + .collect() +} + +fn collect_one_column_stats( + file_stats: &FileStatsItem, + region_schema: &RegionSchemaRef, + column_name: &str, +) -> Result { + let Some(column_index) = region_schema.column_index_by_name(column_name) else { + return Ok(FileColumnStats::default()); + }; + + Ok(FileColumnStats { + null_count: sum_null_counts(file_stats, column_index)?, + min_value: best_row_group_value(file_stats, column_index, Ordering::Less)?, + max_value: best_row_group_value(file_stats, column_index, Ordering::Greater)?, + }) +} + +fn sum_null_counts(file_stats: &FileStatsItem, column_index: usize) -> Result> { + let mut total = 0_u64; + for row_group in &file_stats.row_groups { + let Some(stats) = row_group.metadata.column(column_index).statistics() else { + return Ok(None); + }; + let Some(value) = stats.null_count_opt() else { + return Ok(None); + }; + total = total.checked_add(value).ok_or_else(|| { + DataFusionError::Internal("StatsScanExec null-count overflow".to_string()) + })?; + } + Ok(Some(total)) +} + +fn best_row_group_value( + file_stats: &FileStatsItem, + column_index: usize, + target: Ordering, +) -> Result> { + let mut best = None; + + for row_group in &file_stats.row_groups { + let Some(stats) = row_group.metadata.column(column_index).statistics() else { + return Ok(None); + }; + let Some(scalar) = parquet_bound_scalar(stats, target) else { + return Ok(None); + }; + let value = Value::try_from(scalar).map_err(|error| { + DataFusionError::Internal(format!( + "StatsScanExec failed to convert row-group scalar: {}", + error + )) + })?; + let should_replace = best.as_ref().is_none_or(|current| { + value + .partial_cmp(current) + .is_some_and(|ordering| ordering == target) + }); + if should_replace { + best = Some(value); + } + } + + Ok(best) +} + +fn parquet_bound_scalar(stats: &ParquetStats, target: Ordering) -> Option { + let use_min = target == Ordering::Less; + + match stats { + ParquetStats::Boolean(stats) => Some(ScalarValue::Boolean(Some(if use_min { + *stats.min_opt()? + } else { + *stats.max_opt()? + }))), + ParquetStats::Int32(stats) => Some(ScalarValue::Int32(Some(if use_min { + *stats.min_opt()? + } else { + *stats.max_opt()? + }))), + ParquetStats::Int64(stats) => Some(ScalarValue::Int64(Some(if use_min { + *stats.min_opt()? + } else { + *stats.max_opt()? + }))), + ParquetStats::Int96(_) => None, + ParquetStats::Float(stats) => Some(ScalarValue::Float32(Some(if use_min { + *stats.min_opt()? + } else { + *stats.max_opt()? 
+ }))), + ParquetStats::Double(stats) => Some(ScalarValue::Float64(Some(if use_min { + *stats.min_opt()? + } else { + *stats.max_opt()? + }))), + ParquetStats::ByteArray(stats) => { + let bytes = if use_min { + stats.min_bytes_opt()? + } else { + stats.max_bytes_opt()? + }; + Some(ScalarValue::Utf8(String::from_utf8(bytes.to_owned()).ok())) + } + ParquetStats::FixedLenByteArray(_) => None, + } +} diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 91a417d356..413840631b 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod aggr_stats; pub mod columnar_value; pub mod error; pub mod logical_plan; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 374a42144e..107a36eb26 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::time::Instant; use api::v1::SemanticType; +use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_recordbatch::filter::SimpleFilterEvaluator; @@ -36,7 +37,10 @@ use partition::expr::PartitionExpr; use smallvec::SmallVec; use snafu::{OptionExt as _, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; -use store_api::region_engine::{PartitionRange, RegionScannerRef}; +use store_api::region_engine::{ + FileStatsItem, PartitionRange, QueryScanContext, RegionScannerRef, RowGroupStatsItem, + SendableFileStatsStream, +}; use store_api::storage::{ ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, @@ -1450,6 +1454,53 @@ pub struct StreamContext { pub(crate) query_start: Instant, } +pub(crate) fn scan_input_stats( + input: &ScanInput, + ctx: &QueryScanContext, +) -> SendableFileStatsStream { + let access_layer = input.access_layer.clone(); + let cache_strategy = input.cache_strategy.clone(); + let region_metadata = input.region_metadata().clone(); + let files = input.files.clone(); + let explain_verbose = ctx.explain_verbose; + + Box::pin(try_stream! { + if explain_verbose { + common_telemetry::info!( + "Scan stats, region_id: {}, files: {}", + region_metadata.region_id, + files.len() + ); + } + + for file in files { + let sst_meta = access_layer + .read_sst(file.clone()) + .cache(cache_strategy.clone()) + .expected_metadata(Some(region_metadata.clone())) + .read_sst_meta() + .await + .map_err(BoxedError::new)?; + let parquet_meta = sst_meta.parquet_metadata(); + let row_groups = parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(row_group_index, metadata)| RowGroupStatsItem { + row_group_index, + metadata: Arc::new(metadata.clone()), + }) + .collect(); + + yield FileStatsItem { + num_rows: Some(parquet_meta.file_metadata().num_rows() as u64), + file_partition_expr: file.meta_ref().partition_expr.as_ref().map(ToString::to_string), + row_groups, + }; + } + }) +} + impl StreamContext { /// Creates a new [StreamContext] for [SeqScan]. 
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 39bd0ce842..1b082afd7d 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -31,6 +31,7 @@ use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, + SendableFileStatsStream, }; use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; @@ -44,7 +45,7 @@ use crate::read::range::RangeMeta; use crate::read::range_cache::{ build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream, }; -use crate::read::scan_region::{ScanInput, StreamContext}; +use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size, scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge, @@ -545,6 +546,10 @@ impl RegionScanner for SeqScan { .map_err(BoxedError::new) } + fn scan_stats(&self, ctx: &QueryScanContext) -> Result { + Ok(scan_input_stats(&self.stream_ctx.input, ctx)) + } + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { self.properties.prepare(request); diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 003a754363..1135ecd055 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -35,6 +35,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, + SendableFileStatsStream, }; use tokio::sync::Semaphore; use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; @@ -46,7 +47,7 @@ use crate::error::{ }; use crate::read::ScannerMetrics; use crate::read::pruner::{PartitionPruner, Pruner}; -use crate::read::scan_region::{ScanInput, StreamContext}; +use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size, compute_parallel_channel_size, @@ -353,6 +354,10 @@ impl RegionScanner for SeriesScan { .map_err(BoxedError::new) } + fn scan_stats(&self, ctx: &QueryScanContext) -> Result { + Ok(scan_input_stats(&self.stream_ctx.input, ctx)) + } + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { self.properties.prepare(request); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index cb6e850439..fe09a525e5 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -30,12 +30,12 @@ use futures::{Stream, StreamExt}; use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, + PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, SendableFileStatsStream, }; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::pruner::{PartitionPruner, Pruner}; -use crate::read::scan_region::{ScanInput, StreamContext}; +use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges, }; @@ -331,6 +331,10 @@ impl RegionScanner 
for UnorderedScan { .map_err(BoxedError::new) } + fn scan_stats(&self, ctx: &QueryScanContext) -> Result { + Ok(scan_input_stats(&self.stream_ctx.input, ctx)) + } + /// If this scanner have predicate other than region partition exprs fn has_predicate_without_region(&self) -> bool { let predicate = self diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6942c8223d..02d1911f1b 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -254,6 +254,21 @@ impl ParquetReaderBuilder { self } + pub(crate) async fn read_sst_meta(&self) -> Result> { + let file_path = self.file_handle.file_path(&self.table_dir, self.path_type); + let file_size = self.file_handle.meta_ref().file_size; + let mut cache_metrics = MetadataCacheMetrics::default(); + let (sst_meta, _) = self + .read_parquet_metadata( + &file_path, + file_size, + &mut cache_metrics, + self.page_index_policy, + ) + .await?; + Ok(sst_meta) + } + /// Sets the compaction flag. #[must_use] pub fn compaction(mut self, compaction: bool) -> Self { diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index e2e577debf..9690453e5c 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -650,6 +650,7 @@ mod tests { use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::region_engine::{ PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, + SendableFileStatsStream, }; use store_api::storage::{RegionId, ScanRequest}; use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable}; @@ -718,6 +719,13 @@ mod tests { Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) } + fn scan_stats( + &self, + _ctx: &QueryScanContext, + ) -> std::result::Result { + Ok(Box::pin(futures::stream::empty())) + } + fn has_predicate_without_region(&self) -> bool { true } diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index aaac1e3124..70ef24291f 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod aggr_stats; pub mod constant_term; pub mod count_nest_aggr; pub mod count_wildcard; diff --git a/src/query/src/optimizer/aggr_stats.rs b/src/query/src/optimizer/aggr_stats.rs new file mode 100644 index 0000000000..7558584b7a --- /dev/null +++ b/src/query/src/optimizer/aggr_stats.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion_common::Result; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use table::table::scan::RegionScanExec; + +use crate::optimizer::aggr_stats::support_aggr::SupportStatAggr; + +pub(crate) mod stat_scan; +pub(crate) mod support_aggr; + +/// Physical optimizer scaffold for aggregate-stats runtime rewrite. +/// +/// This pass only owns query-shape eligibility and rewrite shape. +/// Runtime stats lookup/classification is handled during execution. +#[derive(Debug, Default)] +pub struct AggrStatsPhysicalRule; + +impl PhysicalOptimizerRule for AggrStatsPhysicalRule { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + Self::rewrite_plan_shape(plan) + } + + fn name(&self) -> &str { + "aggr_stats_physical" + } + + fn schema_check(&self) -> bool { + true + } +} + +impl AggrStatsPhysicalRule { + fn rewrite_plan_shape(plan: Arc) -> Result> { + plan.transform_down(|plan| { + let Some(_rewrite_target) = RewriteTarget::extract(&plan) else { + return Ok(Transformed::no(plan)); + }; + // impl rewrite in RewriteTarget + Ok(Transformed::no(plan)) + }) + .map(|res| res.data) + } +} + +/// TODO(discord9): support more kind of aggr +#[allow(unused)] +enum RewriteTarget<'a> { + FinalOverPartial { + final_exec: &'a AggregateExec, + partial_exec: &'a AggregateExec, + region_scan: &'a RegionScanExec, + keep_coalesce: bool, + aggr_exprs: Vec, + }, +} + +#[allow(unused)] +impl<'a> RewriteTarget<'a> { + fn is_supported_rewrite(&self) -> bool { + let Self::FinalOverPartial { + final_exec, + partial_exec, + region_scan, + .. + } = self; + + final_exec.group_expr().is_empty() + && final_exec + .aggr_expr() + .iter() + .all(|aggr| !aggr.is_distinct()) + && final_exec + .filter_expr() + .iter() + .all(|filter| filter.is_none()) + && partial_exec + .filter_expr() + .iter() + .all(|filter| filter.is_none()) + && region_scan.append_mode() + } + + #[allow(unused)] + fn extract(plan: &'a Arc) -> Option { + let aggregate_exec = plan.as_any().downcast_ref::()?; + + if !matches!(aggregate_exec.mode(), AggregateMode::Final) { + return None; + } + + let (input, keep_coalesce) = if let Some(coalesce) = aggregate_exec + .input() + .as_any() + .downcast_ref::( + ) { + (coalesce.input(), true) + } else { + (aggregate_exec.input(), false) + }; + + let partial_exec = input.as_any().downcast_ref::()?; + if !matches!(partial_exec.mode(), AggregateMode::Partial) { + return None; + } + + let region_scan = partial_exec + .input() + .as_any() + .downcast_ref::()?; + let aggr_exprs = aggregate_exec + .aggr_expr() + .iter() + .map(|aggr_expr| SupportStatAggr::from_aggr_expr(aggr_expr)) + .try_collect()?; + let zelf = Self::FinalOverPartial { + final_exec: aggregate_exec, + partial_exec, + region_scan, + keep_coalesce, + aggr_exprs, + }; + if zelf.is_supported_rewrite() { + Some(zelf) + } else { + None + } + } + + fn first_stage_aggregate(&self) -> &'a AggregateExec { + match self { + RewriteTarget::FinalOverPartial { partial_exec, .. } => partial_exec, + } + } + + fn region_scan(&self) -> &'a RegionScanExec { + match self { + RewriteTarget::FinalOverPartial { region_scan, .. 
} => region_scan, + } + } +} diff --git a/src/query/src/optimizer/aggr_stats/stat_scan.rs b/src/query/src/optimizer/aggr_stats/stat_scan.rs new file mode 100644 index 0000000000..5676fb9487 --- /dev/null +++ b/src/query/src/optimizer/aggr_stats/stat_scan.rs @@ -0,0 +1,805 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +use std::any::Any; +use std::sync::{Arc, Mutex}; + +use arrow_schema::{DataType, Field, SchemaRef}; +use async_stream::try_stream; +use common_query::aggr_stats::StatsCandidateFile; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, +}; +use datafusion::scalar::ScalarValue; +use datafusion_common::{DataFusionError, Result}; +use datafusion_physical_expr::EquivalenceProperties; +use datatypes::arrow::array::{ArrayRef, StructArray}; +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::data_type::ConcreteDataType; +use futures::StreamExt; +use store_api::metadata::RegionMetadataRef; +use store_api::region_engine::{QueryScanContext, RegionScannerRef, SendableFileStatsStream}; + +use super::support_aggr::SupportStatAggr; + +fn build_state_row( + candidate: &StatsCandidateFile, + schema: &SchemaRef, + requirements: &[SupportStatAggr], +) -> Result> { + if schema.fields().len() != requirements.len() { + return Err(DataFusionError::Internal(format!( + "StatsScanExec schema/requirement mismatch: {} fields, {} requirements", + schema.fields().len(), + requirements.len() + ))); + } + + let mut state_row = Vec::with_capacity(requirements.len()); + for (field, requirement) in schema.fields().iter().zip(requirements) { + let state = build_state_scalar(candidate, field.as_ref(), requirement)?; + state_row.push(state); + } + + Ok(state_row) +} + +fn build_state_scalar( + candidate: &StatsCandidateFile, + field: &Field, + requirement: &SupportStatAggr, +) -> Result { + let DataType::Struct(state_fields) = field.data_type() else { + return Err(DataFusionError::Internal(format!( + "StatsScanExec expects struct state field, got {:?} for {}", + field.data_type(), + field.name() + ))); + }; + if state_fields.len() != 1 { + return Err(DataFusionError::Internal(format!( + "StatsScanExec only supports single-field state in v1, got {} fields for {}", + state_fields.len(), + field.name() + ))); + } + + let inner_field = state_fields[0].as_ref(); + let output_type = ConcreteDataType::from_arrow_type(inner_field.data_type()); + let Some(value) = candidate.stat_value(requirement)? 
else { + return Err(DataFusionError::Internal(format!( + "StatsScanExec built an ineligible stats candidate for requirement {:?}", + requirement + ))); + }; + + let scalar = value.try_to_scalar_value(&output_type).map_err(|error| { + DataFusionError::Internal(format!( + "StatsScanExec failed to convert state value for {}: {}", + field.name(), + error + )) + })?; + let state_array = scalar.to_array().map_err(|error| { + DataFusionError::Internal(format!( + "StatsScanExec failed to build state array for {}: {}", + field.name(), + error + )) + })?; + Ok(ScalarValue::Struct(Arc::new(StructArray::new( + state_fields.clone(), + vec![state_array], + None, + )))) +} + +fn build_batch_from_candidates( + schema: &SchemaRef, + requirements: &[SupportStatAggr], + files: &[StatsCandidateFile], +) -> Result> { + let mut columns = (0..requirements.len()) + .map(|_| Vec::::new()) + .collect::>(); + + for file in files { + let row = build_state_row(file, schema, requirements)?; + for (index, scalar) in row.into_iter().enumerate() { + columns[index].push(scalar); + } + } + + if columns.first().is_none_or(|column| column.is_empty()) { + return Ok(None); + } + + let arrays = columns + .into_iter() + .map(|values| { + ScalarValue::iter_to_array(values).map_err(|error| { + DataFusionError::Internal(format!( + "StatsScanExec failed to materialize state array: {}", + error + )) + }) + }) + .collect::>>()?; + + RecordBatch::try_new(schema.clone(), arrays) + .map(Some) + .map_err(|error| { + DataFusionError::Internal(format!( + "StatsScanExec failed to build record batch: {}", + error + )) + }) +} + +async fn build_batch_from_scan_stats( + schema: &SchemaRef, + requirements: &[SupportStatAggr], + region_metadata: &RegionMetadataRef, + mut scan_stats: SendableFileStatsStream, +) -> Result> { + let mut files = Vec::new(); + while let Some(file_stats) = scan_stats.next().await { + let file_stats = file_stats.map_err(|error| DataFusionError::External(error.into()))?; + let Some(candidate) = StatsCandidateFile::from_file_stats( + &file_stats, + region_metadata.partition_expr.as_deref(), + requirements, + ®ion_metadata.schema, + )? + else { + continue; + }; + files.push(candidate); + } + build_batch_from_candidates(schema, requirements, &files) +} + +/// Physical execution plan for runtime stats-backed partial aggregates. +/// +/// This node obtains scanner-owned file stats during `execute()` and materializes partial +/// aggregate state from those stats without doing optimizer-time I/O. 
+pub struct StatsScanExec { + schema: SchemaRef, + requirements: Vec, + scanner: Arc>, + properties: Arc, + metrics: ExecutionPlanMetricsSet, +} + +impl std::fmt::Debug for StatsScanExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StatsScanExec") + .field("schema", &self.schema) + .field("requirements", &self.requirements) + .finish() + } +} + +impl StatsScanExec { + pub fn new( + schema: SchemaRef, + requirements: Vec, + scanner: Arc>, + ) -> Self { + Self { + properties: Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + datafusion::physical_plan::Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )), + schema, + requirements, + scanner, + metrics: ExecutionPlanMetricsSet::new(), + } + } + + pub fn requirements(&self) -> &[SupportStatAggr] { + &self.requirements + } +} + +impl DisplayAs for StatsScanExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "StatsScanExec: requirements={}", self.requirements.len()) + } + DisplayFormatType::TreeRender => write!(f, "StatsScanExec"), + } + } +} + +impl ExecutionPlan for StatsScanExec { + fn name(&self) -> &str { + "StatsScanExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if !children.is_empty() { + return Err(DataFusionError::Internal(format!( + "StatsScanExec expects no children, got {}", + children.len() + ))); + } + + Ok(Arc::new(Self { + schema: self.schema.clone(), + requirements: self.requirements.clone(), + scanner: self.scanner.clone(), + properties: self.properties.clone(), + metrics: self.metrics.clone(), + })) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + if partition != 0 { + return Err(DataFusionError::Execution(format!( + "StatsScanExec expects a single partition, got {}", + partition + ))); + } + + let schema = self.schema.clone(); + let requirements = self.requirements.clone(); + let (region_metadata, scan_stats) = { + let scanner = self.scanner.lock().unwrap(); + let region_metadata = scanner.metadata(); + let scan_stats = scanner + .scan_stats(&QueryScanContext::default()) + .map_err(|error| DataFusionError::External(error.into()))?; + (region_metadata, scan_stats) + }; + let stream = try_stream! { + if let Some(batch) = build_batch_from_scan_stats( + &schema, + &requirements, + ®ion_metadata, + scan_stats, + ) + .await? 
{ + yield batch; + } + }; + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + Box::pin(stream), + ))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use api::v1::SemanticType; + use bytes::Bytes; + use common_query::aggr_stats::SupportStatAggr; + use datafusion::functions_aggregate::average::avg_udaf; + use datafusion::functions_aggregate::count::count_udaf; + use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf}; + use datafusion::parquet::arrow::ArrowWriter; + use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use datafusion::parquet::file::properties::WriterProperties; + use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; + use datafusion_physical_expr::expressions::Column as PhysicalColumn; + use datatypes::arrow::array::Int64Array; + use datatypes::schema::ColumnSchema; + use futures::StreamExt; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; + use store_api::region_engine::{ + FileStatsItem, RowGroupStatsItem, ScannerProperties, SendableFileStatsStream, + }; + use store_api::storage::RegionId; + + use super::*; + + #[derive(Debug)] + struct StaticStatsScanner { + schema: datatypes::schema::SchemaRef, + metadata: RegionMetadataRef, + properties: ScannerProperties, + files: Vec, + } + + impl store_api::region_engine::RegionScanner for StaticStatsScanner { + fn name(&self) -> &str { + "StaticStatsScanner" + } + + fn properties(&self) -> &ScannerProperties { + &self.properties + } + + fn schema(&self) -> datatypes::schema::SchemaRef { + self.schema.clone() + } + + fn metadata(&self) -> RegionMetadataRef { + self.metadata.clone() + } + + fn prepare( + &mut self, + request: store_api::region_engine::PrepareRequest, + ) -> std::result::Result<(), common_error::ext::BoxedError> { + self.properties.prepare(request); + Ok(()) + } + + fn scan_partition( + &self, + _ctx: &QueryScanContext, + _metrics_set: &ExecutionPlanMetricsSet, + _partition: usize, + ) -> std::result::Result< + common_recordbatch::SendableRecordBatchStream, + common_error::ext::BoxedError, + > { + Ok(Box::pin(common_recordbatch::EmptyRecordBatchStream::new( + self.schema.clone(), + ))) + } + + fn scan_stats( + &self, + _ctx: &QueryScanContext, + ) -> std::result::Result { + Ok(Box::pin(futures::stream::iter( + self.files.clone().into_iter().map(Ok), + ))) + } + + fn has_predicate_without_region(&self) -> bool { + false + } + + fn add_dyn_filter_to_predicate( + &mut self, + filter_exprs: Vec>, + ) -> Vec { + vec![false; filter_exprs.len()] + } + + fn set_logical_region(&mut self, logical_region: bool) { + self.properties.set_logical_region(logical_region); + } + } + + impl datafusion::physical_plan::DisplayAs for StaticStatsScanner { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "StaticStatsScanner") + } + } + + fn single_state_field( + name: &str, + inner_name: &str, + inner_type: DataType, + inner_nullable: bool, + ) -> Field { + Field::new( + name, + DataType::Struct(vec![Field::new(inner_name, inner_type, inner_nullable)].into()), + true, + ) + } + + fn build_region_metadata(partition_expr: Option<&str>) -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "value", + 
datatypes::data_type::ConcreteDataType::int64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }); + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + datatypes::data_type::ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }); + + let mut metadata = builder.build_without_validation().unwrap(); + metadata.set_partition_expr(partition_expr.map(str::to_string)); + Arc::new(metadata) + } + + fn build_row_groups(chunks: &[Vec>]) -> Vec { + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "value", + DataType::Int64, + true, + )])); + let mut buffer = Cursor::new(Vec::new()); + let props = WriterProperties::builder().build(); + let mut writer = + ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), Some(props)).unwrap(); + + for chunk in chunks { + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int64Array::from(chunk.clone()))], + ) + .unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + + let metadata = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer.into_inner())) + .unwrap() + .metadata() + .clone(); + metadata + .row_groups() + .iter() + .enumerate() + .map(|(row_group_index, metadata)| RowGroupStatsItem { + row_group_index, + metadata: Arc::new(metadata.clone()), + }) + .collect() + } + + fn build_datafusion_aggr_expr( + aggr: Arc, + alias: &str, + ) -> Arc { + Arc::new( + AggregateExprBuilder::new(aggr, vec![Arc::new(PhysicalColumn::new("value", 0))]) + .schema(Arc::new(arrow_schema::Schema::new(vec![Field::new( + "value", + DataType::Int64, + true, + )]))) + .alias(alias) + .build() + .unwrap(), + ) + } + + async fn collect_single_batch(exec: &StatsScanExec) -> RecordBatch { + let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); + let batches = stream.map(|batch| batch.unwrap()).collect::>().await; + assert_eq!(batches.len(), 1); + batches.into_iter().next().unwrap() + } + + fn assert_struct_state_matches_field<'a>( + batch: &'a RecordBatch, + column_index: usize, + expected_field: &Field, + ) -> &'a StructArray { + let struct_array = batch + .column(column_index) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(struct_array.fields().len(), 1); + assert_eq!(struct_array.fields()[0].as_ref(), expected_field); + struct_array + } + + #[tokio::test] + async fn stats_scan_exec_matches_datafusion_count_state_field() { + let aggr_expr = build_datafusion_aggr_expr(count_udaf(), "count(value)"); + let state_fields = aggr_expr.state_fields().unwrap(); + assert_eq!(state_fields.len(), 1); + + let inner_field = state_fields[0].as_ref().clone(); + let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field( + "count_state", + inner_field.name(), + inner_field.data_type().clone(), + inner_field.is_nullable(), + )])); + let region_metadata = build_region_metadata(Some("host = 'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![FileStatsItem { + num_rows: Some(5), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: build_row_groups(&[vec![Some(1), None, Some(9), Some(3), None]]), + }], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::CountNonNull { + column_name: "value".to_string(), + }], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); 
+ + let batch = collect_single_batch(&exec).await; + + let struct_array = assert_struct_state_matches_field(&batch, 0, &inner_field); + let values = struct_array + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.value(0), 3); + } + + #[tokio::test] + async fn stats_scan_exec_matches_datafusion_min_state_field() { + let aggr_expr = build_datafusion_aggr_expr(min_udaf(), "min(value)"); + let state_fields = aggr_expr.state_fields().unwrap(); + assert_eq!(state_fields.len(), 1); + + let inner_field = state_fields[0].as_ref().clone(); + let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field( + "min_state", + inner_field.name(), + inner_field.data_type().clone(), + inner_field.is_nullable(), + )])); + let region_metadata = build_region_metadata(Some("host = 'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![FileStatsItem { + num_rows: Some(6), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: build_row_groups(&[ + vec![Some(4), Some(8), None], + vec![Some(-3), Some(7), Some(2)], + ]), + }], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::MinValue { + column_name: "value".to_string(), + }], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + + let batch = collect_single_batch(&exec).await; + + let struct_array = assert_struct_state_matches_field(&batch, 0, &inner_field); + let values = struct_array + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.value(0), -3); + } + + #[tokio::test] + async fn stats_scan_exec_matches_datafusion_max_state_field() { + let aggr_expr = build_datafusion_aggr_expr(max_udaf(), "max(value)"); + let state_fields = aggr_expr.state_fields().unwrap(); + assert_eq!(state_fields.len(), 1); + + let inner_field = state_fields[0].as_ref().clone(); + let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field( + "max_state", + inner_field.name(), + inner_field.data_type().clone(), + inner_field.is_nullable(), + )])); + let region_metadata = build_region_metadata(Some("host = 'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![FileStatsItem { + num_rows: Some(6), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: build_row_groups(&[ + vec![Some(4), Some(8), None], + vec![Some(-3), Some(11), Some(2)], + ]), + }], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::MaxValue { + column_name: "value".to_string(), + }], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + + let batch = collect_single_batch(&exec).await; + + let struct_array = assert_struct_state_matches_field(&batch, 0, &inner_field); + let values = struct_array + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.value(0), 11); + } + + #[tokio::test] + async fn stats_scan_exec_rejects_datafusion_avg_multi_field_state() { + let aggr_expr = build_datafusion_aggr_expr(avg_udaf(), "avg(value)"); + let state_fields = aggr_expr.state_fields().unwrap(); + assert!(state_fields.len() > 1); + + let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "avg_state", + DataType::Struct( + state_fields + .iter() + .map(|field| field.as_ref().clone()) + .collect::>() + .into(), + ), + true, + )])); + let region_metadata = build_region_metadata(Some("host = 
'a'")); + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![FileStatsItem { + num_rows: Some(5), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: build_row_groups(&[vec![Some(1), None, Some(9), Some(3), None]]), + }], + }; + + let exec = StatsScanExec::new( + schema, + vec![SupportStatAggr::CountNonNull { + column_name: "value".to_string(), + }], + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + + let mut stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); + let error = stream.next().await.unwrap().unwrap_err(); + + assert!( + error + .to_string() + .contains("only supports single-field state in v1") + ); + } + + #[tokio::test] + async fn stats_scan_exec_emits_state_rows_for_eligible_files() { + let schema = Arc::new(arrow_schema::Schema::new(vec![ + single_state_field("count_state", "count[count]", DataType::Int64, false), + single_state_field("max_state", "max[max]", DataType::Int64, true), + ])); + let requirements = vec![ + SupportStatAggr::CountNonNull { + column_name: "value".to_string(), + }, + SupportStatAggr::MaxValue { + column_name: "value".to_string(), + }, + ]; + + let region_metadata = build_region_metadata(Some("host = 'a'")); + let eligible_row_groups = build_row_groups(&[ + vec![Some(1), None, Some(9), Some(3), None], + vec![Some(2), Some(8), Some(7), None, Some(4)], + ]); + + let scanner = StaticStatsScanner { + schema: region_metadata.schema.clone(), + metadata: region_metadata, + properties: ScannerProperties::default(), + files: vec![ + FileStatsItem { + num_rows: Some(10), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: eligible_row_groups.clone(), + }, + FileStatsItem { + num_rows: Some(5), + file_partition_expr: Some("host = 'a'".to_string()), + row_groups: vec![], + }, + FileStatsItem { + num_rows: Some(10), + file_partition_expr: Some("host = 'b'".to_string()), + row_groups: eligible_row_groups, + }, + ], + }; + + let exec = StatsScanExec::new( + schema, + requirements, + Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)), + ); + + let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); + let batches = stream.map(|batch| batch.unwrap()).collect::>().await; + + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let count_state = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let count_values = count_state + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(count_values.value(0), 7); + + let max_state = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let max_values = max_state + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(max_values.value(0), 9); + } +} diff --git a/src/query/src/optimizer/aggr_stats/support_aggr.rs b/src/query/src/optimizer/aggr_stats/support_aggr.rs new file mode 100644 index 0000000000..32b542847b --- /dev/null +++ b/src/query/src/optimizer/aggr_stats/support_aggr.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub use common_query::aggr_stats::SupportStatAggr; diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..dfe4a82298 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -59,6 +59,7 @@ use crate::dist_plan::{ }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; +use crate::optimizer::aggr_stats::AggrStatsPhysicalRule; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_nest_aggr::CountNestAggrRule; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; @@ -177,17 +178,21 @@ impl QueryEngineState { // add physical optimizer let mut physical_optimizer = PhysicalOptimizer::new(); + // Prepare aggregate-stats runtime rewrite shape before scan repartitioning/distribution. + physical_optimizer + .rules + .insert(5, Arc::new(AggrStatsPhysicalRule)); // Change TableScan's partition right before enforcing distribution physical_optimizer .rules - .insert(5, Arc::new(ParallelizeScan)); + .insert(6, Arc::new(ParallelizeScan)); // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling physical_optimizer .rules - .insert(6, Arc::new(PassDistribution)); + .insert(7, Arc::new(PassDistribution)); // Enforce sorting AFTER custom rules that modify the plan structure physical_optimizer.rules.insert( - 7, + 8, Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}), ); // Add rule for windowed sort diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 3eee78b2d1..0b734a7b7c 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -29,6 +29,7 @@ humantime.workspace = true itertools.workspace = true lazy_static.workspace = true num_enum = "0.7" +parquet.workspace = true prometheus.workspace = true prost.workspace = true serde.workspace = true diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index b235fcffc7..2063a6882d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; @@ -28,7 +29,9 @@ use common_time::Timestamp; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr}; use datatypes::schema::SchemaRef; +use futures::Stream; use futures::future::join_all; +use parquet::file::metadata::RowGroupMetaData; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; @@ -313,6 +316,9 @@ pub struct ScannerProperties { /// Whether the scanner is scanning a logical region. logical_region: bool, + + /// Whether stats-aware skip mode is enabled for aggregate-stats runtime execution. 
+    stats_aware_skip_mode: bool,
 }
 
 impl ScannerProperties {
@@ -337,6 +343,7 @@ impl ScannerProperties {
             distinguish_partition_range: false,
             target_partitions: 0,
             logical_region: false,
+            stats_aware_skip_mode: false,
         }
     }
 
@@ -351,6 +358,9 @@ impl ScannerProperties {
         if let Some(target_partitions) = request.target_partitions {
             self.target_partitions = target_partitions;
         }
+        if let Some(stats_aware_skip_mode) = request.stats_aware_skip_mode {
+            self.stats_aware_skip_mode = stats_aware_skip_mode;
+        }
     }
 
     /// Returns the number of actual partitions.
@@ -366,6 +376,11 @@ impl ScannerProperties {
         self.total_rows
     }
 
+    /// Returns whether stats-aware skip mode is enabled.
+    pub fn stats_aware_skip_mode(&self) -> bool {
+        self.stats_aware_skip_mode
+    }
+
     /// Returns whether the scanner is scanning a logical region.
     pub fn is_logical_region(&self) -> bool {
         self.logical_region
@@ -395,6 +410,8 @@ pub struct PrepareRequest {
     pub distinguish_partition_range: Option<bool>,
     /// The expected number of target partitions.
     pub target_partitions: Option<usize>,
+    /// Whether to enable stats-aware skip mode on the scanner.
+    pub stats_aware_skip_mode: Option<bool>,
 }
 
 impl PrepareRequest {
@@ -415,6 +432,12 @@ impl PrepareRequest {
         self.target_partitions = Some(target_partitions);
         self
     }
+
+    /// Sets the stats-aware skip mode flag.
+    pub fn with_stats_aware_skip_mode(mut self, stats_aware_skip_mode: bool) -> Self {
+        self.stats_aware_skip_mode = Some(stats_aware_skip_mode);
+        self
+    }
 }
 
 /// Necessary context of the query for the scanner.
@@ -424,6 +447,27 @@ pub struct QueryScanContext {
     pub explain_verbose: bool,
 }
 
+/// File-level stats returned by [`RegionScanner::scan_stats`].
+#[derive(Debug, Clone)]
+pub struct FileStatsItem {
+    /// Exact row count from parquet metadata.
+    pub num_rows: Option<u64>,
+    /// Greptime file metadata, not parquet-native metadata.
+    pub file_partition_expr: Option<String>,
+    /// Nested parquet row-group metadata for future finer-grained use.
+    pub row_groups: Vec<RowGroupStatsItem>,
+}
+
+/// Row-group stats nested inside one [`FileStatsItem`].
+#[derive(Debug, Clone)]
+pub struct RowGroupStatsItem {
+    pub row_group_index: usize,
+    pub metadata: Arc<RowGroupMetaData>,
+}
+
+pub type SendableFileStatsStream =
+    Pin<Box<dyn Stream<Item = Result<FileStatsItem, BoxedError>> + Send>>;
+
 /// A scanner that provides a way to scan the region concurrently.
 ///
 /// The scanner splits the region into partitions so that each partition can be scanned concurrently.
@@ -456,6 +500,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError>;
 
+    /// Returns file-level stats for the current scan context.
+    ///
+    /// This method is read-only and does not scan row data.
+    fn scan_stats(&self, ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError>;
+
     /// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
     fn has_predicate_without_region(&self) -> bool;
 
@@ -1016,6 +1065,10 @@ impl RegionScanner for SinglePartitionScanner {
         Ok(result.unwrap())
     }
 
+    fn scan_stats(&self, _ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError> {
+        Ok(Box::pin(futures::stream::empty()))
+    }
+
     fn has_predicate_without_region(&self) -> bool {
         false
     }
 diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 2c9ac41560..c3420f3b6b 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -34,8 +34,7 @@ use snafu::ResultExt; use crate::error; -#[cfg(test)] -mod stats; +pub mod stats; /// Assert the scalar value is not utf8. Returns `None` if it's utf8. /// In theory, it should be converted to a timestamp scalar value by `TypeConversionRule`.
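Aside (not part of the patch): the new `RegionScanner::scan_stats` contract above returns a stream of `FileStatsItem`s that a caller can fold into aggregate answers without reading row data. The sketch below models that consumption pattern with a simplified stand-in struct (`FileStats`) and plain `futures` primitives; it mirrors the all-or-nothing eligibility rules of `matches_partition_expr` and `StatsCandidateFile::from_file_stats`, but the struct, function names, and fallback behaviour here are illustrative assumptions rather than APIs added by this patch.

use futures::executor::block_on;
use futures::{StreamExt, pin_mut, stream};

#[derive(Debug, Clone)]
struct FileStats {
    // Simplified stand-in for `FileStatsItem`; only the fields this sketch needs.
    num_rows: Option<u64>,
    file_partition_expr: Option<String>,
}

// Folds per-file row counts into a `count(*)` answer. Returns `None` whenever any
// file is ineligible, mirroring the patch's rule that a single ineligible file
// forces the query back onto a normal row scan.
async fn count_rows_from_stats<S>(files: S, region_partition_expr: Option<&str>) -> Option<u64>
where
    S: futures::Stream<Item = FileStats>,
{
    pin_mut!(files);
    let mut total: u64 = 0;
    while let Some(file) = files.next().await {
        // Mirrors `matches_partition_expr`: both exprs must be present and equal.
        match (file.file_partition_expr.as_deref(), region_partition_expr) {
            (Some(file_expr), Some(region_expr)) if file_expr == region_expr => {}
            _ => return None,
        }
        // Every file must carry an exact row count; `?` bails out otherwise.
        total = total.checked_add(file.num_rows?)?;
    }
    Some(total)
}

fn main() {
    let files = stream::iter(vec![
        FileStats {
            num_rows: Some(5),
            file_partition_expr: Some("host = 'a'".to_string()),
        },
        FileStats {
            num_rows: Some(7),
            file_partition_expr: Some("host = 'a'".to_string()),
        },
    ]);
    assert_eq!(
        block_on(count_rows_from_stats(files, Some("host = 'a'"))),
        Some(12)
    );
}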
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 02511456ae..b6cbc8069b 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -66,6 +66,7 @@ pub struct RegionScanExec { is_partition_set: bool, // TODO(ruihang): handle TimeWindowed dist via this parameter distribution: Option, + stats_aware_skip_mode: bool, explain_verbose: bool, query_memory_tracker: Option, } @@ -82,6 +83,7 @@ impl std::fmt::Debug for RegionScanExec { .field("total_rows", &self.total_rows) .field("is_partition_set", &self.is_partition_set) .field("distribution", &self.distribution) + .field("stats_aware_skip_mode", &self.stats_aware_skip_mode) .field("explain_verbose", &self.explain_verbose) .finish() } @@ -225,6 +227,7 @@ impl RegionScanExec { total_rows, is_partition_set: false, distribution: request.distribution, + stats_aware_skip_mode: false, explain_verbose: false, query_memory_tracker, }) @@ -260,6 +263,10 @@ impl RegionScanExec { scanner.name().to_string() } + pub fn scanner(&self) -> Arc> { + self.scanner.clone() + } + /// Update the partition ranges of underlying scanner. pub fn with_new_partitions( &self, @@ -298,11 +305,47 @@ impl RegionScanExec { total_rows: self.total_rows, is_partition_set: true, distribution: self.distribution, + stats_aware_skip_mode: self.stats_aware_skip_mode, explain_verbose: self.explain_verbose, query_memory_tracker: self.query_memory_tracker.clone(), }) } + pub fn with_stats_aware_skip_mode( + &self, + stats_aware_skip_mode: bool, + ) -> Result { + { + let mut scanner = self.scanner.lock().unwrap(); + scanner.prepare( + PrepareRequest::default().with_stats_aware_skip_mode(stats_aware_skip_mode), + )?; + } + + Ok(Self { + scanner: self.scanner.clone(), + arrow_schema: self.arrow_schema.clone(), + output_ordering: self.output_ordering.clone(), + metric: self.metric.clone(), + properties: self.properties.clone(), + append_mode: self.append_mode, + total_rows: self.total_rows, + is_partition_set: self.is_partition_set, + distribution: self.distribution, + stats_aware_skip_mode, + explain_verbose: self.explain_verbose, + query_memory_tracker: self.query_memory_tracker.clone(), + }) + } + + pub fn stats_aware_skip_mode(&self) -> bool { + self.stats_aware_skip_mode + } + + pub fn append_mode(&self) -> bool { + self.append_mode + } + pub fn distribution(&self) -> Option { self.distribution }
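Aside (not part of the patch): `StatsScanExec` feeds the Final `AggregateExec` by emitting partial-aggregate state rows, one per eligible file, where each output column is a single-field struct matching the accumulator's `state_fields()`. The sketch below shows that shape for `count` using only the `arrow` crates; the field name, nullability, and helper function are assumptions for illustration, while the patch derives the real state fields from `AggregateFunctionExpr::state_fields()` as its tests do.

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, StructArray};
use arrow_schema::{DataType, Field, Fields};

// One struct-typed state value per eligible file, shaped like a single-field
// count accumulator state (the "count" field name/nullability are assumptions).
fn count_state_column(non_null_counts: &[i64]) -> ArrayRef {
    let state_fields: Fields = vec![Field::new("count", DataType::Int64, true)].into();
    let values: ArrayRef = Arc::new(Int64Array::from(non_null_counts.to_vec()));
    Arc::new(StructArray::new(state_fields, vec![values], None))
}

fn main() {
    // Two files passed the eligibility checks; each contributes one partial row
    // carrying its `num_rows - null_count`, which a Final `count` aggregate sums.
    let column = count_state_column(&[3, 7]);
    assert_eq!(column.len(), 2);
}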