feat: scaffold for aggr

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-09 12:28:13 +08:00
parent 514abfc133
commit a5686f0042
4 changed files with 191 additions and 3 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggregate_stats;
pub mod constant_term;
pub mod count_wildcard;
pub mod parallelize_scan;

View File

@@ -0,0 +1,177 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode};
use table::table::scan::RegionScanExec;
#[derive(Debug)]
pub struct AggregateStats;
impl PhysicalOptimizerRule for AggregateStats {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan)
}
fn name(&self) -> &str {
"aggregate_stats"
}
fn schema_check(&self) -> bool {
true
}
}
impl AggregateStats {
fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down(|plan| {
let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
return Ok(Transformed::no(plan));
};
let Some(region_scan) = Self::extract_region_scan(aggregate_exec) else {
return Ok(Transformed::no(plan));
};
let eligibility = AggregateStatsEligibility::new(aggregate_exec, region_scan);
if let Some(reason) = eligibility.ineligible_reason() {
debug!("Skip aggregate stats optimization: {reason}");
return Ok(Transformed::no(plan));
}
// TODO(ruihang): implement mixed stats-plus-scan rewrite in follow-up tasks.
Ok(Transformed::no(plan))
})?
.data;
Ok(result)
}
fn extract_region_scan(aggregate_exec: &AggregateExec) -> Option<&RegionScanExec> {
let child = aggregate_exec.children().into_iter().next()?;
child.as_any().downcast_ref::<RegionScanExec>()
}
}
#[derive(Debug)]
struct AggregateStatsEligibility<'a> {
aggregate_exec: &'a AggregateExec,
region_scan: &'a RegionScanExec,
}
impl<'a> AggregateStatsEligibility<'a> {
fn new(aggregate_exec: &'a AggregateExec, region_scan: &'a RegionScanExec) -> Self {
Self {
aggregate_exec,
region_scan,
}
}
fn ineligible_reason(&self) -> Option<EligibilityRejection> {
if !self.region_scan.append_mode() {
return Some(EligibilityRejection::NonAppendOnly);
}
if !self.aggregate_exec.group_expr().is_empty() {
return Some(EligibilityRejection::GroupedAggregate);
}
if !self.has_supported_aggregates() {
return Some(EligibilityRejection::UnsupportedAggregate);
}
if !self.has_stats_eligible_candidates() {
return Some(EligibilityRejection::NoStatsEligibleFiles);
}
None
}
fn has_supported_aggregates(&self) -> bool {
let aggr_exprs = self.aggregate_exec.aggr_expr();
!aggr_exprs.is_empty()
&& aggr_exprs
.iter()
.all(|expr| is_supported_aggregate_name(expr.name()))
}
fn has_stats_eligible_candidates(&self) -> bool {
// TODO(ruihang): replace this scaffold with per-file stats classification.
self.region_scan.total_rows() > 0
}
}
#[derive(Debug)]
enum EligibilityRejection {
NonAppendOnly,
GroupedAggregate,
UnsupportedAggregate,
NoStatsEligibleFiles,
}
impl std::fmt::Display for EligibilityRejection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EligibilityRejection::NonAppendOnly => {
write!(f, "aggregate stats MVP only supports append-only scans")
}
EligibilityRejection::GroupedAggregate => {
write!(f, "aggregate stats MVP does not support GROUP BY yet")
}
EligibilityRejection::UnsupportedAggregate => {
write!(
f,
"aggregate stats MVP only supports min/max/count aggregates"
)
}
EligibilityRejection::NoStatsEligibleFiles => {
write!(
f,
"aggregate stats rewrite requires at least one stats-eligible file"
)
}
}
}
}
fn is_supported_aggregate_name(name: &str) -> bool {
let normalized = name.split('(').next().unwrap_or(name).to_ascii_lowercase();
matches!(normalized.as_str(), "min" | "max" | "count")
}
#[cfg(test)]
mod tests {
use super::is_supported_aggregate_name;
#[test]
fn test_supported_aggregate_names() {
assert!(is_supported_aggregate_name("min"));
assert!(is_supported_aggregate_name("max(value)"));
assert!(is_supported_aggregate_name("count(*)"));
assert!(!is_supported_aggregate_name("sum(value)"));
assert!(!is_supported_aggregate_name("avg(value)"));
}
}

View File

@@ -59,6 +59,7 @@ use crate::dist_plan::{
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::aggregate_stats::AggregateStats;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
@@ -175,17 +176,18 @@ impl QueryEngineState {
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();
physical_optimizer.rules.insert(5, Arc::new(AggregateStats));
// Change TableScan's partition right before enforcing distribution
physical_optimizer
.rules
.insert(5, Arc::new(ParallelizeScan));
.insert(6, Arc::new(ParallelizeScan));
// Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
physical_optimizer
.rules
.insert(6, Arc::new(PassDistribution));
.insert(7, Arc::new(PassDistribution));
// Enforce sorting AFTER custom rules that modify the plan structure
physical_optimizer.rules.insert(
7,
8,
Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
);
// Add rule for windowed sort

View File

@@ -307,6 +307,14 @@ impl RegionScanExec {
self.distribution
}
pub fn append_mode(&self) -> bool {
self.append_mode
}
pub fn total_rows(&self) -> usize {
self.total_rows
}
pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
let mut scanner = self.scanner.lock().unwrap();
// set distinguish_partition_range won't fail