diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 2b066a0bde..add34fb648 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -339,6 +339,7 @@ impl MetadataRegion { series_row_selector: None, sequence: None, distribution: None, + tag_only_distinct: false, } } @@ -529,6 +530,7 @@ impl MetadataRegion { series_row_selector: None, sequence: None, distribution: None, + tag_only_distinct: false, }; let record_batch_stream = self .mito diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index c7880d298a..97de619a73 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -81,6 +81,7 @@ async fn test_scan_projection() { series_row_selector: None, sequence: None, distribution: None, + tag_only_distinct: false, }; let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4ccaca20f8..81e0e353e1 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -358,15 +358,6 @@ impl ScanRegion { None => ProjectionMapper::all(&self.version.metadata)?, }; - // This is incorrect, but we temporarily sets the tag only hint for test. - let tag_only_distinct = match self.request.projection { - // TODO(yingwen): index bound check - Some(p) => p.iter().all(|idx| { - self.version.metadata.column_metadatas[*idx].semantic_type == SemanticType::Tag - }), - None => false, - }; - // Get memtable ranges to scan. let memtables = memtables .into_iter() @@ -396,7 +387,7 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) - .with_tag_only_distinct(tag_only_distinct); + .with_tag_only_distinct(self.request.tag_only_distinct); Ok(input) } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 194e2d3b9d..3b60918239 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -96,7 +96,13 @@ impl Categorizer { LogicalPlan::Extension(extension) => { Self::check_extension_plan(extension.node.as_ref() as _) } - LogicalPlan::Distinct(_) => Commutativity::Unimplemented, + LogicalPlan::Distinct(_) => { + if partition_cols.is_empty() { + return Commutativity::Commutative; + } + + Commutativity::Unimplemented + } LogicalPlan::Unnest(_) => Commutativity::Commutative, LogicalPlan::Statement(_) => Commutativity::Unsupported, LogicalPlan::Values(_) => Commutativity::Unsupported, diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 0e3b3616a9..0fb8173738 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -247,6 +247,10 @@ impl DummyTableProvider { self.scan_request.lock().unwrap().sequence = Some(sequence); } + pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) { + self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct; + } + /// Gets the scan request of the provider. #[cfg(test)] pub fn scan_request(&self) -> ScanRequest { diff --git a/src/query/src/optimizer/scan_hint.rs b/src/query/src/optimizer/scan_hint.rs index e8cc95c7f1..7663461f0b 100644 --- a/src/query/src/optimizer/scan_hint.rs +++ b/src/query/src/optimizer/scan_hint.rs @@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{Column, Result}; use datafusion_expr::expr::Sort; -use datafusion_expr::{utils, Expr, LogicalPlan}; +use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan}; use datafusion_optimizer::{OptimizerConfig, OptimizerRule}; use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector}; @@ -92,6 +92,15 @@ impl ScanHintRule { ); } + // set distinct columns hint + if !visitor.distinct_columns.is_empty() { + Self::set_distinct_columns_hint( + adapter, + &visitor.distinct_columns, + &visitor.distinct_filter_columns, + ); + } + transformed = true; } } @@ -185,6 +194,43 @@ impl ScanHintRule { adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow); } } + + fn set_distinct_columns_hint( + adapter: &DummyTableProvider, + distinct_columns: &HashSet, + distinct_filter_columns: &HashSet, + ) { + let region_metadata = adapter.region_metadata(); + let mut should_set_distinct_hint = true; + // check if all group_by columns are primary key + for col in distinct_columns { + let Some(column_metadata) = region_metadata.column_by_name(&col.name) else { + should_set_distinct_hint = false; + break; + }; + if column_metadata.semantic_type != SemanticType::Tag { + should_set_distinct_hint = false; + break; + } + } + // check if all filter columns are primary key columns or time index. + for col in distinct_filter_columns { + let Some(column_metadata) = region_metadata.column_by_name(&col.name) else { + should_set_distinct_hint = false; + break; + }; + if column_metadata.semantic_type != SemanticType::Tag + && column_metadata.semantic_type != SemanticType::Timestamp + { + should_set_distinct_hint = false; + break; + } + } + + if should_set_distinct_hint { + adapter.with_tag_only_distinct(true); + } + } } /// Traverse and fetch hints. @@ -196,6 +242,10 @@ struct ScanHintVisitor { /// This field stores saved `group_by` columns when all aggregate functions are `last_value` /// and the `order_by` column which should be time index. ts_row_selector: Option<(HashSet, Column)>, + /// Distinct columns for select distinct operation. + distinct_columns: HashSet, + /// Distinct filter column. + distinct_filter_columns: HashSet, } impl TreeNodeVisitor<'_> for ScanHintVisitor { @@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor { self.ts_row_selector = Some((group_by_cols, order_by_col)); } } + + // Check distinct. + if !is_all_last_value { + self.collect_distinct_columns(aggregate); + } } - if self.ts_row_selector.is_some() - && (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1) - { + if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 { // clean previous time series selector hint when encounter subqueries or join self.ts_row_selector = None; + self.distinct_columns.clear(); } - if let LogicalPlan::Filter(filter) = node - && let Some(group_by_exprs) = &self.ts_row_selector - { - let mut filter_referenced_cols = HashSet::default(); - utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?; - // ensure only group_by columns are used in filter - if !filter_referenced_cols.is_subset(&group_by_exprs.0) { - self.ts_row_selector = None; + if let LogicalPlan::Filter(filter) = node { + if let Some(group_by_exprs) = &self.ts_row_selector { + let mut filter_referenced_cols = HashSet::default(); + utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?; + // ensure only group_by columns are used in filter + if !filter_referenced_cols.is_subset(&group_by_exprs.0) { + self.ts_row_selector = None; + } + } + + if !self.distinct_columns.is_empty() { + utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?; } } @@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor { impl ScanHintVisitor { fn need_rewrite(&self) -> bool { - self.order_expr.is_some() || self.ts_row_selector.is_some() + self.order_expr.is_some() + || self.ts_row_selector.is_some() + || !self.distinct_columns.is_empty() + } + + /// Returns select distinct columns. + fn collect_distinct_columns(&mut self, aggregate: &Aggregate) { + if !aggregate.aggr_expr.is_empty() { + return; + } + + let mut is_all_distinct = true; + // make sure all the exprs are DIRECT `col` and collect them + let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len()); + for expr in &aggregate.group_expr { + if let Expr::Column(col) = expr { + group_by_cols.insert(col.clone()); + } else { + is_all_distinct = false; + break; + } + } + if is_all_distinct { + self.distinct_columns = group_by_cols; + } } } diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index c9a440eaea..6893740cc3 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -58,4 +58,6 @@ pub struct ScanRequest { pub sequence: Option, /// Optional hint for the distribution of time-series data. pub distribution: Option, + /// Optional hint for the tag-only distinct operation. + pub tag_only_distinct: bool, }