feat: stats scan support

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-23 12:41:47 +08:00
parent 555741a9f1
commit 18590e36fb
19 changed files with 1498 additions and 10 deletions

2
Cargo.lock generated
View File

@@ -2633,6 +2633,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
"datatypes",
"futures-util",
"once_cell",
@@ -12870,6 +12871,7 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"num_enum 0.7.4",
"parquet",
"prometheus 0.14.0",
"prost 0.14.1",
"serde",

View File

@@ -22,6 +22,7 @@ common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
once_cell.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,310 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use datafusion::parquet::file::statistics::Statistics as ParquetStats;
use datafusion::scalar::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal};
use datatypes::schema::SchemaRef as RegionSchemaRef;
use datatypes::value::Value;
use store_api::region_engine::FileStatsItem;
/// Runtime requirement that has already been approved by optimizer rewrite checks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SupportStatAggr {
    /// `COUNT(*)` (or its literal expansion): total row count of a file.
    CountRows,
    /// `COUNT(col)`: number of rows where `col` is non-null.
    CountNonNull { column_name: String },
    /// `MIN(col)` answered from file-level minimum statistics.
    MinValue { column_name: String },
    /// `MAX(col)` answered from file-level maximum statistics.
    MaxValue { column_name: String },
}
impl SupportStatAggr {
    /// Returns true when a physical `count` argument list has a shape that file
    /// statistics can answer: `count()`, `count(<COUNT(*) expansion literal>)`,
    /// or `count(<column>)`.
    pub fn is_count_supported_expr(inputs: &[std::sync::Arc<dyn PhysicalExpr>]) -> bool {
        match inputs {
            [] => true,
            [arg] => {
                if let Some(lit) = arg.as_any().downcast_ref::<Literal>() {
                    // Only the canonical `COUNT(*)` expansion literal is accepted.
                    lit.value() == &COUNT_STAR_EXPANSION
                } else {
                    arg.as_any().downcast_ref::<PhysicalColumn>().is_some()
                }
            }
            _ => false,
        }
    }

    /// Returns true when `min`/`max` is applied to exactly one plain column.
    pub fn is_min_max_supported_expr(inputs: &[std::sync::Arc<dyn PhysicalExpr>]) -> bool {
        match inputs {
            [arg] => arg.as_any().downcast_ref::<PhysicalColumn>().is_some(),
            _ => false,
        }
    }

    /// Maps a physical aggregate expression to its stats requirement, or `None`
    /// when the aggregate cannot be answered from file statistics.
    pub fn from_aggr_expr(aggr: &AggregateFunctionExpr) -> Option<Self> {
        match (aggr.fun().name(), aggr.expressions().as_slice()) {
            ("count", []) => Some(Self::CountRows),
            ("count", [arg]) => match arg.as_any().downcast_ref::<Literal>() {
                // `count(<literal>)` equals the row count only for non-null
                // literals; `count(NULL)` is always 0 and must not be rewritten
                // to a file row-count lookup.
                Some(lit) => (!lit.value().is_null()).then_some(Self::CountRows),
                None => arg
                    .as_any()
                    .downcast_ref::<PhysicalColumn>()
                    .map(|col| Self::CountNonNull {
                        column_name: col.name().to_string(),
                    }),
            },
            ("min", [arg]) => arg
                .as_any()
                .downcast_ref::<PhysicalColumn>()
                .map(|col| Self::MinValue {
                    column_name: col.name().to_string(),
                }),
            ("max", [arg]) => arg
                .as_any()
                .downcast_ref::<PhysicalColumn>()
                .map(|col| Self::MaxValue {
                    column_name: col.name().to_string(),
                }),
            _ => None,
        }
    }
}
/// Aggregated parquet statistics for a single column of one file.
///
/// A `None` field means the statistic could not be derived (e.g. some row
/// group lacks stats), which makes the file ineligible for requirements on it.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FileColumnStats {
    /// Total null count summed across all row groups.
    pub null_count: Option<u64>,
    /// File-level minimum (best of the row-group minimums).
    pub min_value: Option<Value>,
    /// File-level maximum (best of the row-group maximums).
    pub max_value: Option<Value>,
}
/// Per-file statistics gathered for the stats-backed aggregate rewrite.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct StatsCandidateFile {
    /// Total row count of the file, if known.
    pub num_rows: Option<u64>,
    /// Statistics for each column referenced by the requirements.
    pub column_stats: HashMap<String, FileColumnStats>,
}
impl StatsCandidateFile {
    /// Builds a stats candidate from one file's stats.
    ///
    /// Returns `Ok(None)` when the file cannot be used: its partition
    /// expression does not match the region's, or any requirement cannot be
    /// answered from the available statistics.
    pub fn from_file_stats(
        file_stats: &FileStatsItem,
        region_partition_expr: Option<&str>,
        requirements: &[SupportStatAggr],
        region_schema: &RegionSchemaRef,
    ) -> Result<Option<Self>> {
        // Cheap eligibility check first: skip stats collection entirely for
        // files whose partition expression does not match the region's.
        if !matches_partition_expr(
            file_stats.file_partition_expr.as_deref(),
            region_partition_expr,
        ) {
            return Ok(None);
        }
        let column_names = required_columns(requirements);
        let column_stats = collect_column_stats(file_stats, region_schema, &column_names)?;
        let candidate = Self {
            num_rows: file_stats.num_rows,
            column_stats,
        };
        // Every requirement must be answerable; otherwise the file is ineligible.
        for requirement in requirements {
            if candidate.stat_value(requirement)?.is_none() {
                return Ok(None);
            }
        }
        Ok(Some(candidate))
    }

    /// Resolves one requirement against this file's stats.
    ///
    /// `Ok(None)` means the statistic is unavailable for this file.
    pub fn stat_value(&self, requirement: &SupportStatAggr) -> Result<Option<Value>> {
        match requirement {
            SupportStatAggr::CountRows => self.num_rows.map(count_value).transpose(),
            SupportStatAggr::CountNonNull { column_name } => {
                let Some(column_stats) = self.column_stats.get(column_name) else {
                    return Ok(None);
                };
                let Some(num_rows) = self.num_rows else {
                    return Ok(None);
                };
                let Some(null_count) = column_stats.null_count else {
                    return Ok(None);
                };
                // null_count > num_rows means corrupted metadata; surface the
                // error instead of silently producing a wrapped count.
                let Some(non_null_count) = num_rows.checked_sub(null_count) else {
                    return Err(DataFusionError::Internal(format!(
                        "StatsScanExec found null_count > num_rows for column {}",
                        column_name
                    )));
                };
                count_value(non_null_count).map(Some)
            }
            SupportStatAggr::MinValue { column_name } => Ok(self
                .column_stats
                .get(column_name)
                .and_then(|stats| stats.min_value.clone())),
            SupportStatAggr::MaxValue { column_name } => Ok(self
                .column_stats
                .get(column_name)
                .and_then(|stats| stats.max_value.clone())),
        }
    }
}
/// Converts an unsigned row count into the `Int64` value used by aggregate state.
///
/// Fails when the count does not fit into `i64`.
fn count_value(value: u64) -> Result<Value> {
    match i64::try_from(value) {
        Ok(signed) => Ok(Value::Int64(signed)),
        Err(_) => Err(DataFusionError::Internal(format!(
            "StatsScanExec count state exceeds Int64 range: {}",
            value
        ))),
    }
}
/// Returns true only when both partition expressions are present and equal.
///
/// Files or regions with an unknown partition expression are conservatively
/// treated as non-matching (including the case where both are `None`).
fn matches_partition_expr(
    file_partition_expr: Option<&str>,
    region_partition_expr: Option<&str>,
) -> bool {
    match (file_partition_expr, region_partition_expr) {
        (Some(file_expr), Some(region_expr)) => file_expr == region_expr,
        _ => false,
    }
}
/// Collects the distinct column names referenced by the requirements.
fn required_columns(requirements: &[SupportStatAggr]) -> HashSet<String> {
    let mut columns = HashSet::new();
    for requirement in requirements {
        match requirement {
            // CountRows needs no per-column statistics.
            SupportStatAggr::CountRows => {}
            SupportStatAggr::CountNonNull { column_name }
            | SupportStatAggr::MinValue { column_name }
            | SupportStatAggr::MaxValue { column_name } => {
                columns.insert(column_name.clone());
            }
        }
    }
    columns
}
/// Gathers per-column statistics for every required column of one file.
fn collect_column_stats(
    file_stats: &FileStatsItem,
    region_schema: &RegionSchemaRef,
    column_names: &HashSet<String>,
) -> Result<HashMap<String, FileColumnStats>> {
    let mut stats = HashMap::with_capacity(column_names.len());
    for column_name in column_names {
        let column = collect_one_column_stats(file_stats, region_schema, column_name)?;
        stats.insert(column_name.clone(), column);
    }
    Ok(stats)
}
/// Computes null count / min / max for a single column of one file.
///
/// Columns missing from the region schema yield empty stats (all `None`),
/// which later marks the file ineligible for requirements on that column.
fn collect_one_column_stats(
    file_stats: &FileStatsItem,
    region_schema: &RegionSchemaRef,
    column_name: &str,
) -> Result<FileColumnStats> {
    match region_schema.column_index_by_name(column_name) {
        Some(column_index) => Ok(FileColumnStats {
            null_count: sum_null_counts(file_stats, column_index)?,
            min_value: best_row_group_value(file_stats, column_index, Ordering::Less)?,
            max_value: best_row_group_value(file_stats, column_index, Ordering::Greater)?,
        }),
        None => Ok(FileColumnStats::default()),
    }
}
/// Sums the per-row-group null counts for one column.
///
/// Returns `Ok(None)` as soon as any row group lacks statistics or a null
/// count, since a partial sum would be misleading.
fn sum_null_counts(file_stats: &FileStatsItem, column_index: usize) -> Result<Option<u64>> {
    let mut total: u64 = 0;
    for row_group in &file_stats.row_groups {
        let null_count = row_group
            .metadata
            .column(column_index)
            .statistics()
            .and_then(|stats| stats.null_count_opt());
        match null_count {
            Some(value) => {
                total = total.checked_add(value).ok_or_else(|| {
                    DataFusionError::Internal("StatsScanExec null-count overflow".to_string())
                })?;
            }
            None => return Ok(None),
        }
    }
    Ok(Some(total))
}
/// Folds per-row-group bounds into a file-level bound for one column.
///
/// `target == Ordering::Less` selects the minimum, `Ordering::Greater` the
/// maximum. Returns `Ok(None)` as soon as any row group lacks a usable bound,
/// because a bound derived from only a subset of row groups would be wrong.
fn best_row_group_value(
    file_stats: &FileStatsItem,
    column_index: usize,
    target: Ordering,
) -> Result<Option<Value>> {
    let mut best = None;
    for row_group in &file_stats.row_groups {
        let Some(stats) = row_group.metadata.column(column_index).statistics() else {
            return Ok(None);
        };
        let Some(scalar) = parquet_bound_scalar(stats, target) else {
            return Ok(None);
        };
        let value = Value::try_from(scalar).map_err(|error| {
            DataFusionError::Internal(format!(
                "StatsScanExec failed to convert row-group scalar: {}",
                error
            ))
        })?;
        // Replace the current best only when the candidate compares in the
        // `target` direction; incomparable values (partial_cmp == None) never win.
        let should_replace = best.as_ref().is_none_or(|current| {
            value
                .partial_cmp(current)
                .is_some_and(|ordering| ordering == target)
        });
        if should_replace {
            best = Some(value);
        }
    }
    Ok(best)
}
/// Extracts the per-row-group min (`Ordering::Less`) or max (`Ordering::Greater`)
/// bound from parquet column statistics as a DataFusion scalar.
///
/// Returns `None` when the physical type is unsupported (`Int96`,
/// `FixedLenByteArray`), the requested bound is missing, or a byte-array bound
/// is not valid UTF-8 — callers treat `None` as "stats unavailable".
fn parquet_bound_scalar(stats: &ParquetStats, target: Ordering) -> Option<ScalarValue> {
    let use_min = target == Ordering::Less;
    // Picks the requested bound from typed statistics; `?` propagates a missing bound.
    macro_rules! bound {
        ($stats:expr) => {
            if use_min {
                *$stats.min_opt()?
            } else {
                *$stats.max_opt()?
            }
        };
    }
    match stats {
        ParquetStats::Boolean(stats) => Some(ScalarValue::Boolean(Some(bound!(stats)))),
        ParquetStats::Int32(stats) => Some(ScalarValue::Int32(Some(bound!(stats)))),
        ParquetStats::Int64(stats) => Some(ScalarValue::Int64(Some(bound!(stats)))),
        ParquetStats::Int96(_) => None,
        ParquetStats::Float(stats) => Some(ScalarValue::Float32(Some(bound!(stats)))),
        ParquetStats::Double(stats) => Some(ScalarValue::Float64(Some(bound!(stats)))),
        ParquetStats::ByteArray(stats) => {
            let bytes = if use_min {
                stats.min_bytes_opt()?
            } else {
                stats.max_bytes_opt()?
            };
            // A non-UTF-8 bound previously became `Utf8(None)` (a NULL scalar),
            // injecting a null into the min/max comparison; report the statistic
            // as unavailable instead.
            let text = String::from_utf8(bytes.to_owned()).ok()?;
            Some(ScalarValue::Utf8(Some(text)))
        }
        ParquetStats::FixedLenByteArray(_) => None,
    }
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggr_stats;
pub mod columnar_value;
pub mod error;
pub mod logical_plan;

View File

@@ -21,6 +21,7 @@ use std::sync::Arc;
use std::time::Instant;
use api::v1::SemanticType;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
@@ -36,7 +37,10 @@ use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::region_engine::{
FileStatsItem, PartitionRange, QueryScanContext, RegionScannerRef, RowGroupStatsItem,
SendableFileStatsStream,
};
use store_api::storage::{
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
TimeSeriesRowSelector,
@@ -1450,6 +1454,53 @@ pub struct StreamContext {
pub(crate) query_start: Instant,
}
/// Builds a stream of per-file statistics ([`FileStatsItem`]) for every SST
/// file selected by the scan input.
///
/// Each item carries the file's row count, its partition expression, and the
/// parquet row-group metadata; SST metadata reads go through the access-layer
/// cache.
pub(crate) fn scan_input_stats(
    input: &ScanInput,
    ctx: &QueryScanContext,
) -> SendableFileStatsStream {
    // Clone everything the async stream needs so it owns its own state.
    let access_layer = input.access_layer.clone();
    let cache_strategy = input.cache_strategy.clone();
    let region_metadata = input.region_metadata().clone();
    let files = input.files.clone();
    let explain_verbose = ctx.explain_verbose;
    Box::pin(try_stream! {
        if explain_verbose {
            common_telemetry::info!(
                "Scan stats, region_id: {}, files: {}",
                region_metadata.region_id,
                files.len()
            );
        }
        for file in files {
            // Fetch (possibly cached) parquet metadata for this SST file.
            let sst_meta = access_layer
                .read_sst(file.clone())
                .cache(cache_strategy.clone())
                .expected_metadata(Some(region_metadata.clone()))
                .read_sst_meta()
                .await
                .map_err(BoxedError::new)?;
            let parquet_meta = sst_meta.parquet_metadata();
            let row_groups = parquet_meta
                .row_groups()
                .iter()
                .enumerate()
                .map(|(row_group_index, metadata)| RowGroupStatsItem {
                    row_group_index,
                    metadata: Arc::new(metadata.clone()),
                })
                .collect();
            yield FileStatsItem {
                num_rows: Some(parquet_meta.file_metadata().num_rows() as u64),
                file_partition_expr: file.meta_ref().partition_expr.as_ref().map(ToString::to_string),
                row_groups,
            };
        }
    })
}
impl StreamContext {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {

View File

@@ -31,6 +31,7 @@ use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
SendableFileStatsStream,
};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
@@ -44,7 +45,7 @@ use crate::read::range::RangeMeta;
use crate::read::range_cache::{
build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
@@ -545,6 +546,10 @@ impl RegionScanner for SeqScan {
.map_err(BoxedError::new)
}
fn scan_stats(&self, ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError> {
Ok(scan_input_stats(&self.stream_ctx.input, ctx))
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);

View File

@@ -35,6 +35,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
SendableFileStatsStream,
};
use tokio::sync::Semaphore;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
@@ -46,7 +47,7 @@ use crate::error::{
};
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
compute_parallel_channel_size,
@@ -353,6 +354,10 @@ impl RegionScanner for SeriesScan {
.map_err(BoxedError::new)
}
fn scan_stats(&self, ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError> {
Ok(scan_input_stats(&self.stream_ctx.input, ctx))
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);

View File

@@ -30,12 +30,12 @@ use futures::{Stream, StreamExt};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, SendableFileStatsStream,
};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
};
@@ -331,6 +331,10 @@ impl RegionScanner for UnorderedScan {
.map_err(BoxedError::new)
}
fn scan_stats(&self, ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError> {
Ok(scan_input_stats(&self.stream_ctx.input, ctx))
}
/// If this scanner have predicate other than region partition exprs
fn has_predicate_without_region(&self) -> bool {
let predicate = self

View File

@@ -254,6 +254,21 @@ impl ParquetReaderBuilder {
self
}
pub(crate) async fn read_sst_meta(&self) -> Result<Arc<CachedSstMeta>> {
let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
let file_size = self.file_handle.meta_ref().file_size;
let mut cache_metrics = MetadataCacheMetrics::default();
let (sst_meta, _) = self
.read_parquet_metadata(
&file_path,
file_size,
&mut cache_metrics,
self.page_index_policy,
)
.await?;
Ok(sst_meta)
}
/// Sets the compaction flag.
#[must_use]
pub fn compaction(mut self, compaction: bool) -> Self {

View File

@@ -650,6 +650,7 @@ mod tests {
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
SendableFileStatsStream,
};
use store_api::storage::{RegionId, ScanRequest};
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
@@ -718,6 +719,13 @@ mod tests {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
}
fn scan_stats(
&self,
_ctx: &QueryScanContext,
) -> std::result::Result<SendableFileStatsStream, BoxedError> {
Ok(Box::pin(futures::stream::empty()))
}
fn has_predicate_without_region(&self) -> bool {
true
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod aggr_stats;
pub mod constant_term;
pub mod count_nest_aggr;
pub mod count_wildcard;

View File

@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_common::Result;
use datafusion_common::tree_node::{Transformed, TreeNode};
use table::table::scan::RegionScanExec;
use crate::optimizer::aggr_stats::support_aggr::SupportStatAggr;
pub(crate) mod stat_scan;
pub(crate) mod support_aggr;
/// Physical optimizer scaffold for the aggregate-stats runtime rewrite.
///
/// This pass only owns query-shape eligibility and the rewrite shape; runtime
/// stats lookup/classification is handled during execution (see
/// `StatsScanExec`-side code), keeping optimizer time free of I/O.
#[derive(Debug, Default)]
pub struct AggrStatsPhysicalRule;
impl PhysicalOptimizerRule for AggrStatsPhysicalRule {
    /// Entry point invoked by DataFusion's physical optimizer pipeline.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Self::rewrite_plan_shape(plan)
    }

    fn name(&self) -> &str {
        "aggr_stats_physical"
    }

    /// The rewrite must not change the plan's schema, so keep the check enabled.
    fn schema_check(&self) -> bool {
        true
    }
}
impl AggrStatsPhysicalRule {
    /// Walks the plan top-down looking for the rewritable aggregate shape.
    ///
    /// NOTE: currently a scaffold — an eligible target is detected but the plan
    /// is returned unchanged; the actual rewrite is still TODO.
    fn rewrite_plan_shape(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(|plan| {
            let Some(_rewrite_target) = RewriteTarget::extract(&plan) else {
                return Ok(Transformed::no(plan));
            };
            // impl rewrite in RewriteTarget
            Ok(Transformed::no(plan))
        })
        .map(|res| res.data)
    }
}
/// TODO(discord9): support more kind of aggr
#[allow(unused)]
enum RewriteTarget<'a> {
    /// Matched `Final -> [CoalescePartitions] -> Partial -> RegionScan` shape.
    FinalOverPartial {
        /// Final-mode aggregate at the top of the matched shape.
        final_exec: &'a AggregateExec,
        /// Partial-mode aggregate feeding it.
        partial_exec: &'a AggregateExec,
        /// Region scan at the bottom of the shape.
        region_scan: &'a RegionScanExec,
        /// Whether a `CoalescePartitionsExec` sat between final and partial and
        /// must be preserved by the rewrite.
        keep_coalesce: bool,
        /// Stats requirements derived from the final aggregate's expressions.
        aggr_exprs: Vec<SupportStatAggr>,
    },
}
#[allow(unused)]
impl<'a> RewriteTarget<'a> {
    /// Checks the plan-shape constraints that make the stats rewrite safe:
    /// no grouping, no DISTINCT, no aggregate filters, and an append-only scan.
    fn is_supported_rewrite(&self) -> bool {
        let Self::FinalOverPartial {
            final_exec,
            partial_exec,
            region_scan,
            ..
        } = self;
        final_exec.group_expr().is_empty()
            && final_exec
                .aggr_expr()
                .iter()
                .all(|aggr| !aggr.is_distinct())
            && final_exec
                .filter_expr()
                .iter()
                .all(|filter| filter.is_none())
            && partial_exec
                .filter_expr()
                .iter()
                .all(|filter| filter.is_none())
            && region_scan.append_mode()
    }

    /// Matches the `Final -> [Coalesce] -> Partial -> RegionScan` plan shape and
    /// returns a rewrite target when every aggregate is stats-answerable.
    #[allow(unused)]
    fn extract(plan: &'a Arc<dyn ExecutionPlan>) -> Option<Self> {
        let aggregate_exec = plan.as_any().downcast_ref::<AggregateExec>()?;
        if !matches!(aggregate_exec.mode(), AggregateMode::Final) {
            return None;
        }
        // An optional CoalescePartitionsExec may sit between Final and Partial;
        // remember whether it needs to be re-inserted by the rewrite.
        let (input, keep_coalesce) = if let Some(coalesce) = aggregate_exec
            .input()
            .as_any()
            .downcast_ref::<CoalescePartitionsExec>(
        ) {
            (coalesce.input(), true)
        } else {
            (aggregate_exec.input(), false)
        };
        let partial_exec = input.as_any().downcast_ref::<AggregateExec>()?;
        if !matches!(partial_exec.mode(), AggregateMode::Partial) {
            return None;
        }
        let region_scan = partial_exec
            .input()
            .as_any()
            .downcast_ref::<RegionScanExec>()?;
        // Every aggregate must map to a supported stats requirement, otherwise
        // the whole rewrite is abandoned. `collect::<Option<_>>` is the stable
        // equivalent of the nightly-only `Iterator::try_collect` over Options.
        let aggr_exprs = aggregate_exec
            .aggr_expr()
            .iter()
            .map(|aggr_expr| SupportStatAggr::from_aggr_expr(aggr_expr))
            .collect::<Option<Vec<_>>>()?;
        let target = Self::FinalOverPartial {
            final_exec: aggregate_exec,
            partial_exec,
            region_scan,
            keep_coalesce,
            aggr_exprs,
        };
        if target.is_supported_rewrite() {
            Some(target)
        } else {
            None
        }
    }

    /// Returns the partial (first-stage) aggregate of the matched shape.
    fn first_stage_aggregate(&self) -> &'a AggregateExec {
        match self {
            RewriteTarget::FinalOverPartial { partial_exec, .. } => partial_exec,
        }
    }

    /// Returns the region scan feeding the matched aggregate.
    fn region_scan(&self) -> &'a RegionScanExec {
        match self {
            RewriteTarget::FinalOverPartial { region_scan, .. } => region_scan,
        }
    }
}

View File

@@ -0,0 +1,805 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use std::any::Any;
use std::sync::{Arc, Mutex};
use arrow_schema::{DataType, Field, SchemaRef};
use async_stream::try_stream;
use common_query::aggr_stats::StatsCandidateFile;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
};
use datafusion::scalar::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::EquivalenceProperties;
use datatypes::arrow::array::{ArrayRef, StructArray};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use futures::StreamExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{QueryScanContext, RegionScannerRef, SendableFileStatsStream};
use super::support_aggr::SupportStatAggr;
/// Builds one row of partial aggregate state — one scalar per requirement —
/// for a single eligible file.
///
/// The output schema must carry exactly one state field per requirement,
/// in matching order.
fn build_state_row(
    candidate: &StatsCandidateFile,
    schema: &SchemaRef,
    requirements: &[SupportStatAggr],
) -> Result<Vec<ScalarValue>> {
    let field_count = schema.fields().len();
    if field_count != requirements.len() {
        return Err(DataFusionError::Internal(format!(
            "StatsScanExec schema/requirement mismatch: {} fields, {} requirements",
            field_count,
            requirements.len()
        )));
    }
    schema
        .fields()
        .iter()
        .zip(requirements)
        .map(|(field, requirement)| build_state_scalar(candidate, field.as_ref(), requirement))
        .collect()
}
/// Converts one resolved statistic into the struct-typed partial aggregate
/// state scalar DataFusion expects for `field`.
///
/// v1 only supports aggregates whose partial state is a single-field struct
/// (count / min / max); any other state layout is an internal error.
fn build_state_scalar(
    candidate: &StatsCandidateFile,
    field: &Field,
    requirement: &SupportStatAggr,
) -> Result<ScalarValue> {
    let DataType::Struct(state_fields) = field.data_type() else {
        return Err(DataFusionError::Internal(format!(
            "StatsScanExec expects struct state field, got {:?} for {}",
            field.data_type(),
            field.name()
        )));
    };
    if state_fields.len() != 1 {
        return Err(DataFusionError::Internal(format!(
            "StatsScanExec only supports single-field state in v1, got {} fields for {}",
            state_fields.len(),
            field.name()
        )));
    }
    let inner_field = state_fields[0].as_ref();
    // Convert through the region type system so the value matches the inner
    // state field's arrow type.
    let output_type = ConcreteDataType::from_arrow_type(inner_field.data_type());
    // Candidates are pre-filtered to answer every requirement, so a missing
    // value here indicates a bug, not a recoverable condition.
    let Some(value) = candidate.stat_value(requirement)? else {
        return Err(DataFusionError::Internal(format!(
            "StatsScanExec built an ineligible stats candidate for requirement {:?}",
            requirement
        )));
    };
    let scalar = value.try_to_scalar_value(&output_type).map_err(|error| {
        DataFusionError::Internal(format!(
            "StatsScanExec failed to convert state value for {}: {}",
            field.name(),
            error
        ))
    })?;
    let state_array = scalar.to_array().map_err(|error| {
        DataFusionError::Internal(format!(
            "StatsScanExec failed to build state array for {}: {}",
            field.name(),
            error
        ))
    })?;
    // Wrap the single-element array into the one-field struct state scalar.
    Ok(ScalarValue::Struct(Arc::new(StructArray::new(
        state_fields.clone(),
        vec![state_array],
        None,
    ))))
}
/// Materializes one record batch of partial aggregate state from the eligible
/// files, one row per file.
///
/// Returns `Ok(None)` when there is nothing to emit (no requirements or no
/// eligible files), letting the caller produce an empty stream.
fn build_batch_from_candidates(
    schema: &SchemaRef,
    requirements: &[SupportStatAggr],
    files: &[StatsCandidateFile],
) -> Result<Option<RecordBatch>> {
    // One scalar column per requirement, filled row-by-row (file-by-file).
    let mut columns = (0..requirements.len())
        .map(|_| Vec::<ScalarValue>::new())
        .collect::<Vec<_>>();
    for file in files {
        let row = build_state_row(file, schema, requirements)?;
        for (index, scalar) in row.into_iter().enumerate() {
            columns[index].push(scalar);
        }
    }
    // Either no requirements (no columns) or no files (empty first column).
    if columns.first().is_none_or(|column| column.is_empty()) {
        return Ok(None);
    }
    let arrays = columns
        .into_iter()
        .map(|values| {
            ScalarValue::iter_to_array(values).map_err(|error| {
                DataFusionError::Internal(format!(
                    "StatsScanExec failed to materialize state array: {}",
                    error
                ))
            })
        })
        .collect::<Result<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(schema.clone(), arrays)
        .map(Some)
        .map_err(|error| {
            DataFusionError::Internal(format!(
                "StatsScanExec failed to build record batch: {}",
                error
            ))
        })
}
/// Drains the scanner's file-stats stream, keeps the files eligible for every
/// requirement, and builds the partial-state batch from them.
///
/// Files rejected by [`StatsCandidateFile::from_file_stats`] (partition
/// mismatch or missing statistics) are skipped silently.
async fn build_batch_from_scan_stats(
    schema: &SchemaRef,
    requirements: &[SupportStatAggr],
    region_metadata: &RegionMetadataRef,
    mut scan_stats: SendableFileStatsStream,
) -> Result<Option<RecordBatch>> {
    let mut files = Vec::new();
    while let Some(file_stats) = scan_stats.next().await {
        let file_stats = file_stats.map_err(|error| DataFusionError::External(error.into()))?;
        let Some(candidate) = StatsCandidateFile::from_file_stats(
            &file_stats,
            region_metadata.partition_expr.as_deref(),
            requirements,
            &region_metadata.schema,
        )?
        else {
            continue;
        };
        files.push(candidate);
    }
    build_batch_from_candidates(schema, requirements, &files)
}
/// Physical execution plan for runtime stats-backed partial aggregates.
///
/// This node obtains scanner-owned file stats during `execute()` and materializes partial
/// aggregate state from those stats without doing optimizer-time I/O.
pub struct StatsScanExec {
    /// Output schema: one struct-typed state field per requirement, in order.
    schema: SchemaRef,
    /// Stats requirements, ordered to match `schema`'s fields.
    requirements: Vec<SupportStatAggr>,
    /// Shared scanner handle providing `scan_stats` at execution time.
    scanner: Arc<Mutex<RegionScannerRef>>,
    /// Cached plan properties (single bounded partition).
    properties: Arc<PlanProperties>,
    metrics: ExecutionPlanMetricsSet,
}
// Manual Debug: only schema and requirements are shown; the scanner handle,
// properties and metrics are intentionally omitted.
impl std::fmt::Debug for StatsScanExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StatsScanExec")
            .field("schema", &self.schema)
            .field("requirements", &self.requirements)
            .finish()
    }
}
impl StatsScanExec {
    /// Creates a stats scan over `scanner` producing `schema`, which must carry
    /// one partial-state field per requirement. The plan always reports a
    /// single, bounded partition.
    pub fn new(
        schema: SchemaRef,
        requirements: Vec<SupportStatAggr>,
        scanner: Arc<Mutex<RegionScannerRef>>,
    ) -> Self {
        Self {
            properties: Arc::new(PlanProperties::new(
                EquivalenceProperties::new(schema.clone()),
                datafusion::physical_plan::Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            schema,
            requirements,
            scanner,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Requirements answered by this node, in output-field order.
    pub fn requirements(&self) -> &[SupportStatAggr] {
        &self.requirements
    }
}
impl DisplayAs for StatsScanExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            // EXPLAIN output: show how many requirements are served from stats.
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "StatsScanExec: requirements={}", self.requirements.len())
            }
            DisplayFormatType::TreeRender => write!(f, "StatsScanExec"),
        }
    }
}
impl ExecutionPlan for StatsScanExec {
    fn name(&self) -> &str {
        "StatsScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.properties
    }

    // Leaf node: never has children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Rebuilding a leaf with children indicates a planner bug.
        if !children.is_empty() {
            return Err(DataFusionError::Internal(format!(
                "StatsScanExec expects no children, got {}",
                children.len()
            )));
        }
        Ok(Arc::new(Self {
            schema: self.schema.clone(),
            requirements: self.requirements.clone(),
            scanner: self.scanner.clone(),
            properties: self.properties.clone(),
            metrics: self.metrics.clone(),
        }))
    }

    /// Fetches the scanner's file-stats stream, then lazily materializes at
    /// most one batch of partial aggregate state.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // Declared as UnknownPartitioning(1): only partition 0 is valid.
        if partition != 0 {
            return Err(DataFusionError::Execution(format!(
                "StatsScanExec expects a single partition, got {}",
                partition
            )));
        }
        let schema = self.schema.clone();
        let requirements = self.requirements.clone();
        let (region_metadata, scan_stats) = {
            // NOTE(review): `unwrap` panics if the mutex was poisoned by an
            // earlier panic while it was held — confirm this is acceptable.
            let scanner = self.scanner.lock().unwrap();
            let region_metadata = scanner.metadata();
            let scan_stats = scanner
                .scan_stats(&QueryScanContext::default())
                .map_err(|error| DataFusionError::External(error.into()))?;
            (region_metadata, scan_stats)
        };
        // The stream yields one batch, or nothing when no file qualifies.
        let stream = try_stream! {
            if let Some(batch) = build_batch_from_scan_stats(
                &schema,
                &requirements,
                &region_metadata,
                scan_stats,
            )
            .await? {
                yield batch;
            }
        };
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            Box::pin(stream),
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use api::v1::SemanticType;
use bytes::Bytes;
use common_query::aggr_stats::SupportStatAggr;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Column as PhysicalColumn;
use datatypes::arrow::array::Int64Array;
use datatypes::schema::ColumnSchema;
use futures::StreamExt;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_engine::{
FileStatsItem, RowGroupStatsItem, ScannerProperties, SendableFileStatsStream,
};
use store_api::storage::RegionId;
use super::*;
/// Test double: a `RegionScanner` that serves a fixed, in-memory list of
/// per-file statistics.
#[derive(Debug)]
struct StaticStatsScanner {
    schema: datatypes::schema::SchemaRef,
    metadata: RegionMetadataRef,
    properties: ScannerProperties,
    // Stats replayed verbatim by `scan_stats`.
    files: Vec<FileStatsItem>,
}
// Minimal RegionScanner implementation: real data scanning is stubbed out and
// `scan_stats` simply replays the canned `files`.
impl store_api::region_engine::RegionScanner for StaticStatsScanner {
    fn name(&self) -> &str {
        "StaticStatsScanner"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> datatypes::schema::SchemaRef {
        self.schema.clone()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.metadata.clone()
    }

    fn prepare(
        &mut self,
        request: store_api::region_engine::PrepareRequest,
    ) -> std::result::Result<(), common_error::ext::BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    // Data scan is irrelevant for these tests: always yields an empty stream.
    fn scan_partition(
        &self,
        _ctx: &QueryScanContext,
        _metrics_set: &ExecutionPlanMetricsSet,
        _partition: usize,
    ) -> std::result::Result<
        common_recordbatch::SendableRecordBatchStream,
        common_error::ext::BoxedError,
    > {
        Ok(Box::pin(common_recordbatch::EmptyRecordBatchStream::new(
            self.schema.clone(),
        )))
    }

    // Replays the canned per-file stats in order.
    fn scan_stats(
        &self,
        _ctx: &QueryScanContext,
    ) -> std::result::Result<SendableFileStatsStream, common_error::ext::BoxedError> {
        Ok(Box::pin(futures::stream::iter(
            self.files.clone().into_iter().map(Ok),
        )))
    }

    fn has_predicate_without_region(&self) -> bool {
        false
    }

    // Accepts no dynamic filters.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        vec![false; filter_exprs.len()]
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
// Fixed display name regardless of format; only needed to satisfy the trait.
impl datafusion::physical_plan::DisplayAs for StaticStatsScanner {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        write!(f, "StaticStatsScanner")
    }
}
/// Builds a nullable struct field with a single inner field, mirroring the
/// single-field partial-state layout `StatsScanExec` expects.
fn single_state_field(
    name: &str,
    inner_name: &str,
    inner_type: DataType,
    inner_nullable: bool,
) -> Field {
    Field::new(
        name,
        DataType::Struct(vec![Field::new(inner_name, inner_type, inner_nullable)].into()),
        true,
    )
}
/// Builds region metadata with a nullable Int64 `value` field and a `ts`
/// timestamp column, optionally tagged with a partition expression.
fn build_region_metadata(partition_expr: Option<&str>) -> RegionMetadataRef {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    builder.push_column_metadata(ColumnMetadata {
        column_schema: ColumnSchema::new(
            "value",
            datatypes::data_type::ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 1,
    });
    builder.push_column_metadata(ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            datatypes::data_type::ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    });
    let mut metadata = builder.build_without_validation().unwrap();
    metadata.set_partition_expr(partition_expr.map(str::to_string));
    Arc::new(metadata)
}
/// Writes `chunks` (one record batch per chunk) into an in-memory parquet file
/// and returns the resulting row-group metadata as `RowGroupStatsItem`s.
fn build_row_groups(chunks: &[Vec<Option<i64>>]) -> Vec<RowGroupStatsItem> {
    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
        "value",
        DataType::Int64,
        true,
    )]));
    // Serialize every chunk as a record batch into an in-memory buffer.
    let mut buffer = Cursor::new(Vec::new());
    let mut writer = ArrowWriter::try_new(
        &mut buffer,
        arrow_schema.clone(),
        Some(WriterProperties::builder().build()),
    )
    .unwrap();
    for chunk in chunks {
        let column = Arc::new(Int64Array::from(chunk.clone()));
        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![column]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
    // Read the parquet metadata back from the serialized bytes.
    let bytes = Bytes::from(buffer.into_inner());
    let metadata = ParquetRecordBatchReaderBuilder::try_new(bytes)
        .unwrap()
        .metadata()
        .clone();
    let mut items = Vec::with_capacity(metadata.row_groups().len());
    for (row_group_index, row_group) in metadata.row_groups().iter().enumerate() {
        items.push(RowGroupStatsItem {
            row_group_index,
            metadata: Arc::new(row_group.clone()),
        });
    }
    items
}
fn build_datafusion_aggr_expr(
aggr: Arc<datafusion_expr::AggregateUDF>,
alias: &str,
) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(aggr, vec![Arc::new(PhysicalColumn::new("value", 0))])
.schema(Arc::new(arrow_schema::Schema::new(vec![Field::new(
"value",
DataType::Int64,
true,
)])))
.alias(alias)
.build()
.unwrap(),
)
}
/// Executes partition 0 of `exec` and returns its single record batch,
/// asserting that exactly one batch is produced.
async fn collect_single_batch(exec: &StatsScanExec) -> RecordBatch {
    let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
    let mut batches = stream.map(|batch| batch.unwrap()).collect::<Vec<_>>().await;
    assert_eq!(batches.len(), 1);
    batches.pop().unwrap()
}
/// Downcasts column `column_index` of `batch` to a `StructArray` and asserts
/// it contains exactly one field equal to `expected_field`.
fn assert_struct_state_matches_field<'a>(
    batch: &'a RecordBatch,
    column_index: usize,
    expected_field: &Field,
) -> &'a StructArray {
    let column = batch.column(column_index);
    let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap();
    let fields = struct_array.fields();
    assert_eq!(fields.len(), 1);
    assert_eq!(fields[0].as_ref(), expected_field);
    struct_array
}
#[tokio::test]
async fn stats_scan_exec_matches_datafusion_count_state_field() {
    // The state field served by StatsScanExec must mirror DataFusion's
    // count aggregate state layout.
    let aggregate = build_datafusion_aggr_expr(count_udaf(), "count(value)");
    let state_fields = aggregate.state_fields().unwrap();
    assert_eq!(state_fields.len(), 1);
    let expected_inner = state_fields[0].as_ref().clone();
    let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field(
        "count_state",
        expected_inner.name(),
        expected_inner.data_type().clone(),
        expected_inner.is_nullable(),
    )]));
    // One file with five rows, two of which are null.
    let region_metadata = build_region_metadata(Some("host = 'a'"));
    let files = vec![FileStatsItem {
        num_rows: Some(5),
        file_partition_expr: Some("host = 'a'".to_string()),
        row_groups: build_row_groups(&[vec![Some(1), None, Some(9), Some(3), None]]),
    }];
    let scanner = StaticStatsScanner {
        schema: region_metadata.schema.clone(),
        metadata: region_metadata,
        properties: ScannerProperties::default(),
        files,
    };
    let requirements = vec![SupportStatAggr::CountNonNull {
        column_name: "value".to_string(),
    }];
    let exec = StatsScanExec::new(
        schema,
        requirements,
        Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
    );
    let batch = collect_single_batch(&exec).await;
    let state = assert_struct_state_matches_field(&batch, 0, &expected_inner);
    let counts = state
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    // Three non-null values out of five rows.
    assert_eq!(counts.value(0), 3);
}
#[tokio::test]
async fn stats_scan_exec_matches_datafusion_min_state_field() {
    // The state field served by StatsScanExec must mirror DataFusion's
    // min aggregate state layout.
    let aggregate = build_datafusion_aggr_expr(min_udaf(), "min(value)");
    let state_fields = aggregate.state_fields().unwrap();
    assert_eq!(state_fields.len(), 1);
    let expected_inner = state_fields[0].as_ref().clone();
    let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field(
        "min_state",
        expected_inner.name(),
        expected_inner.data_type().clone(),
        expected_inner.is_nullable(),
    )]));
    // One file whose chunks contain a negative minimum in the second chunk.
    let region_metadata = build_region_metadata(Some("host = 'a'"));
    let files = vec![FileStatsItem {
        num_rows: Some(6),
        file_partition_expr: Some("host = 'a'".to_string()),
        row_groups: build_row_groups(&[
            vec![Some(4), Some(8), None],
            vec![Some(-3), Some(7), Some(2)],
        ]),
    }];
    let scanner = StaticStatsScanner {
        schema: region_metadata.schema.clone(),
        metadata: region_metadata,
        properties: ScannerProperties::default(),
        files,
    };
    let requirements = vec![SupportStatAggr::MinValue {
        column_name: "value".to_string(),
    }];
    let exec = StatsScanExec::new(
        schema,
        requirements,
        Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
    );
    let batch = collect_single_batch(&exec).await;
    let state = assert_struct_state_matches_field(&batch, 0, &expected_inner);
    let minimums = state
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    // The smallest value across all chunks is -3.
    assert_eq!(minimums.value(0), -3);
}
#[tokio::test]
async fn stats_scan_exec_matches_datafusion_max_state_field() {
    // The state field served by StatsScanExec must mirror DataFusion's
    // max aggregate state layout.
    let aggregate = build_datafusion_aggr_expr(max_udaf(), "max(value)");
    let state_fields = aggregate.state_fields().unwrap();
    assert_eq!(state_fields.len(), 1);
    let expected_inner = state_fields[0].as_ref().clone();
    let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field(
        "max_state",
        expected_inner.name(),
        expected_inner.data_type().clone(),
        expected_inner.is_nullable(),
    )]));
    // One file whose chunks place the maximum in the second chunk.
    let region_metadata = build_region_metadata(Some("host = 'a'"));
    let files = vec![FileStatsItem {
        num_rows: Some(6),
        file_partition_expr: Some("host = 'a'".to_string()),
        row_groups: build_row_groups(&[
            vec![Some(4), Some(8), None],
            vec![Some(-3), Some(11), Some(2)],
        ]),
    }];
    let scanner = StaticStatsScanner {
        schema: region_metadata.schema.clone(),
        metadata: region_metadata,
        properties: ScannerProperties::default(),
        files,
    };
    let requirements = vec![SupportStatAggr::MaxValue {
        column_name: "value".to_string(),
    }];
    let exec = StatsScanExec::new(
        schema,
        requirements,
        Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
    );
    let batch = collect_single_batch(&exec).await;
    let state = assert_struct_state_matches_field(&batch, 0, &expected_inner);
    let maximums = state
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    // The largest value across all chunks is 11.
    assert_eq!(maximums.value(0), 11);
}
#[tokio::test]
async fn stats_scan_exec_rejects_datafusion_avg_multi_field_state() {
let aggr_expr = build_datafusion_aggr_expr(avg_udaf(), "avg(value)");
let state_fields = aggr_expr.state_fields().unwrap();
assert!(state_fields.len() > 1);
let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
"avg_state",
DataType::Struct(
state_fields
.iter()
.map(|field| field.as_ref().clone())
.collect::<Vec<_>>()
.into(),
),
true,
)]));
let region_metadata = build_region_metadata(Some("host = 'a'"));
let scanner = StaticStatsScanner {
schema: region_metadata.schema.clone(),
metadata: region_metadata,
properties: ScannerProperties::default(),
files: vec![FileStatsItem {
num_rows: Some(5),
file_partition_expr: Some("host = 'a'".to_string()),
row_groups: build_row_groups(&[vec![Some(1), None, Some(9), Some(3), None]]),
}],
};
let exec = StatsScanExec::new(
schema,
vec![SupportStatAggr::CountNonNull {
column_name: "value".to_string(),
}],
Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
);
let mut stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
let error = stream.next().await.unwrap().unwrap_err();
assert!(
error
.to_string()
.contains("only supports single-field state in v1")
);
}
#[tokio::test]
async fn stats_scan_exec_emits_state_rows_for_eligible_files() {
    let schema = Arc::new(arrow_schema::Schema::new(vec![
        single_state_field("count_state", "count[count]", DataType::Int64, false),
        single_state_field("max_state", "max[max]", DataType::Int64, true),
    ]));
    let requirements = vec![
        SupportStatAggr::CountNonNull {
            column_name: "value".to_string(),
        },
        SupportStatAggr::MaxValue {
            column_name: "value".to_string(),
        },
    ];
    let region_metadata = build_region_metadata(Some("host = 'a'"));
    let eligible_row_groups = build_row_groups(&[
        vec![Some(1), None, Some(9), Some(3), None],
        vec![Some(2), Some(8), Some(7), None, Some(4)],
    ]);
    // Three files: the second has no row groups and the third carries a
    // partition expr ("host = 'b'") that differs from the region's.
    let files = vec![
        FileStatsItem {
            num_rows: Some(10),
            file_partition_expr: Some("host = 'a'".to_string()),
            row_groups: eligible_row_groups.clone(),
        },
        FileStatsItem {
            num_rows: Some(5),
            file_partition_expr: Some("host = 'a'".to_string()),
            row_groups: vec![],
        },
        FileStatsItem {
            num_rows: Some(10),
            file_partition_expr: Some("host = 'b'".to_string()),
            row_groups: eligible_row_groups,
        },
    ];
    let scanner = StaticStatsScanner {
        schema: region_metadata.schema.clone(),
        metadata: region_metadata,
        properties: ScannerProperties::default(),
        files,
    };
    let exec = StatsScanExec::new(
        schema,
        requirements,
        Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
    );
    let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
    let batches = stream.map(|batch| batch.unwrap()).collect::<Vec<_>>().await;
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    // A single state row is emitted for the one eligible file.
    assert_eq!(batch.num_rows(), 1);
    // Extracts the first inner Int64 value of the struct state column.
    let state_value = |column_index: usize| -> i64 {
        let state = batch
            .column(column_index)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
        state
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    };
    // The first file's chunks hold seven non-null values with a maximum of 9.
    assert_eq!(state_value(0), 7);
    assert_eq!(state_value(1), 9);
}
}

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub use common_query::aggr_stats::SupportStatAggr;

View File

@@ -59,6 +59,7 @@ use crate::dist_plan::{
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::aggr_stats::AggrStatsPhysicalRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_nest_aggr::CountNestAggrRule;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
@@ -177,17 +178,21 @@ impl QueryEngineState {
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();
// Prepare aggregate-stats runtime rewrite shape before scan repartitioning/distribution.
physical_optimizer
.rules
.insert(5, Arc::new(AggrStatsPhysicalRule));
// Change TableScan's partition right before enforcing distribution
physical_optimizer
.rules
.insert(5, Arc::new(ParallelizeScan));
.insert(6, Arc::new(ParallelizeScan));
// Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
physical_optimizer
.rules
.insert(6, Arc::new(PassDistribution));
.insert(7, Arc::new(PassDistribution));
// Enforce sorting AFTER custom rules that modify the plan structure
physical_optimizer.rules.insert(
7,
8,
Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
);
// Add rule for windowed sort

View File

@@ -29,6 +29,7 @@ humantime.workspace = true
itertools.workspace = true
lazy_static.workspace = true
num_enum = "0.7"
parquet.workspace = true
prometheus.workspace = true
prost.workspace = true
serde.workspace = true

View File

@@ -17,6 +17,7 @@
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
@@ -28,7 +29,9 @@ use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
use datatypes::schema::SchemaRef;
use futures::Stream;
use futures::future::join_all;
use parquet::file::metadata::RowGroupMetaData;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
@@ -313,6 +316,9 @@ pub struct ScannerProperties {
/// Whether the scanner is scanning a logical region.
logical_region: bool,
/// Whether stats-aware skip mode is enabled for aggregate-stats runtime execution.
stats_aware_skip_mode: bool,
}
impl ScannerProperties {
@@ -337,6 +343,7 @@ impl ScannerProperties {
distinguish_partition_range: false,
target_partitions: 0,
logical_region: false,
stats_aware_skip_mode: false,
}
}
@@ -351,6 +358,9 @@ impl ScannerProperties {
if let Some(target_partitions) = request.target_partitions {
self.target_partitions = target_partitions;
}
if let Some(stats_aware_skip_mode) = request.stats_aware_skip_mode {
self.stats_aware_skip_mode = stats_aware_skip_mode;
}
}
/// Returns the number of actual partitions.
@@ -366,6 +376,11 @@ impl ScannerProperties {
self.total_rows
}
/// Returns whether stats-aware skip mode is enabled.
pub fn stats_aware_skip_mode(&self) -> bool {
self.stats_aware_skip_mode
}
/// Returns whether the scanner is scanning a logical region.
pub fn is_logical_region(&self) -> bool {
self.logical_region
@@ -395,6 +410,8 @@ pub struct PrepareRequest {
pub distinguish_partition_range: Option<bool>,
/// The expected number of target partitions.
pub target_partitions: Option<usize>,
/// Whether to enable stats-aware skip mode on the scanner.
pub stats_aware_skip_mode: Option<bool>,
}
impl PrepareRequest {
@@ -415,6 +432,12 @@ impl PrepareRequest {
self.target_partitions = Some(target_partitions);
self
}
/// Sets the stats-aware skip mode flag.
pub fn with_stats_aware_skip_mode(mut self, stats_aware_skip_mode: bool) -> Self {
self.stats_aware_skip_mode = Some(stats_aware_skip_mode);
self
}
}
/// Necessary context of the query for the scanner.
@@ -424,6 +447,27 @@ pub struct QueryScanContext {
pub explain_verbose: bool,
}
/// File-level stats returned by [`RegionScanner::scan_stats`].
#[derive(Debug, Clone)]
pub struct FileStatsItem {
/// Exact row count from parquet metadata.
pub num_rows: Option<u64>,
/// Greptime file metadata, not parquet-native metadata.
pub file_partition_expr: Option<String>,
/// Nested parquet row-group metadata for future finer-grained use.
pub row_groups: Vec<RowGroupStatsItem>,
}
/// Row-group stats nested inside one [`FileStatsItem`].
#[derive(Debug, Clone)]
pub struct RowGroupStatsItem {
pub row_group_index: usize,
pub metadata: Arc<RowGroupMetaData>,
}
pub type SendableFileStatsStream =
Pin<Box<dyn Stream<Item = Result<FileStatsItem, BoxedError>> + Send>>;
/// A scanner that provides a way to scan the region concurrently.
///
/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
@@ -456,6 +500,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError>;
/// Returns file-level stats for the current scan context.
///
/// This method is read-only and does not scan row data.
fn scan_stats(&self, ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError>;
/// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
fn has_predicate_without_region(&self) -> bool;
@@ -1016,6 +1065,10 @@ impl RegionScanner for SinglePartitionScanner {
Ok(result.unwrap())
}
fn scan_stats(&self, _ctx: &QueryScanContext) -> Result<SendableFileStatsStream, BoxedError> {
Ok(Box::pin(futures::stream::empty()))
}
fn has_predicate_without_region(&self) -> bool {
false
}

View File

@@ -34,8 +34,7 @@ use snafu::ResultExt;
use crate::error;
#[cfg(test)]
mod stats;
pub mod stats;
/// Assert the scalar value is not utf8. Returns `None` if it's utf8.
/// In theory, it should be converted to a timestamp scalar value by `TypeConversionRule`.

View File

@@ -66,6 +66,7 @@ pub struct RegionScanExec {
is_partition_set: bool,
// TODO(ruihang): handle TimeWindowed dist via this parameter
distribution: Option<TimeSeriesDistribution>,
stats_aware_skip_mode: bool,
explain_verbose: bool,
query_memory_tracker: Option<QueryMemoryTracker>,
}
@@ -82,6 +83,7 @@ impl std::fmt::Debug for RegionScanExec {
.field("total_rows", &self.total_rows)
.field("is_partition_set", &self.is_partition_set)
.field("distribution", &self.distribution)
.field("stats_aware_skip_mode", &self.stats_aware_skip_mode)
.field("explain_verbose", &self.explain_verbose)
.finish()
}
@@ -225,6 +227,7 @@ impl RegionScanExec {
total_rows,
is_partition_set: false,
distribution: request.distribution,
stats_aware_skip_mode: false,
explain_verbose: false,
query_memory_tracker,
})
@@ -260,6 +263,10 @@ impl RegionScanExec {
scanner.name().to_string()
}
pub fn scanner(&self) -> Arc<Mutex<RegionScannerRef>> {
self.scanner.clone()
}
/// Update the partition ranges of underlying scanner.
pub fn with_new_partitions(
&self,
@@ -298,11 +305,47 @@ impl RegionScanExec {
total_rows: self.total_rows,
is_partition_set: true,
distribution: self.distribution,
stats_aware_skip_mode: self.stats_aware_skip_mode,
explain_verbose: self.explain_verbose,
query_memory_tracker: self.query_memory_tracker.clone(),
})
}
pub fn with_stats_aware_skip_mode(
&self,
stats_aware_skip_mode: bool,
) -> Result<Self, BoxedError> {
{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(
PrepareRequest::default().with_stats_aware_skip_mode(stats_aware_skip_mode),
)?;
}
Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties: self.properties.clone(),
append_mode: self.append_mode,
total_rows: self.total_rows,
is_partition_set: self.is_partition_set,
distribution: self.distribution,
stats_aware_skip_mode,
explain_verbose: self.explain_verbose,
query_memory_tracker: self.query_memory_tracker.clone(),
})
}
pub fn stats_aware_skip_mode(&self) -> bool {
self.stats_aware_skip_mode
}
pub fn append_mode(&self) -> bool {
self.append_mode
}
pub fn distribution(&self) -> Option<TimeSeriesDistribution> {
self.distribution
}