feat: region scan skip & rewrite

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-24 16:34:30 +08:00
parent 18590e36fb
commit 08c66ab00b
12 changed files with 1131 additions and 132 deletions

View File

@@ -18,67 +18,9 @@ use std::collections::{HashMap, HashSet};
use datafusion::parquet::file::statistics::Statistics as ParquetStats;
use datafusion::scalar::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal};
use datatypes::schema::SchemaRef as RegionSchemaRef;
use datatypes::value::Value;
use store_api::region_engine::FileStatsItem;
/// Runtime requirement that has already been approved by optimizer rewrite checks.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SupportStatAggr {
    /// `count(*)` (or an equivalent literal form): total row count.
    CountRows,
    /// `count(col)`: number of non-null values in `column_name`.
    CountNonNull { column_name: String },
    /// `min(col)`: minimum value of `column_name`.
    MinValue { column_name: String },
    /// `max(col)`: maximum value of `column_name`.
    MaxValue { column_name: String },
}
impl SupportStatAggr {
    /// Returns true if `count` over these inputs can be answered from file
    /// statistics: `count(*)` (no args), `count(<count-star literal>)`,
    /// or `count(col)`.
    pub fn is_count_supported_expr(inputs: &[std::sync::Arc<dyn PhysicalExpr>]) -> bool {
        match inputs {
            [] => true,
            [arg] if let Some(lit) = arg.as_any().downcast_ref::<Literal>() => {
                // Only the canonical `count(*)` expansion literal is equivalent
                // to a plain row count; e.g. `count(NULL)` is 0 and must not match.
                lit.value() == &COUNT_STAR_EXPANSION
            }
            [arg] => arg.as_any().downcast_ref::<PhysicalColumn>().is_some(),
            _ => false,
        }
    }

    /// Returns true if `min`/`max` over these inputs can be answered from file
    /// statistics: exactly one plain column argument.
    pub fn is_min_max_supported_expr(inputs: &[std::sync::Arc<dyn PhysicalExpr>]) -> bool {
        match inputs {
            [arg] => arg.as_any().downcast_ref::<PhysicalColumn>().is_some(),
            _ => false,
        }
    }

    /// Classifies an aggregate expression into a supported stats requirement.
    ///
    /// Returns `None` when the aggregate cannot be served from file statistics.
    pub fn from_aggr_expr(aggr: &AggregateFunctionExpr) -> Option<Self> {
        match (aggr.fun().name(), aggr.expressions().as_slice()) {
            ("count", []) => Some(Self::CountRows),
            // Kept consistent with `is_count_supported_expr`: only the
            // `count(*)` expansion literal maps to a plain row count. A general
            // literal (e.g. NULL, whose count is 0) must not be treated as
            // CountRows, so it falls through to `None` below.
            ("count", [arg])
                if arg
                    .as_any()
                    .downcast_ref::<Literal>()
                    .is_some_and(|lit| lit.value() == &COUNT_STAR_EXPANSION) =>
            {
                Some(Self::CountRows)
            }
            ("count", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
                Some(Self::CountNonNull {
                    column_name: col.name().to_string(),
                })
            }
            ("min", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
                Some(Self::MinValue {
                    column_name: col.name().to_string(),
                })
            }
            ("max", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
                Some(Self::MaxValue {
                    column_name: col.name().to_string(),
                })
            }
            _ => None,
        }
    }
}
use store_api::region_engine::{FileStatsItem, SupportStatAggr};
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FileColumnStats {
@@ -219,6 +161,10 @@ fn collect_one_column_stats(
}
fn sum_null_counts(file_stats: &FileStatsItem, column_index: usize) -> Result<Option<u64>> {
if file_stats.row_groups.is_empty() {
return Ok(None);
}
let mut total = 0_u64;
for row_group in &file_stats.row_groups {
let Some(stats) = row_group.metadata.column(column_index).statistics() else {

View File

@@ -18,7 +18,7 @@ use api::v1::Rows;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use partition::expr::col;
use store_api::region_engine::RegionEngine;
use store_api::region_engine::{RegionEngine, SupportStatAggr};
use store_api::region_request::{
EnterStagingRequest, RegionFlushRequest, RegionRequest, StagingPartitionDirective,
};
@@ -172,3 +172,93 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
flat_format
);
}
#[tokio::test]
async fn test_stats_aware_skip_requirements_skip_eligible_sst() {
    // Exercise both the legacy and the flat SST formats.
    for flat_format in [false, true] {
        test_stats_aware_skip_requirements_skip_eligible_sst_with_format(flat_format).await;
    }
}
/// End-to-end check that stats-aware skip requirements drop eligible SSTs.
///
/// Rows 0..3 are flushed into an SST; rows 3..5 stay in the memtable. With
/// `CountRows` as a skip requirement, the scan must return only the memtable
/// rows (the assertion message below states the expected skip behavior).
async fn test_stats_aware_skip_requirements_skip_eligible_sst_with_format(flat_format: bool) {
    common_telemetry::init_default_ut_logging();
    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_flat_format: flat_format,
            ..Default::default()
        })
        .await;
    // Region partition expr covers field_0 in [0, 99); presumably all built
    // rows fall inside it — TODO confirm against build_rows.
    let region_id = RegionId::new(1025, 0);
    let partition_expr = range_expr_string("field_0", 0., 99.);
    let request = CreateRequestBuilder::new()
        .partition_expr_json(Some(partition_expr))
        .build();
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();
    // Write rows 0..3 and flush them into an SST file.
    let flushed_rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows(0, 3),
    };
    put_rows(&engine, region_id, flushed_rows).await;
    engine
        .handle_request(
            region_id,
            RegionRequest::Flush(RegionFlushRequest {
                row_group_size: None,
            }),
        )
        .await
        .unwrap();
    // Rows 3..5 remain in the memtable only (no second flush).
    let memtable_rows = Rows {
        schema: column_schemas,
        rows: build_rows(3, 5),
    };
    put_rows(&engine, region_id, memtable_rows).await;
    let request = ScanRequest {
        projection_input: Some(vec![1].into()),
        ..Default::default()
    };
    // Baseline scan without skip requirements sees both SST and memtable rows.
    let baseline_scanner = engine.scanner(region_id, request.clone()).await.unwrap();
    let baseline_batches = RecordBatches::try_collect(baseline_scanner.scan().await.unwrap())
        .await
        .unwrap();
    assert_eq!(
        r#"+---------+
| field_0 |
+---------+
| 0.0 |
| 1.0 |
| 2.0 |
| 3.0 |
| 4.0 |
+---------+"#,
        baseline_batches.pretty_print().unwrap()
    );
    // With CountRows required, the flushed SST is expected to be skipped.
    let mut skip_scanner = engine.scanner(region_id, request).await.unwrap();
    skip_scanner.set_stats_aware_skip_requirements(vec![SupportStatAggr::CountRows]);
    let skipped_batches = RecordBatches::try_collect(skip_scanner.scan().await.unwrap())
        .await
        .unwrap();
    assert_eq!(
        r#"+---------+
| field_0 |
+---------+
| 3.0 |
| 4.0 |
+---------+"#,
        skipped_batches.pretty_print().unwrap(),
        "stats-aware skip should drop the flushed SST rows and only leave memtable rows"
    );
}

View File

@@ -22,7 +22,7 @@ use std::time::Instant;
use common_telemetry::debug;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::region_engine::{PartitionRange, ScannerProperties, SupportStatAggr};
use store_api::storage::FileId;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
@@ -38,6 +38,33 @@ use crate::sst::parquet::reader::ReaderMetrics;
/// Number of files to pre-fetch ahead of the current position.
const PREFETCH_COUNT: usize = 8;
/// Stats-aware skip requirements approved by the optimizer for this scan.
#[derive(Clone)]
pub(crate) struct StatsAwareSkipConfig {
    // Shared, non-empty list of aggregates (enforced by `new`) that file
    // statistics must be able to answer for a file to be skipped.
    requirements: Arc<[SupportStatAggr]>,
}
impl StatsAwareSkipConfig {
    /// Builds a config from the requirement list; an empty list means
    /// stats-aware skipping is disabled, so `None` is returned.
    fn new(requirements: Vec<SupportStatAggr>) -> Option<Self> {
        (!requirements.is_empty()).then(|| Self {
            requirements: requirements.into(),
        })
    }

    /// Returns the approved stat-aggregate requirements.
    pub(crate) fn requirements(&self) -> &[SupportStatAggr] {
        &self.requirements
    }
}
/// Extracts the stats-aware skip config from scanner properties.
///
/// Returns `None` when the scan carries no skip requirements, so callers can
/// cheaply detect that stats-aware skipping is disabled.
pub(crate) fn stats_aware_skip_config(
    properties: &ScannerProperties,
) -> Option<StatsAwareSkipConfig> {
    StatsAwareSkipConfig::new(properties.stats_aware_skip_requirements().to_vec())
}
/// Local pruner in a partition that supports prefetching files to prune.
pub struct PartitionPruner {
pruner: Arc<Pruner>,
@@ -47,11 +74,16 @@ pub struct PartitionPruner {
pre_filter_modes: Vec<PreFilterMode>,
/// Current position for tracking pre-fetch progress.
current_position: AtomicUsize,
stats_aware_skip: Option<StatsAwareSkipConfig>,
}
impl PartitionPruner {
/// Creates a new `PartitionPruner` for the given partition ranges.
pub fn new(pruner: Arc<Pruner>, partition_ranges: &[PartitionRange]) -> Self {
pub fn new(
pruner: Arc<Pruner>,
partition_ranges: &[PartitionRange],
stats_aware_skip: Option<StatsAwareSkipConfig>,
) -> Self {
let num_files = pruner.inner.stream_ctx.input.num_files();
let mut file_indices = Vec::with_capacity(num_files);
let mut pre_filter_modes = vec![PreFilterMode::SkipFields; num_files];
@@ -84,6 +116,7 @@ impl PartitionPruner {
file_indices,
pre_filter_modes,
current_position: AtomicUsize::new(0),
stats_aware_skip,
}
}
@@ -103,7 +136,13 @@ impl PartitionPruner {
// Delegate to underlying Pruner
let ranges = self
.pruner
.build_file_ranges(index, pre_filter_mode, partition_metrics, reader_metrics)
.build_file_ranges(
index,
pre_filter_mode,
partition_metrics,
reader_metrics,
self.stats_aware_skip.as_ref(),
)
.await?;
// Find position and trigger pre-fetch for upcoming files
@@ -129,6 +168,7 @@ impl PartitionPruner {
file_index,
pre_filter_mode,
Some(partition_metrics.clone()),
self.stats_aware_skip.clone(),
);
}
}
@@ -161,6 +201,8 @@ struct PrunerInner {
struct FileBuilderEntry {
/// Cached builder after pruning. None if not yet built or already cleared.
builder: Option<Arc<FileRangeBuilder>>,
/// Stats-aware skip requirements used to build the cached builder.
stats_aware_skip_requirements: Option<Arc<[SupportStatAggr]>>,
/// Number of remaining ranges to scan for this file.
/// When this reaches 0, the builder is dropped for memory cleanup.
remaining_ranges: usize,
@@ -168,6 +210,27 @@ struct FileBuilderEntry {
waiters: Vec<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
}
impl FileBuilderEntry {
    /// Invalidates the cached builder when it was built under different
    /// stats-aware skip requirements than the current request.
    ///
    /// A builder cached for one requirement set may have skipped ranges that
    /// another set would keep (or vice versa), so it cannot be reused.
    fn clear_builder_if_skip_requirements_changed(
        &mut self,
        stats_aware_skip: Option<&StatsAwareSkipConfig>,
    ) {
        if self.builder.is_some() && !self.matches_skip_requirements(stats_aware_skip) {
            self.builder = None;
            self.stats_aware_skip_requirements = None;
            // Keep the active-builder gauge in sync with the dropped builder.
            PRUNER_ACTIVE_BUILDERS.dec();
        }
    }

    /// True when the cached requirements match the requested ones:
    /// both absent, or both present and element-wise equal.
    fn matches_skip_requirements(&self, stats_aware_skip: Option<&StatsAwareSkipConfig>) -> bool {
        match (&self.stats_aware_skip_requirements, stats_aware_skip) {
            (None, None) => true,
            (Some(cached), Some(current)) => cached.as_ref() == current.requirements(),
            _ => false,
        }
    }
}
/// Request to prune a file.
struct PruneRequest {
/// Index of the file in ScanInput.files.
@@ -178,6 +241,8 @@ struct PruneRequest {
response_tx: Option<oneshot::Sender<Result<Arc<FileRangeBuilder>>>>,
/// Partition metrics for merging reader metrics.
partition_metrics: Option<PartitionMetrics>,
/// Optional stats-aware skip config for this request.
stats_aware_skip: Option<StatsAwareSkipConfig>,
}
impl Pruner {
@@ -191,6 +256,7 @@ impl Pruner {
.map(|_| {
Mutex::new(FileBuilderEntry {
builder: None,
stats_aware_skip_requirements: None,
remaining_ranges: 0,
waiters: Vec::new(),
})
@@ -254,6 +320,7 @@ impl Pruner {
pre_filter_mode: PreFilterMode,
partition_metrics: &PartitionMetrics,
reader_metrics: &mut ReaderMetrics,
stats_aware_skip: Option<&StatsAwareSkipConfig>,
) -> Result<SmallVec<[FileRange; 2]>> {
let file_index = index.index - self.inner.stream_ctx.input.num_memtables();
@@ -264,6 +331,7 @@ impl Pruner {
pre_filter_mode,
partition_metrics,
reader_metrics,
stats_aware_skip,
)
.await?;
@@ -284,10 +352,12 @@ impl Pruner {
pre_filter_mode: PreFilterMode,
partition_metrics: &PartitionMetrics,
reader_metrics: &mut ReaderMetrics,
stats_aware_skip: Option<&StatsAwareSkipConfig>,
) -> Result<Arc<FileRangeBuilder>> {
// Fast path: checks cache
{
let entry = self.inner.file_entries[file_index].lock().unwrap();
let mut entry = self.inner.file_entries[file_index].lock().unwrap();
entry.clear_builder_if_skip_requirements_changed(stats_aware_skip);
if let Some(builder) = &entry.builder {
reader_metrics.filter_metrics.pruner_cache_hit += 1;
return Ok(builder.clone());
@@ -306,13 +376,19 @@ impl Pruner {
pre_filter_mode,
response_tx: Some(response_tx),
partition_metrics: Some(partition_metrics.clone()),
stats_aware_skip: stats_aware_skip.cloned(),
};
let result = if self.worker_senders[worker_idx].send(request).await.is_err() {
common_telemetry::warn!("Worker channel closed, falling back to direct pruning");
// Worker channel closed, falls back to direct pruning
self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
.await
self.prune_file_directly(
file_index,
pre_filter_mode,
reader_metrics,
stats_aware_skip,
)
.await
} else {
// Waits for response
match response_rx.await {
@@ -322,8 +398,13 @@ impl Pruner {
"Response channel closed, falling back to direct pruning"
);
// Channel closed, falls back to direct pruning
self.prune_file_directly(file_index, pre_filter_mode, reader_metrics)
.await
self.prune_file_directly(
file_index,
pre_filter_mode,
reader_metrics,
stats_aware_skip,
)
.await
}
}
};
@@ -337,10 +418,12 @@ impl Pruner {
file_index: usize,
pre_filter_mode: PreFilterMode,
partition_metrics: Option<PartitionMetrics>,
stats_aware_skip: Option<StatsAwareSkipConfig>,
) {
// Fast path: checks cache
{
let entry = self.inner.file_entries[file_index].lock().unwrap();
let mut entry = self.inner.file_entries[file_index].lock().unwrap();
entry.clear_builder_if_skip_requirements_changed(stats_aware_skip.as_ref());
if entry.builder.is_some() {
return;
}
@@ -355,6 +438,7 @@ impl Pruner {
pre_filter_mode,
response_tx: None,
partition_metrics,
stats_aware_skip,
};
// Sends request to worker
@@ -373,13 +457,19 @@ impl Pruner {
file_index: usize,
pre_filter_mode: PreFilterMode,
reader_metrics: &mut ReaderMetrics,
stats_aware_skip: Option<&StatsAwareSkipConfig>,
) -> Result<Arc<FileRangeBuilder>> {
let file = &self.inner.stream_ctx.input.files[file_index];
let builder = self
.inner
.stream_ctx
.input
.prune_file(file, pre_filter_mode, reader_metrics)
.prune_file(
file,
pre_filter_mode,
reader_metrics,
stats_aware_skip.map(StatsAwareSkipConfig::requirements),
)
.await?;
let arc_builder = Arc::new(builder);
@@ -391,6 +481,8 @@ impl Pruner {
reader_metrics.metadata_mem_size += arc_builder.memory_size() as isize;
reader_metrics.num_range_builders += 1;
entry.builder = Some(arc_builder.clone());
entry.stats_aware_skip_requirements =
stats_aware_skip.map(|config| config.requirements.clone());
PRUNER_ACTIVE_BUILDERS.inc();
}
}
@@ -406,6 +498,7 @@ impl Pruner {
if entry.remaining_ranges == 0
&& let Some(builder) = entry.builder.take()
{
entry.stats_aware_skip_requirements = None;
PRUNER_ACTIVE_BUILDERS.dec();
reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
reader_metrics.num_range_builders -= 1;
@@ -428,11 +521,13 @@ impl Pruner {
pre_filter_mode,
response_tx,
partition_metrics,
stats_aware_skip,
} = request;
// Check if already cached or in-progress
{
let entry = inner.file_entries[file_index].lock().unwrap();
let mut entry = inner.file_entries[file_index].lock().unwrap();
entry.clear_builder_if_skip_requirements_changed(stats_aware_skip.as_ref());
if let Some(builder) = &entry.builder {
// Cache hit - send immediately
if let Some(response_tx) = response_tx {
@@ -451,7 +546,14 @@ impl Pruner {
let result = inner
.stream_ctx
.input
.prune_file(file, pre_filter_mode, &mut metrics)
.prune_file(
file,
pre_filter_mode,
&mut metrics,
stats_aware_skip
.as_ref()
.map(StatsAwareSkipConfig::requirements),
)
.await;
// Update state and notify waiters
@@ -459,8 +561,13 @@ impl Pruner {
match result {
Ok(builder) => {
let arc_builder = Arc::new(builder);
entry.builder = Some(arc_builder.clone());
PRUNER_ACTIVE_BUILDERS.inc();
if entry.builder.is_none() {
entry.builder = Some(arc_builder.clone());
entry.stats_aware_skip_requirements = stats_aware_skip
.as_ref()
.map(|config| config.requirements.clone());
PRUNER_ACTIVE_BUILDERS.inc();
}
// Notify all waiters
for waiter in entry.waiters.drain(..) {

View File

@@ -22,7 +22,9 @@ use std::time::Instant;
use api::v1::SemanticType;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_query::aggr_stats::StatsCandidateFile;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
@@ -33,13 +35,14 @@ use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use parquet::file::metadata::ParquetMetaData;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{
FileStatsItem, PartitionRange, QueryScanContext, RegionScannerRef, RowGroupStatsItem,
SendableFileStatsStream,
SendableFileStatsStream, SupportStatAggr,
};
use store_api::storage::{
ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
@@ -52,7 +55,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
use crate::error::{ExternalSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
@@ -171,6 +174,17 @@ impl Scanner {
Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
}
}
    /// Re-prepares the underlying scanner with stats-aware skip requirements.
    ///
    /// Every scanner variant receives the same `PrepareRequest`; `unwrap` here
    /// mirrors the error handling of the sibling prepare paths in this impl.
    pub(crate) fn set_stats_aware_skip_requirements(&mut self, requirements: Vec<SupportStatAggr>) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};
        let request = PrepareRequest::default().with_stats_aware_skip_requirements(requirements);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -1088,6 +1102,29 @@ impl ScanInput {
}
}
    /// Decides whether a file can be skipped during a stats-aware scan.
    ///
    /// Returns `Ok(true)` when the shared classifier accepts the file as a
    /// stats candidate — presumably meaning every required aggregate can be
    /// answered from the file's parquet statistics, so the stats branch of the
    /// plan serves it instead (TODO confirm against `StatsCandidateFile`).
    fn should_skip_file_by_stats(
        &self,
        file: &FileHandle,
        parquet_meta: &ParquetMetaData,
        requirements: &[SupportStatAggr],
    ) -> Result<bool> {
        let file_stats = build_file_stats_item(file, parquet_meta)?;
        let region_partition_expr = self.region_metadata().partition_expr.clone();
        // Classification errors are wrapped as opaque external errors with a
        // context message rather than aborting with an internal error type.
        let candidate = StatsCandidateFile::from_file_stats(
            &file_stats,
            region_partition_expr.as_deref(),
            requirements,
            &self.region_metadata().schema,
        )
        .map_err(|error| {
            BoxedError::new(PlainError::new(error.to_string(), StatusCode::Unexpected))
        })
        .context(ExternalSnafu {
            context: "failed to classify file stats for stats-aware skip mode",
        })?;
        Ok(candidate.is_some())
    }
/// Prunes a file to scan and returns the builder to build readers.
#[tracing::instrument(
skip_all,
@@ -1101,7 +1138,23 @@ impl ScanInput {
file: &FileHandle,
pre_filter_mode: PreFilterMode,
reader_metrics: &mut ReaderMetrics,
stats_aware_skip_requirements: Option<&[SupportStatAggr]>,
) -> Result<FileRangeBuilder> {
if let Some(requirements) =
stats_aware_skip_requirements.filter(|requirements| !requirements.is_empty())
{
let sst_meta = self
.access_layer
.read_sst(file.clone())
.cache(self.cache_strategy.clone())
.expected_metadata(Some(self.region_metadata().clone()))
.read_sst_meta()
.await?;
if self.should_skip_file_by_stats(file, &sst_meta.parquet_metadata(), requirements)? {
return Ok(FileRangeBuilder::default());
}
}
let predicate = self.predicate_for_file(file);
let decode_pk_values = !self.compaction
&& self
@@ -1454,6 +1507,40 @@ pub struct StreamContext {
pub(crate) query_start: Instant,
}
/// Builds a `FileStatsItem` (row count, serialized partition expr, and
/// per-row-group parquet metadata) for one SST file.
fn build_file_stats_item(
    file: &FileHandle,
    parquet_meta: &ParquetMetaData,
) -> Result<FileStatsItem> {
    // One entry per row group; the row-group metadata is cloned into an Arc so
    // the item can outlive the borrowed `ParquetMetaData`.
    let row_groups = parquet_meta
        .row_groups()
        .iter()
        .enumerate()
        .map(|(row_group_index, metadata)| RowGroupStatsItem {
            row_group_index,
            metadata: Arc::new(metadata.clone()),
        })
        .collect();
    // Serialize the file's partition expr as JSON when present; serialization
    // failures become opaque external errors with a descriptive context.
    let file_partition_expr = file
        .meta_ref()
        .partition_expr
        .as_ref()
        .map(|expr| expr.as_json_str())
        .transpose()
        .map_err(|error| {
            BoxedError::new(PlainError::new(error.to_string(), StatusCode::Unexpected))
        })
        .context(ExternalSnafu {
            context: "failed to serialize file partition expr for stats-aware skip mode",
        })?;
    Ok(FileStatsItem {
        num_rows: Some(parquet_meta.file_metadata().num_rows() as u64),
        file_partition_expr,
        row_groups,
    })
}
pub(crate) fn scan_input_stats(
input: &ScanInput,
ctx: &QueryScanContext,
@@ -1481,22 +1568,8 @@ pub(crate) fn scan_input_stats(
.read_sst_meta()
.await
.map_err(BoxedError::new)?;
let parquet_meta = sst_meta.parquet_metadata();
let row_groups = parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(row_group_index, metadata)| RowGroupStatsItem {
row_group_index,
metadata: Arc::new(metadata.clone()),
})
.collect();
yield FileStatsItem {
num_rows: Some(parquet_meta.file_metadata().num_rows() as u64),
file_partition_expr: file.meta_ref().partition_expr.as_ref().map(ToString::to_string),
row_groups,
};
yield build_file_stats_item(&file, &sst_meta.parquet_metadata())
.map_err(BoxedError::new)?;
}
})
}
@@ -1842,11 +1915,22 @@ impl PredicateGroup {
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use bytes::Bytes;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::expressions::lit as physical_lit;
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};
use datatypes::arrow::array::StringArray;
use datatypes::arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
use datatypes::arrow::record_batch::RecordBatch as ArrowRecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use partition::expr::col as partition_col;
use partition::expr::{PartitionExpr, col as partition_col};
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::{
ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
@@ -1857,6 +1941,8 @@ mod tests {
use crate::memtable::time_partition::TimePartitions;
use crate::read::range_cache::ScanRequestFingerprintBuilder;
use crate::region::version::VersionBuilder;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
use crate::test_util::scheduler_util::SchedulerEnv;
@@ -1889,6 +1975,62 @@ mod tests {
.with_files(vec![file])
}
fn metadata_with_partition_expr(partition_expr_json: String) -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(store_api::storage::RegionId::new(1, 1));
builder
.push_column_metadata(store_api::metadata::ColumnMetadata {
column_schema: ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(store_api::metadata::ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1])
.partition_expr_json(Some(partition_expr_json));
Arc::new(builder.build_without_validation().unwrap())
}
    /// Builds a `FileHandle` whose metadata carries the given partition
    /// expression (parsed from JSON) and a single row group.
    fn file_with_partition_expr(partition_expr_json: &str) -> FileHandle {
        let mut file_meta = FileMeta::default();
        file_meta.partition_expr = PartitionExpr::from_json_str(partition_expr_json).unwrap();
        file_meta.num_row_groups = 1;
        // NoopFilePurger: tests never delete the (nonexistent) file on drop.
        FileHandle::new(file_meta, Arc::new(NoopFilePurger))
    }
fn parquet_metadata_for_hosts(hosts: &[Option<&str>]) -> Arc<ParquetMetaData> {
let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"host",
DataType::Utf8,
true,
)]));
let mut buffer = std::io::Cursor::new(Vec::new());
let mut writer = ArrowWriter::try_new(
&mut buffer,
arrow_schema.clone(),
Some(WriterProperties::builder().build()),
)
.unwrap();
let batch = ArrowRecordBatch::try_new(
arrow_schema,
vec![Arc::new(StringArray::from(hosts.to_vec()))],
)
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer.into_inner()))
.unwrap()
.metadata()
.clone()
}
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
@@ -2133,4 +2275,64 @@ mod tests {
assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
}
}
    #[tokio::test]
    async fn test_should_skip_file_by_stats_when_shared_classifier_accepts_file() {
        // File and region share the same partition expr (`host == "foo"`) and
        // the parquet metadata carries usable `host` statistics, so the
        // classifier should accept the file and the scan may skip it.
        let partition_expr_json = partition_col("host")
            .eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        let metadata = metadata_with_partition_expr(partition_expr_json.clone());
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 1].into_iter()).unwrap();
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &[col("host").eq(lit("foo"))]).unwrap();
        let file = file_with_partition_expr(&partition_expr_json);
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_files(vec![file.clone()]);
        // Non-null host values produce min/max stats in the written metadata.
        let parquet_meta = parquet_metadata_for_hosts(&[Some("bar"), Some("foo")]);
        let should_skip = input
            .should_skip_file_by_stats(
                &file,
                &parquet_meta,
                &[SupportStatAggr::MaxValue {
                    column_name: "host".to_string(),
                }],
            )
            .unwrap();
        assert!(should_skip);
    }
    #[tokio::test]
    async fn test_should_not_skip_file_by_stats_when_required_stats_are_missing() {
        // Same setup as the accepting test, but every `host` value is NULL, so
        // the parquet metadata lacks the stats the MaxValue requirement needs;
        // the classifier must reject the file and the scan must not skip it.
        let partition_expr_json = partition_col("host")
            .eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        let metadata = metadata_with_partition_expr(partition_expr_json.clone());
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 1].into_iter()).unwrap();
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &[col("host").eq(lit("foo"))]).unwrap();
        let file = file_with_partition_expr(&partition_expr_json);
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_files(vec![file.clone()]);
        // All-NULL column: no usable min/max statistics are written.
        let parquet_meta = parquet_metadata_for_hosts(&[None, None]);
        let should_skip = input
            .should_skip_file_by_stats(
                &file,
                &parquet_meta,
                &[SupportStatAggr::MaxValue {
                    column_name: "host".to_string(),
                }],
            )
            .unwrap();
        assert!(!should_skip);
    }
}

View File

@@ -40,7 +40,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::FlatLastRowReader;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config};
use crate::read::range::RangeMeta;
use crate::read::range_cache::{
build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
@@ -158,7 +158,7 @@ impl SeqScan {
pruner: Arc<Pruner>,
) -> Result<BoxedRecordBatchStream> {
pruner.add_partition_ranges(partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));
let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges, None));
let mut sources = Vec::new();
for part_range in partition_ranges {
@@ -385,12 +385,17 @@ impl SeqScan {
let compaction = self.stream_ctx.input.compaction;
let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
let pruner = self.pruner.clone();
let stats_aware_skip = stats_aware_skip_config(&self.properties);
// Initializes ref counts for the pruner.
// If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
// then the ref count won't be decremented.
// This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
pruner.add_partition_ranges(&partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));
let partition_pruner = Arc::new(PartitionPruner::new(
pruner,
&partition_ranges,
stats_aware_skip,
));
let stream = try_stream! {
part_metrics.on_first_poll();

View File

@@ -46,7 +46,7 @@ use crate::error::{
ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config};
use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
@@ -232,6 +232,7 @@ impl SeriesScan {
final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
partitions: self.properties.partitions.clone(),
pruner: self.pruner.clone(),
stats_aware_skip: stats_aware_skip_config(&self.properties),
senders,
metrics_set: metrics_set.clone(),
metrics_list: metrics_list.clone(),
@@ -441,6 +442,8 @@ struct SeriesDistributor {
partitions: Vec<Vec<PartitionRange>>,
/// Shared pruner for file range building.
pruner: Arc<Pruner>,
/// Optional stats-aware skip config for aggregate-stats runtime execution.
stats_aware_skip: Option<crate::read::pruner::StatsAwareSkipConfig>,
/// Senders of all partitions.
senders: SenderList,
/// Metrics set to report.
@@ -484,6 +487,7 @@ impl SeriesDistributor {
let partition_pruner = Arc::new(PartitionPruner::new(
self.pruner.clone(),
&all_partition_ranges,
self.stats_aware_skip.clone(),
));
let part_metrics = new_partition_metrics(

View File

@@ -34,7 +34,7 @@ use store_api::region_engine::{
};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::pruner::{PartitionPruner, Pruner, stats_aware_skip_config};
use crate::read::scan_region::{ScanInput, StreamContext, scan_input_stats};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
@@ -249,12 +249,14 @@ impl UnorderedScan {
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let pruner = self.pruner.clone();
let stats_aware_skip = stats_aware_skip_config(&self.properties);
// Initializes ref counts for the pruner.
// If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
// then the ref count won't be decremented.
// This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
pruner.add_partition_ranges(&part_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
let partition_pruner =
Arc::new(PartitionPruner::new(pruner, &part_ranges, stats_aware_skip));
let stream = try_stream! {
part_metrics.on_first_poll();

View File

@@ -19,11 +19,15 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_common::Result;
use datafusion::physical_plan::union::UnionExec;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use table::table::scan::RegionScanExec;
use crate::optimizer::aggr_stats::support_aggr::SupportStatAggr;
use crate::optimizer::aggr_stats::stat_scan::StatsScanExec;
use crate::optimizer::aggr_stats::support_aggr::{
SupportStatAggr, support_stat_aggr_from_aggr_expr,
};
pub(crate) mod stat_scan;
pub(crate) mod support_aggr;
@@ -56,11 +60,10 @@ impl PhysicalOptimizerRule for AggrStatsPhysicalRule {
impl AggrStatsPhysicalRule {
fn rewrite_plan_shape(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(|plan| {
let Some(_rewrite_target) = RewriteTarget::extract(&plan) else {
let Some(rewrite_target) = RewriteTarget::extract(&plan) else {
return Ok(Transformed::no(plan));
};
// impl rewrite in RewriteTarget
Ok(Transformed::no(plan))
rewrite_target.rewrite().map(Transformed::yes)
})
.map(|res| res.data)
}
@@ -134,7 +137,9 @@ impl<'a> RewriteTarget<'a> {
let aggr_exprs = aggregate_exec
.aggr_expr()
.iter()
.map(|aggr_expr| SupportStatAggr::from_aggr_expr(aggr_expr))
.map(|aggr_expr| {
support_stat_aggr_from_aggr_expr(aggr_expr.as_ref(), &region_scan.time_index())
})
.try_collect()?;
let zelf = Self::FinalOverPartial {
final_exec: aggregate_exec,
@@ -150,6 +155,58 @@ impl<'a> RewriteTarget<'a> {
}
}
    /// Rewrites the matched final/partial aggregate pair into:
    /// `FinalAgg( Coalesce?( Union( StatsScan, PartialAgg(fallback RegionScan) ) ) )`.
    ///
    /// The stats branch answers the approved aggregates from file statistics,
    /// while the fallback branch re-runs the original partial aggregate over a
    /// region scan configured to skip the stats-eligible files.
    fn rewrite(&self) -> Result<Arc<dyn ExecutionPlan>> {
        match self {
            Self::FinalOverPartial {
                final_exec,
                partial_exec,
                region_scan,
                keep_coalesce,
                aggr_exprs,
            } => {
                let requirements = aggr_exprs.clone();
                // Stats branch: emits partial-aggregate-shaped rows built from
                // file statistics, matching the partial exec's schema.
                let stats_scan = Arc::new(StatsScanExec::new(
                    partial_exec.schema(),
                    requirements.clone(),
                    region_scan.scanner(),
                ));
                // Fallback branch: same region scan, but files answerable from
                // stats are skipped so no row is counted twice.
                let fallback_scan = Arc::new(
                    region_scan
                        .with_stats_aware_skip_requirements(requirements)
                        .map_err(|error| DataFusionError::External(error.into()))?,
                );
                // Rebuild the partial aggregate on top of the fallback scan,
                // preserving its mode, grouping, aggregates and filters.
                let fallback_partial = Arc::new(AggregateExec::try_new(
                    *partial_exec.mode(),
                    Arc::new(partial_exec.group_expr().clone()),
                    partial_exec.aggr_expr().to_vec(),
                    partial_exec.filter_expr().to_vec(),
                    fallback_scan,
                    partial_exec.input_schema(),
                )?);
                let union = UnionExec::try_new(vec![stats_scan, fallback_partial])?;
                // Keep the original coalesce if one was matched, and insert one
                // anyway when the union is multi-partitioned so the final
                // aggregate receives a single input stream.
                let merge_input: Arc<dyn ExecutionPlan> =
                    if *keep_coalesce || union.properties().partitioning.partition_count() > 1 {
                        Arc::new(CoalescePartitionsExec::new(union))
                    } else {
                        union
                    };
                let final_aggregate = AggregateExec::try_new(
                    *final_exec.mode(),
                    Arc::new(final_exec.group_expr().clone()),
                    final_exec.aggr_expr().to_vec(),
                    final_exec.filter_expr().to_vec(),
                    merge_input,
                    final_exec.input_schema(),
                )?;
                Ok(Arc::new(final_aggregate))
            }
        }
    }
fn first_stage_aggregate(&self) -> &'a AggregateExec {
match self {
RewriteTarget::FinalOverPartial { partial_exec, .. } => partial_exec,
@@ -162,3 +219,366 @@ impl<'a> RewriteTarget<'a> {
}
}
}
#[cfg(test)]
mod tests {
use api::v1::SemanticType;
use common_recordbatch::EmptyRecordBatchStream;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
use datafusion::scalar::ScalarValue;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::region_engine::SinglePartitionScanner;
use store_api::storage::{RegionId, ScanRequest};
use super::*;
fn build_count_expr(schema: arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(count_udaf(), vec![Arc::new(PhysicalColumn::new("v0", 0))])
.schema(schema)
.alias("count(v0)")
.build()
.unwrap(),
)
}
fn build_count_time_index_expr(schema: arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(count_udaf(), vec![Arc::new(PhysicalColumn::new("ts", 1))])
.schema(schema)
.alias("count(ts)")
.build()
.unwrap(),
)
}
fn build_count_star_expr(schema: arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(
count_udaf(),
vec![Arc::new(Literal::new(COUNT_STAR_EXPANSION.clone()))],
)
.schema(schema)
.alias("count(*)")
.build()
.unwrap(),
)
}
fn build_count_null_expr(schema: arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(
count_udaf(),
vec![Arc::new(Literal::new(ScalarValue::Null))],
)
.schema(schema)
.alias("count(NULL)")
.build()
.unwrap(),
)
}
fn build_avg_expr(schema: arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr> {
Arc::new(
AggregateExprBuilder::new(avg_udaf(), vec![Arc::new(PhysicalColumn::new("v0", 0))])
.schema(schema)
.alias("avg(v0)")
.build()
.unwrap(),
)
}
fn group_by_v0() -> PhysicalGroupBy {
PhysicalGroupBy::new_single(vec![(
Arc::new(PhysicalColumn::new("v0", 0)),
"v0".to_string(),
)])
}
fn build_region_scan(append_mode: bool) -> Arc<RegionScanExec> {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
]));
let stream = Box::pin(EmptyRecordBatchStream::new(schema.clone()));
let mut metadata_builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
metadata_builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("v0", ConcreteDataType::float64_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![]);
let metadata = Arc::new(metadata_builder.build().unwrap());
let scanner = Box::new(SinglePartitionScanner::new(
stream,
append_mode,
metadata,
None,
));
Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap())
}
fn build_final_over_partial_plan() -> Arc<dyn ExecutionPlan> {
build_final_over_partial_plan_with(build_region_scan(true), build_count_expr, None)
}
fn build_final_over_partial_plan_with(
region_scan: Arc<RegionScanExec>,
build_aggr_expr: fn(arrow_schema::SchemaRef) -> Arc<AggregateFunctionExpr>,
group_by: Option<PhysicalGroupBy>,
) -> Arc<dyn ExecutionPlan> {
let input_schema = region_scan.schema();
let aggr_expr = build_aggr_expr(input_schema.clone());
let group_by = group_by.unwrap_or_default();
let partial = Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by.clone(),
vec![aggr_expr.clone()],
vec![None],
region_scan,
input_schema.clone(),
)
.unwrap(),
);
let coalesce = Arc::new(CoalescePartitionsExec::new(partial));
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,
group_by,
vec![aggr_expr],
vec![None],
coalesce,
input_schema,
)
.unwrap(),
)
}
#[test]
fn rewrite_builds_stats_scan_union_with_stats_aware_fallback() {
let plan = build_final_over_partial_plan();
let optimized = AggrStatsPhysicalRule
.optimize(plan, &ConfigOptions::default())
.unwrap();
let final_exec = optimized.as_any().downcast_ref::<AggregateExec>().unwrap();
assert!(matches!(final_exec.mode(), AggregateMode::Final));
let coalesce = final_exec
.input()
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.unwrap();
let union = coalesce
.input()
.as_any()
.downcast_ref::<UnionExec>()
.unwrap();
let union_children = union.children();
assert_eq!(union_children.len(), 2);
let stats_scan = union_children[0]
.as_any()
.downcast_ref::<StatsScanExec>()
.unwrap();
assert_eq!(
stats_scan.requirements(),
&[SupportStatAggr::CountNonNull {
column_name: "v0".to_string(),
}]
);
let fallback_partial = union_children[1]
.as_any()
.downcast_ref::<AggregateExec>()
.unwrap();
assert!(matches!(fallback_partial.mode(), AggregateMode::Partial));
let fallback_scan = fallback_partial
.input()
.as_any()
.downcast_ref::<RegionScanExec>()
.unwrap();
assert_eq!(
fallback_scan.stats_aware_skip_requirements(),
&[SupportStatAggr::CountNonNull {
column_name: "v0".to_string(),
}]
);
}
#[test]
fn rewrite_ignores_unsupported_avg_aggregate() {
let plan =
build_final_over_partial_plan_with(build_region_scan(true), build_avg_expr, None);
let optimized = AggrStatsPhysicalRule
.optimize(plan, &ConfigOptions::default())
.unwrap();
assert_final_over_partial_without_union(&optimized);
}
#[test]
fn rewrite_ignores_grouped_aggregate() {
let plan = build_final_over_partial_plan_with(
build_region_scan(true),
build_count_expr,
Some(group_by_v0()),
);
let optimized = AggrStatsPhysicalRule
.optimize(plan, &ConfigOptions::default())
.unwrap();
assert_final_over_partial_without_union(&optimized);
}
#[test]
fn rewrite_ignores_non_append_region_scan() {
let plan =
build_final_over_partial_plan_with(build_region_scan(false), build_count_expr, None);
let optimized = AggrStatsPhysicalRule
.optimize(plan, &ConfigOptions::default())
.unwrap();
assert_final_over_partial_without_union(&optimized);
}
#[test]
fn rewrite_maps_count_star_to_count_rows() {
let optimized = AggrStatsPhysicalRule
.optimize(
build_final_over_partial_plan_with(
build_region_scan(true),
build_count_star_expr,
None,
),
&ConfigOptions::default(),
)
.unwrap();
assert_rewritten_stats_requirement(&optimized, &[SupportStatAggr::CountRows]);
}
#[test]
fn rewrite_maps_count_time_index_to_count_rows() {
let optimized = AggrStatsPhysicalRule
.optimize(
build_final_over_partial_plan_with(
build_region_scan(true),
build_count_time_index_expr,
None,
),
&ConfigOptions::default(),
)
.unwrap();
assert_rewritten_stats_requirement(&optimized, &[SupportStatAggr::CountRows]);
}
#[test]
fn rewrite_ignores_count_null_literal() {
let plan = build_final_over_partial_plan_with(
build_region_scan(true),
build_count_null_expr,
None,
);
let optimized = AggrStatsPhysicalRule
.optimize(plan, &ConfigOptions::default())
.unwrap();
assert_final_over_partial_without_union(&optimized);
}
fn assert_rewritten_stats_requirement(
plan: &Arc<dyn ExecutionPlan>,
expected: &[SupportStatAggr],
) {
let final_exec = plan.as_any().downcast_ref::<AggregateExec>().unwrap();
let coalesce = final_exec
.input()
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.unwrap();
let union = coalesce
.input()
.as_any()
.downcast_ref::<UnionExec>()
.unwrap();
let union_children = union.children();
let stats_scan = union_children[0]
.as_any()
.downcast_ref::<StatsScanExec>()
.unwrap();
assert_eq!(stats_scan.requirements(), expected);
let fallback_partial = union_children[1]
.as_any()
.downcast_ref::<AggregateExec>()
.unwrap();
let fallback_scan = fallback_partial
.input()
.as_any()
.downcast_ref::<RegionScanExec>()
.unwrap();
assert_eq!(fallback_scan.stats_aware_skip_requirements(), expected);
}
fn assert_final_over_partial_without_union(plan: &Arc<dyn ExecutionPlan>) {
let final_exec = plan.as_any().downcast_ref::<AggregateExec>().unwrap();
assert!(matches!(final_exec.mode(), AggregateMode::Final));
let coalesce = final_exec
.input()
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.unwrap();
assert!(
coalesce
.input()
.as_any()
.downcast_ref::<UnionExec>()
.is_none()
);
let partial_exec = coalesce
.input()
.as_any()
.downcast_ref::<AggregateExec>()
.unwrap();
assert!(matches!(partial_exec.mode(), AggregateMode::Partial));
let region_scan = partial_exec
.input()
.as_any()
.downcast_ref::<RegionScanExec>()
.unwrap();
assert!(region_scan.stats_aware_skip_requirements().is_empty());
}
}

View File

@@ -320,7 +320,6 @@ mod tests {
use api::v1::SemanticType;
use bytes::Bytes;
use common_query::aggr_stats::SupportStatAggr;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
@@ -335,6 +334,7 @@ mod tests {
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::region_engine::{
FileStatsItem, RowGroupStatsItem, ScannerProperties, SendableFileStatsStream,
SupportStatAggr,
};
use store_api::storage::RegionId;
@@ -802,4 +802,95 @@ mod tests {
.unwrap();
assert_eq!(max_values.value(0), 9);
}
#[tokio::test]
async fn stats_scan_exec_emits_no_batches_when_all_files_fallback() {
let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field(
"count_state",
"count[count]",
DataType::Int64,
false,
)]));
let region_metadata = build_region_metadata(Some("host = 'a'"));
let scanner = StaticStatsScanner {
schema: region_metadata.schema.clone(),
metadata: region_metadata,
properties: ScannerProperties::default(),
files: vec![
FileStatsItem {
num_rows: Some(5),
file_partition_expr: Some("host = 'a'".to_string()),
row_groups: vec![],
},
FileStatsItem {
num_rows: Some(5),
file_partition_expr: Some("host = 'b'".to_string()),
row_groups: build_row_groups(&[vec![Some(1), Some(2), Some(3)]]),
},
],
};
let exec = StatsScanExec::new(
schema,
vec![SupportStatAggr::CountNonNull {
column_name: "value".to_string(),
}],
Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
);
let stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
let batches = stream.map(|batch| batch.unwrap()).collect::<Vec<_>>().await;
assert!(batches.is_empty());
}
#[tokio::test]
async fn stats_scan_exec_count_rows_uses_file_num_rows_without_row_groups() {
let schema = Arc::new(arrow_schema::Schema::new(vec![single_state_field(
"count_state",
"count[count]",
DataType::Int64,
false,
)]));
let region_metadata = build_region_metadata(Some("host = 'a'"));
let scanner = StaticStatsScanner {
schema: region_metadata.schema.clone(),
metadata: region_metadata,
properties: ScannerProperties::default(),
files: vec![
FileStatsItem {
num_rows: Some(7),
file_partition_expr: Some("host = 'a'".to_string()),
row_groups: vec![],
},
FileStatsItem {
num_rows: Some(3),
file_partition_expr: Some("host = 'a'".to_string()),
row_groups: vec![],
},
],
};
let exec = StatsScanExec::new(
schema,
vec![SupportStatAggr::CountRows],
Arc::new(Mutex::new(Box::new(scanner) as RegionScannerRef)),
);
let batch = collect_single_batch(&exec).await;
assert_eq!(batch.num_rows(), 2);
let count_state = batch
.column(0)
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let count_values = count_state
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(count_values.value(0), 7);
assert_eq!(count_values.value(1), 3);
}
}

View File

@@ -12,4 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use common_query::aggr_stats::SupportStatAggr;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::expressions::{Column as PhysicalColumn, Literal};
pub use store_api::region_engine::SupportStatAggr;
pub fn support_stat_aggr_from_aggr_expr(
aggr: &AggregateFunctionExpr,
time_index_column: &str,
) -> Option<SupportStatAggr> {
match (aggr.fun().name(), aggr.expressions().as_slice()) {
("count", []) => Some(SupportStatAggr::CountRows),
("count", [arg])
if arg
.as_any()
.downcast_ref::<Literal>()
.is_some_and(|lit| lit.value() == &COUNT_STAR_EXPANSION) =>
{
Some(SupportStatAggr::CountRows)
}
("count", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
if col.name() == time_index_column {
Some(SupportStatAggr::CountRows)
} else {
Some(SupportStatAggr::CountNonNull {
column_name: col.name().to_string(),
})
}
}
("min", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
Some(SupportStatAggr::MinValue {
column_name: col.name().to_string(),
})
}
("max", [arg]) if let Some(col) = arg.as_any().downcast_ref::<PhysicalColumn>() => {
Some(SupportStatAggr::MaxValue {
column_name: col.name().to_string(),
})
}
_ => None,
}
}

View File

@@ -317,8 +317,17 @@ pub struct ScannerProperties {
/// Whether the scanner is scanning a logical region.
logical_region: bool,
/// Whether stats-aware skip mode is enabled for aggregate-stats runtime execution.
stats_aware_skip_mode: bool,
/// Optimizer-approved aggregate-stats requirements used by stats-aware skip.
stats_aware_skip_requirements: Vec<SupportStatAggr>,
}
/// Aggregate-stats requirement forwarded to scanner prepare / scan paths.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SupportStatAggr {
CountRows,
CountNonNull { column_name: String },
MinValue { column_name: String },
MaxValue { column_name: String },
}
impl ScannerProperties {
@@ -343,7 +352,7 @@ impl ScannerProperties {
distinguish_partition_range: false,
target_partitions: 0,
logical_region: false,
stats_aware_skip_mode: false,
stats_aware_skip_requirements: Vec::new(),
}
}
@@ -358,8 +367,8 @@ impl ScannerProperties {
if let Some(target_partitions) = request.target_partitions {
self.target_partitions = target_partitions;
}
if let Some(stats_aware_skip_mode) = request.stats_aware_skip_mode {
self.stats_aware_skip_mode = stats_aware_skip_mode;
if let Some(stats_aware_skip_requirements) = request.stats_aware_skip_requirements {
self.stats_aware_skip_requirements = stats_aware_skip_requirements;
}
}
@@ -376,9 +385,9 @@ impl ScannerProperties {
self.total_rows
}
/// Returns whether stats-aware skip mode is enabled.
pub fn stats_aware_skip_mode(&self) -> bool {
self.stats_aware_skip_mode
/// Returns aggregate-stats requirements attached to stats-aware skip.
pub fn stats_aware_skip_requirements(&self) -> &[SupportStatAggr] {
&self.stats_aware_skip_requirements
}
/// Returns whether the scanner is scanning a logical region.
@@ -410,8 +419,8 @@ pub struct PrepareRequest {
pub distinguish_partition_range: Option<bool>,
/// The expected number of target partitions.
pub target_partitions: Option<usize>,
/// Whether to enable stats-aware skip mode on the scanner.
pub stats_aware_skip_mode: Option<bool>,
/// Optimizer-approved aggregate-stats requirements for stats-aware skip.
pub stats_aware_skip_requirements: Option<Vec<SupportStatAggr>>,
}
impl PrepareRequest {
@@ -433,9 +442,12 @@ impl PrepareRequest {
self
}
/// Sets the stats-aware skip mode flag.
pub fn with_stats_aware_skip_mode(mut self, stats_aware_skip_mode: bool) -> Self {
self.stats_aware_skip_mode = Some(stats_aware_skip_mode);
/// Sets optimizer-approved aggregate-stats requirements for stats-aware skip.
pub fn with_stats_aware_skip_requirements(
mut self,
stats_aware_skip_requirements: Vec<SupportStatAggr>,
) -> Self {
self.stats_aware_skip_requirements = Some(stats_aware_skip_requirements);
self
}
}

View File

@@ -46,7 +46,7 @@ use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt};
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::region_engine::{
PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef,
PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef, SupportStatAggr,
};
use store_api::storage::{ScanRequest, TimeSeriesDistribution};
@@ -66,7 +66,7 @@ pub struct RegionScanExec {
is_partition_set: bool,
// TODO(ruihang): handle TimeWindowed dist via this parameter
distribution: Option<TimeSeriesDistribution>,
stats_aware_skip_mode: bool,
stats_aware_skip_requirements: Vec<SupportStatAggr>,
explain_verbose: bool,
query_memory_tracker: Option<QueryMemoryTracker>,
}
@@ -83,7 +83,10 @@ impl std::fmt::Debug for RegionScanExec {
.field("total_rows", &self.total_rows)
.field("is_partition_set", &self.is_partition_set)
.field("distribution", &self.distribution)
.field("stats_aware_skip_mode", &self.stats_aware_skip_mode)
.field(
"stats_aware_skip_requirements",
&self.stats_aware_skip_requirements,
)
.field("explain_verbose", &self.explain_verbose)
.finish()
}
@@ -227,7 +230,7 @@ impl RegionScanExec {
total_rows,
is_partition_set: false,
distribution: request.distribution,
stats_aware_skip_mode: false,
stats_aware_skip_requirements: Vec::new(),
explain_verbose: false,
query_memory_tracker,
})
@@ -305,20 +308,21 @@ impl RegionScanExec {
total_rows: self.total_rows,
is_partition_set: true,
distribution: self.distribution,
stats_aware_skip_mode: self.stats_aware_skip_mode,
stats_aware_skip_requirements: self.stats_aware_skip_requirements.clone(),
explain_verbose: self.explain_verbose,
query_memory_tracker: self.query_memory_tracker.clone(),
})
}
pub fn with_stats_aware_skip_mode(
pub fn with_stats_aware_skip_requirements(
&self,
stats_aware_skip_mode: bool,
stats_aware_skip_requirements: Vec<SupportStatAggr>,
) -> Result<Self, BoxedError> {
{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(
PrepareRequest::default().with_stats_aware_skip_mode(stats_aware_skip_mode),
PrepareRequest::default()
.with_stats_aware_skip_requirements(stats_aware_skip_requirements.clone()),
)?;
}
@@ -332,14 +336,14 @@ impl RegionScanExec {
total_rows: self.total_rows,
is_partition_set: self.is_partition_set,
distribution: self.distribution,
stats_aware_skip_mode,
stats_aware_skip_requirements,
explain_verbose: self.explain_verbose,
query_memory_tracker: self.query_memory_tracker.clone(),
})
}
pub fn stats_aware_skip_mode(&self) -> bool {
self.stats_aware_skip_mode
pub fn stats_aware_skip_requirements(&self) -> &[SupportStatAggr] {
&self.stats_aware_skip_requirements
}
pub fn append_mode(&self) -> bool {
@@ -667,4 +671,79 @@ mod test {
let result = plan.execute(0, ctx.task_ctx());
assert!(result.is_ok());
}
#[tokio::test]
async fn test_region_scan_exec_records_stats_aware_skip_requirements() {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
]));
let batch = RecordBatch::new(
schema.clone(),
vec![
Arc::new(Int32Vector::from_slice([1, 2])) as _,
Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _,
],
)
.unwrap();
let stream = RecordBatches::try_new(schema, vec![batch])
.unwrap()
.as_stream();
let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1]);
let region_metadata = Arc::new(builder.build().unwrap());
let scanner = Box::new(SinglePartitionScanner::new(
stream,
false,
region_metadata,
None,
));
let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap();
let requirements = vec![SupportStatAggr::CountNonNull {
column_name: "a".to_string(),
}];
let plan = plan
.with_stats_aware_skip_requirements(requirements.clone())
.unwrap();
assert_eq!(
plan.stats_aware_skip_requirements(),
requirements.as_slice()
);
{
let scanner = plan.scanner();
let scanner = scanner.lock().unwrap();
assert_eq!(
scanner.properties().stats_aware_skip_requirements(),
requirements.as_slice()
);
}
let result = plan.execute(0, ctx.task_ctx());
assert!(result.is_ok());
}
}