Compare commits

...

5 Commits

Author SHA1 Message Date
evenyag
3d78113c22 feat: reduce rows returned 2025-03-14 00:35:19 +08:00
evenyag
9ee50dae6d feat: push down tag distinct 2025-03-14 00:35:00 +08:00
evenyag
fa57df9dc2 feat: use tag only reader 2025-03-13 23:07:37 +08:00
evenyag
f935921831 feat: tags only reader 2025-03-13 23:01:39 +08:00
evenyag
7f7d431cd8 feat: tag only distinct hint wip 2025-03-13 16:09:54 +08:00
9 changed files with 176 additions and 17 deletions

View File

@@ -339,6 +339,7 @@ impl MetadataRegion {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
}
}
@@ -529,6 +530,7 @@ impl MetadataRegion {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
};
let record_batch_stream = self
.mito

View File

@@ -81,6 +81,7 @@ async fn test_scan_projection() {
series_row_selector: None,
sequence: None,
distribution: None,
tag_only_distinct: false,
};
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};
/// A reader that dedup sorted batches from a source based on the
/// dedup strategy.
@@ -581,6 +581,52 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
}
}
/// A reader that only returns tags for select distinct.
pub(crate) struct TagOnlyReader {
source: BoxedBatchReader,
/// Batch to return.
to_return: Option<Batch>,
}
impl TagOnlyReader {
/// Creates a new tags only reader.
pub(crate) fn new(source: BoxedBatchReader) -> Self {
Self {
source,
to_return: None,
}
}
}
#[async_trait]
impl BatchReader for TagOnlyReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if batch.is_empty() {
// Ensure that the batch is not empty before proceeding.
continue;
}
if let Some(to_return) = self.to_return.take() {
if to_return.primary_key() != batch.primary_key() {
self.to_return = Some(batch);
// A new key, store the batch and returns the previous one.
// Safety: The batch is not empty, so it has at least one row.
return Ok(Some(to_return.slice(0, 1)));
} else {
// The same key, override the batch.
self.to_return = Some(batch);
}
} else {
// No batch to return, store the current batch.
self.to_return = Some(batch);
}
}
Ok(self.to_return.take())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -357,6 +357,7 @@ impl ScanRegion {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
};
// Get memtable ranges to scan.
let memtables = memtables
.into_iter()
@@ -385,7 +386,8 @@ impl ScanRegion {
.with_filter_deleted(filter_deleted)
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution);
.with_distribution(self.request.distribution)
.with_tag_only_distinct(self.request.tag_only_distinct);
Ok(input)
}
@@ -567,6 +569,8 @@ pub(crate) struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Hint for tag-only distinct scan.
pub(crate) tag_only_distinct: bool,
}
impl ScanInput {
@@ -592,6 +596,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
tag_only_distinct: false,
}
}
@@ -724,6 +729,13 @@ impl ScanInput {
self
}
/// Sets the tag-only distinct scan hint.
#[must_use]
pub(crate) fn with_tag_only_distinct(mut self, tag_only_distinct: bool) -> Self {
self.tag_only_distinct = tag_only_distinct;
self
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.

View File

@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
@@ -216,6 +216,7 @@ impl SeqScan {
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(partition);
let tag_only = self.stream_ctx.input.tag_only_distinct;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -241,6 +242,9 @@ impl SeqScan {
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if tag_only {
reader = Box::new(TagOnlyReader::new(reader));
}
let cache = &stream_ctx.input.cache_strategy;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();

View File

@@ -96,7 +96,13 @@ impl Categorizer {
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
}
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
LogicalPlan::Distinct(_) => {
if partition_cols.is_empty() {
return Commutativity::Commutative;
}
Commutativity::Unimplemented
}
LogicalPlan::Unnest(_) => Commutativity::Commutative,
LogicalPlan::Statement(_) => Commutativity::Unsupported,
LogicalPlan::Values(_) => Commutativity::Unsupported,

View File

@@ -247,6 +247,10 @@ impl DummyTableProvider {
self.scan_request.lock().unwrap().sequence = Some(sequence);
}
pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
}
/// Gets the scan request of the provider.
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {

View File

@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{Column, Result};
use datafusion_expr::expr::Sort;
use datafusion_expr::{utils, Expr, LogicalPlan};
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
@@ -92,6 +92,15 @@ impl ScanHintRule {
);
}
// set distinct columns hint
if !visitor.distinct_columns.is_empty() {
Self::set_distinct_columns_hint(
adapter,
&visitor.distinct_columns,
&visitor.distinct_filter_columns,
);
}
transformed = true;
}
}
@@ -185,6 +194,43 @@ impl ScanHintRule {
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
}
}
fn set_distinct_columns_hint(
adapter: &DummyTableProvider,
distinct_columns: &HashSet<Column>,
distinct_filter_columns: &HashSet<Column>,
) {
let region_metadata = adapter.region_metadata();
let mut should_set_distinct_hint = true;
// check if all group_by columns are primary key
for col in distinct_columns {
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
should_set_distinct_hint = false;
break;
};
if column_metadata.semantic_type != SemanticType::Tag {
should_set_distinct_hint = false;
break;
}
}
// check if all filter columns are primary key columns or time index.
for col in distinct_filter_columns {
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
should_set_distinct_hint = false;
break;
};
if column_metadata.semantic_type != SemanticType::Tag
&& column_metadata.semantic_type != SemanticType::Timestamp
{
should_set_distinct_hint = false;
break;
}
}
if should_set_distinct_hint {
adapter.with_tag_only_distinct(true);
}
}
}
/// Traverse and fetch hints.
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
/// and the `order_by` column which should be time index.
ts_row_selector: Option<(HashSet<Column>, Column)>,
/// Distinct columns for select distinct operation.
distinct_columns: HashSet<Column>,
/// Distinct filter column.
distinct_filter_columns: HashSet<Column>,
}
impl TreeNodeVisitor<'_> for ScanHintVisitor {
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
self.ts_row_selector = Some((group_by_cols, order_by_col));
}
}
// Check distinct.
if !is_all_last_value {
self.collect_distinct_columns(aggregate);
}
}
if self.ts_row_selector.is_some()
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
{
if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
// clean previous time series selector hint when encounter subqueries or join
self.ts_row_selector = None;
self.distinct_columns.clear();
}
if let LogicalPlan::Filter(filter) = node
&& let Some(group_by_exprs) = &self.ts_row_selector
{
let mut filter_referenced_cols = HashSet::default();
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
// ensure only group_by columns are used in filter
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
self.ts_row_selector = None;
if let LogicalPlan::Filter(filter) = node {
if let Some(group_by_exprs) = &self.ts_row_selector {
let mut filter_referenced_cols = HashSet::default();
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
// ensure only group_by columns are used in filter
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
self.ts_row_selector = None;
}
}
if !self.distinct_columns.is_empty() {
utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
}
}
@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
impl ScanHintVisitor {
fn need_rewrite(&self) -> bool {
self.order_expr.is_some() || self.ts_row_selector.is_some()
self.order_expr.is_some()
|| self.ts_row_selector.is_some()
|| !self.distinct_columns.is_empty()
}
/// Returns select distinct columns.
fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
if !aggregate.aggr_expr.is_empty() {
return;
}
let mut is_all_distinct = true;
// make sure all the exprs are DIRECT `col` and collect them
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
for expr in &aggregate.group_expr {
if let Expr::Column(col) = expr {
group_by_cols.insert(col.clone());
} else {
is_all_distinct = false;
break;
}
}
if is_all_distinct {
self.distinct_columns = group_by_cols;
}
}
}

View File

@@ -58,4 +58,6 @@ pub struct ScanRequest {
pub sequence: Option<SequenceNumber>,
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
/// Optional hint for the tag-only distinct operation.
pub tag_only_distinct: bool,
}