From a5686f0042a3722070222686484a95afbd5fcac9 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 9 Apr 2026 12:28:13 +0800 Subject: [PATCH] feat: scaffold for aggr Signed-off-by: discord9 --- src/query/src/optimizer.rs | 1 + src/query/src/optimizer/aggregate_stats.rs | 177 +++++++++++++++++++++ src/query/src/query_engine/state.rs | 8 +- src/table/src/table/scan.rs | 8 + 4 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 src/query/src/optimizer/aggregate_stats.rs diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 4259b587ba..ff22971670 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod aggregate_stats; pub mod constant_term; pub mod count_wildcard; pub mod parallelize_scan; diff --git a/src/query/src/optimizer/aggregate_stats.rs b/src/query/src/optimizer/aggregate_stats.rs new file mode 100644 index 0000000000..b07af77c60 --- /dev/null +++ b/src/query/src/optimizer/aggregate_stats.rs @@ -0,0 +1,177 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::debug; +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::aggregates::AggregateExec; +use datafusion_common::Result as DfResult; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use table::table::scan::RegionScanExec; + +#[derive(Debug)] +pub struct AggregateStats; + +impl PhysicalOptimizerRule for AggregateStats { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> DfResult> { + Self::do_optimize(plan) + } + + fn name(&self) -> &str { + "aggregate_stats" + } + + fn schema_check(&self) -> bool { + true + } +} + +impl AggregateStats { + fn do_optimize(plan: Arc) -> DfResult> { + let result = plan + .transform_down(|plan| { + let Some(aggregate_exec) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let Some(region_scan) = Self::extract_region_scan(aggregate_exec) else { + return Ok(Transformed::no(plan)); + }; + + let eligibility = AggregateStatsEligibility::new(aggregate_exec, region_scan); + if let Some(reason) = eligibility.ineligible_reason() { + debug!("Skip aggregate stats optimization: {reason}"); + return Ok(Transformed::no(plan)); + } + + // TODO(ruihang): implement mixed stats-plus-scan rewrite in follow-up tasks. + Ok(Transformed::no(plan)) + })? + .data; + + Ok(result) + } + + fn extract_region_scan(aggregate_exec: &AggregateExec) -> Option<&RegionScanExec> { + let child = aggregate_exec.children().into_iter().next()?; + child.as_any().downcast_ref::() + } +} + +#[derive(Debug)] +struct AggregateStatsEligibility<'a> { + aggregate_exec: &'a AggregateExec, + region_scan: &'a RegionScanExec, +} + +impl<'a> AggregateStatsEligibility<'a> { + fn new(aggregate_exec: &'a AggregateExec, region_scan: &'a RegionScanExec) -> Self { + Self { + aggregate_exec, + region_scan, + } + } + + fn ineligible_reason(&self) -> Option { + if !self.region_scan.append_mode() { + return Some(EligibilityRejection::NonAppendOnly); + } + + if !self.aggregate_exec.group_expr().is_empty() { + return Some(EligibilityRejection::GroupedAggregate); + } + + if !self.has_supported_aggregates() { + return Some(EligibilityRejection::UnsupportedAggregate); + } + + if !self.has_stats_eligible_candidates() { + return Some(EligibilityRejection::NoStatsEligibleFiles); + } + + None + } + + fn has_supported_aggregates(&self) -> bool { + let aggr_exprs = self.aggregate_exec.aggr_expr(); + !aggr_exprs.is_empty() + && aggr_exprs + .iter() + .all(|expr| is_supported_aggregate_name(expr.name())) + } + + fn has_stats_eligible_candidates(&self) -> bool { + // TODO(ruihang): replace this scaffold with per-file stats classification. + self.region_scan.total_rows() > 0 + } +} + +#[derive(Debug)] +enum EligibilityRejection { + NonAppendOnly, + GroupedAggregate, + UnsupportedAggregate, + NoStatsEligibleFiles, +} + +impl std::fmt::Display for EligibilityRejection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EligibilityRejection::NonAppendOnly => { + write!(f, "aggregate stats MVP only supports append-only scans") + } + EligibilityRejection::GroupedAggregate => { + write!(f, "aggregate stats MVP does not support GROUP BY yet") + } + EligibilityRejection::UnsupportedAggregate => { + write!( + f, + "aggregate stats MVP only supports min/max/count aggregates" + ) + } + EligibilityRejection::NoStatsEligibleFiles => { + write!( + f, + "aggregate stats rewrite requires at least one stats-eligible file" + ) + } + } + } +} + +fn is_supported_aggregate_name(name: &str) -> bool { + let normalized = name.split('(').next().unwrap_or(name).to_ascii_lowercase(); + matches!(normalized.as_str(), "min" | "max" | "count") +} + +#[cfg(test)] +mod tests { + use super::is_supported_aggregate_name; + + #[test] + fn test_supported_aggregate_names() { + assert!(is_supported_aggregate_name("min")); + assert!(is_supported_aggregate_name("max(value)")); + assert!(is_supported_aggregate_name("count(*)")); + assert!(!is_supported_aggregate_name("sum(value)")); + assert!(!is_supported_aggregate_name("avg(value)")); + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index a45fc4c896..8a7383342f 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -59,6 +59,7 @@ use crate::dist_plan::{ }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; +use crate::optimizer::aggregate_stats::AggregateStats; use crate::optimizer::constant_term::MatchesConstantTermOptimizer; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; use crate::optimizer::parallelize_scan::ParallelizeScan; @@ -175,17 +176,18 @@ impl QueryEngineState { // add physical optimizer let mut physical_optimizer = PhysicalOptimizer::new(); + physical_optimizer.rules.insert(5, Arc::new(AggregateStats)); // Change TableScan's partition right before enforcing distribution physical_optimizer .rules - .insert(5, Arc::new(ParallelizeScan)); + .insert(6, Arc::new(ParallelizeScan)); // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling physical_optimizer .rules - .insert(6, Arc::new(PassDistribution)); + .insert(7, Arc::new(PassDistribution)); // Enforce sorting AFTER custom rules that modify the plan structure physical_optimizer.rules.insert( - 7, + 8, Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}), ); // Add rule for windowed sort diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index e2d8f794da..a49f727d99 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -307,6 +307,14 @@ impl RegionScanExec { self.distribution } + pub fn append_mode(&self) -> bool { + self.append_mode + } + + pub fn total_rows(&self) -> usize { + self.total_rows + } + pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) { let mut scanner = self.scanner.lock().unwrap(); // set distinguish_partition_range won't fail