mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-15 01:32:56 +00:00
feat: push down tag distinct
This commit is contained in:
@@ -339,6 +339,7 @@ impl MetadataRegion {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -529,6 +530,7 @@ impl MetadataRegion {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
};
|
||||
let record_batch_stream = self
|
||||
.mito
|
||||
|
||||
@@ -81,6 +81,7 @@ async fn test_scan_projection() {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
};
|
||||
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
|
||||
@@ -358,15 +358,6 @@ impl ScanRegion {
|
||||
None => ProjectionMapper::all(&self.version.metadata)?,
|
||||
};
|
||||
|
||||
// This is incorrect, but we temporarily set the tag-only hint here for testing.
|
||||
let tag_only_distinct = match self.request.projection {
|
||||
// TODO(yingwen): index bound check
|
||||
Some(p) => p.iter().all(|idx| {
|
||||
self.version.metadata.column_metadatas[*idx].semantic_type == SemanticType::Tag
|
||||
}),
|
||||
None => false,
|
||||
};
|
||||
|
||||
// Get memtable ranges to scan.
|
||||
let memtables = memtables
|
||||
.into_iter()
|
||||
@@ -396,7 +387,7 @@ impl ScanRegion {
|
||||
.with_merge_mode(self.version.options.merge_mode())
|
||||
.with_series_row_selector(self.request.series_row_selector)
|
||||
.with_distribution(self.request.distribution)
|
||||
.with_tag_only_distinct(tag_only_distinct);
|
||||
.with_tag_only_distinct(self.request.tag_only_distinct);
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
|
||||
@@ -96,7 +96,13 @@ impl Categorizer {
|
||||
LogicalPlan::Extension(extension) => {
|
||||
Self::check_extension_plan(extension.node.as_ref() as _)
|
||||
}
|
||||
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Distinct(_) => {
|
||||
if partition_cols.is_empty() {
|
||||
return Commutativity::Commutative;
|
||||
}
|
||||
|
||||
Commutativity::Unimplemented
|
||||
}
|
||||
LogicalPlan::Unnest(_) => Commutativity::Commutative,
|
||||
LogicalPlan::Statement(_) => Commutativity::Unsupported,
|
||||
LogicalPlan::Values(_) => Commutativity::Unsupported,
|
||||
|
||||
@@ -247,6 +247,10 @@ impl DummyTableProvider {
|
||||
self.scan_request.lock().unwrap().sequence = Some(sequence);
|
||||
}
|
||||
|
||||
pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
|
||||
self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
|
||||
}
|
||||
|
||||
/// Gets the scan request of the provider.
|
||||
#[cfg(test)]
|
||||
pub fn scan_request(&self) -> ScanRequest {
|
||||
|
||||
@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
|
||||
use datafusion_common::{Column, Result};
|
||||
use datafusion_expr::expr::Sort;
|
||||
use datafusion_expr::{utils, Expr, LogicalPlan};
|
||||
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
|
||||
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
|
||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
|
||||
@@ -92,6 +92,15 @@ impl ScanHintRule {
|
||||
);
|
||||
}
|
||||
|
||||
// set distinct columns hint
|
||||
if !visitor.distinct_columns.is_empty() {
|
||||
Self::set_distinct_columns_hint(
|
||||
adapter,
|
||||
&visitor.distinct_columns,
|
||||
&visitor.distinct_filter_columns,
|
||||
);
|
||||
}
|
||||
|
||||
transformed = true;
|
||||
}
|
||||
}
|
||||
@@ -185,6 +194,43 @@ impl ScanHintRule {
|
||||
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
|
||||
}
|
||||
}
|
||||
|
||||
fn set_distinct_columns_hint(
|
||||
adapter: &DummyTableProvider,
|
||||
distinct_columns: &HashSet<Column>,
|
||||
distinct_filter_columns: &HashSet<Column>,
|
||||
) {
|
||||
let region_metadata = adapter.region_metadata();
|
||||
let mut should_set_distinct_hint = true;
|
||||
// check if all group_by columns are primary key
|
||||
for col in distinct_columns {
|
||||
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
};
|
||||
if column_metadata.semantic_type != SemanticType::Tag {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// check if all filter columns are primary key columns or time index.
|
||||
for col in distinct_filter_columns {
|
||||
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
};
|
||||
if column_metadata.semantic_type != SemanticType::Tag
|
||||
&& column_metadata.semantic_type != SemanticType::Timestamp
|
||||
{
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if should_set_distinct_hint {
|
||||
adapter.with_tag_only_distinct(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Traverse and fetch hints.
|
||||
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
|
||||
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
|
||||
/// and the `order_by` column which should be time index.
|
||||
ts_row_selector: Option<(HashSet<Column>, Column)>,
|
||||
/// Distinct columns for select distinct operation.
|
||||
distinct_columns: HashSet<Column>,
|
||||
/// Columns referenced by filter predicates, collected while `distinct_columns` is non-empty.
|
||||
distinct_filter_columns: HashSet<Column>,
|
||||
}
|
||||
|
||||
impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
self.ts_row_selector = Some((group_by_cols, order_by_col));
|
||||
}
|
||||
}
|
||||
|
||||
// Check distinct.
|
||||
if !is_all_last_value {
|
||||
self.collect_distinct_columns(aggregate);
|
||||
}
|
||||
}
|
||||
|
||||
if self.ts_row_selector.is_some()
|
||||
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
|
||||
{
|
||||
if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
|
||||
// clean previous time series selector hint when encounter subqueries or join
|
||||
self.ts_row_selector = None;
|
||||
self.distinct_columns.clear();
|
||||
}
|
||||
|
||||
if let LogicalPlan::Filter(filter) = node
|
||||
&& let Some(group_by_exprs) = &self.ts_row_selector
|
||||
{
|
||||
let mut filter_referenced_cols = HashSet::default();
|
||||
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
||||
// ensure only group_by columns are used in filter
|
||||
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
||||
self.ts_row_selector = None;
|
||||
if let LogicalPlan::Filter(filter) = node {
|
||||
if let Some(group_by_exprs) = &self.ts_row_selector {
|
||||
let mut filter_referenced_cols = HashSet::default();
|
||||
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
||||
// ensure only group_by columns are used in filter
|
||||
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
||||
self.ts_row_selector = None;
|
||||
}
|
||||
}
|
||||
|
||||
if !self.distinct_columns.is_empty() {
|
||||
utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
|
||||
impl ScanHintVisitor {
|
||||
fn need_rewrite(&self) -> bool {
|
||||
self.order_expr.is_some() || self.ts_row_selector.is_some()
|
||||
self.order_expr.is_some()
|
||||
|| self.ts_row_selector.is_some()
|
||||
|| !self.distinct_columns.is_empty()
|
||||
}
|
||||
|
||||
/// Returns select distinct columns.
|
||||
fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
|
||||
if !aggregate.aggr_expr.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut is_all_distinct = true;
|
||||
// make sure all the exprs are DIRECT `col` and collect them
|
||||
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
|
||||
for expr in &aggregate.group_expr {
|
||||
if let Expr::Column(col) = expr {
|
||||
group_by_cols.insert(col.clone());
|
||||
} else {
|
||||
is_all_distinct = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if is_all_distinct {
|
||||
self.distinct_columns = group_by_cols;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,4 +58,6 @@ pub struct ScanRequest {
|
||||
pub sequence: Option<SequenceNumber>,
|
||||
/// Optional hint for the distribution of time-series data.
|
||||
pub distribution: Option<TimeSeriesDistribution>,
|
||||
/// Optional hint for the tag-only distinct operation.
|
||||
pub tag_only_distinct: bool,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user