mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
5 Commits
docs/vecto
...
test/scan-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d78113c22 | ||
|
|
9ee50dae6d | ||
|
|
fa57df9dc2 | ||
|
|
f935921831 | ||
|
|
7f7d431cd8 |
@@ -339,6 +339,7 @@ impl MetadataRegion {
|
|||||||
series_row_selector: None,
|
series_row_selector: None,
|
||||||
sequence: None,
|
sequence: None,
|
||||||
distribution: None,
|
distribution: None,
|
||||||
|
tag_only_distinct: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -529,6 +530,7 @@ impl MetadataRegion {
|
|||||||
series_row_selector: None,
|
series_row_selector: None,
|
||||||
sequence: None,
|
sequence: None,
|
||||||
distribution: None,
|
distribution: None,
|
||||||
|
tag_only_distinct: false,
|
||||||
};
|
};
|
||||||
let record_batch_stream = self
|
let record_batch_stream = self
|
||||||
.mito
|
.mito
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ async fn test_scan_projection() {
|
|||||||
series_row_selector: None,
|
series_row_selector: None,
|
||||||
sequence: None,
|
sequence: None,
|
||||||
distribution: None,
|
distribution: None,
|
||||||
|
tag_only_distinct: false,
|
||||||
};
|
};
|
||||||
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
|
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
|
||||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;
|
|||||||
|
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
|
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
|
||||||
use crate::read::{Batch, BatchColumn, BatchReader};
|
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};
|
||||||
|
|
||||||
/// A reader that dedup sorted batches from a source based on the
|
/// A reader that dedup sorted batches from a source based on the
|
||||||
/// dedup strategy.
|
/// dedup strategy.
|
||||||
@@ -581,6 +581,52 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A reader that only returns tags for select distinct.
|
||||||
|
pub(crate) struct TagOnlyReader {
|
||||||
|
source: BoxedBatchReader,
|
||||||
|
/// Batch to return.
|
||||||
|
to_return: Option<Batch>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TagOnlyReader {
|
||||||
|
/// Creates a new tags only reader.
|
||||||
|
pub(crate) fn new(source: BoxedBatchReader) -> Self {
|
||||||
|
Self {
|
||||||
|
source,
|
||||||
|
to_return: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl BatchReader for TagOnlyReader {
|
||||||
|
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||||
|
while let Some(batch) = self.source.next_batch().await? {
|
||||||
|
if batch.is_empty() {
|
||||||
|
// Ensure that the batch is not empty before proceeding.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(to_return) = self.to_return.take() {
|
||||||
|
if to_return.primary_key() != batch.primary_key() {
|
||||||
|
self.to_return = Some(batch);
|
||||||
|
// A new key, store the batch and returns the previous one.
|
||||||
|
// Safety: The batch is not empty, so it has at least one row.
|
||||||
|
return Ok(Some(to_return.slice(0, 1)));
|
||||||
|
} else {
|
||||||
|
// The same key, override the batch.
|
||||||
|
self.to_return = Some(batch);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// No batch to return, store the current batch.
|
||||||
|
self.to_return = Some(batch);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(self.to_return.take())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|||||||
@@ -357,6 +357,7 @@ impl ScanRegion {
|
|||||||
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
|
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
|
||||||
None => ProjectionMapper::all(&self.version.metadata)?,
|
None => ProjectionMapper::all(&self.version.metadata)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Get memtable ranges to scan.
|
// Get memtable ranges to scan.
|
||||||
let memtables = memtables
|
let memtables = memtables
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -385,7 +386,8 @@ impl ScanRegion {
|
|||||||
.with_filter_deleted(filter_deleted)
|
.with_filter_deleted(filter_deleted)
|
||||||
.with_merge_mode(self.version.options.merge_mode())
|
.with_merge_mode(self.version.options.merge_mode())
|
||||||
.with_series_row_selector(self.request.series_row_selector)
|
.with_series_row_selector(self.request.series_row_selector)
|
||||||
.with_distribution(self.request.distribution);
|
.with_distribution(self.request.distribution)
|
||||||
|
.with_tag_only_distinct(self.request.tag_only_distinct);
|
||||||
Ok(input)
|
Ok(input)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -567,6 +569,8 @@ pub(crate) struct ScanInput {
|
|||||||
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
|
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
|
||||||
/// Hint for the required distribution of the scanner.
|
/// Hint for the required distribution of the scanner.
|
||||||
pub(crate) distribution: Option<TimeSeriesDistribution>,
|
pub(crate) distribution: Option<TimeSeriesDistribution>,
|
||||||
|
/// Hint for tag-only distinct scan.
|
||||||
|
pub(crate) tag_only_distinct: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ScanInput {
|
impl ScanInput {
|
||||||
@@ -592,6 +596,7 @@ impl ScanInput {
|
|||||||
merge_mode: MergeMode::default(),
|
merge_mode: MergeMode::default(),
|
||||||
series_row_selector: None,
|
series_row_selector: None,
|
||||||
distribution: None,
|
distribution: None,
|
||||||
|
tag_only_distinct: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -724,6 +729,13 @@ impl ScanInput {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the tag-only distinct scan hint.
|
||||||
|
#[must_use]
|
||||||
|
pub(crate) fn with_tag_only_distinct(mut self, tag_only_distinct: bool) -> Self {
|
||||||
|
self.tag_only_distinct = tag_only_distinct;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Scans sources in parallel.
|
/// Scans sources in parallel.
|
||||||
///
|
///
|
||||||
/// # Panics if the input doesn't allow parallel scan.
|
/// # Panics if the input doesn't allow parallel scan.
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
|||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
||||||
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
|
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
|
||||||
use crate::read::last_row::LastRowReader;
|
use crate::read::last_row::LastRowReader;
|
||||||
use crate::read::merge::MergeReaderBuilder;
|
use crate::read::merge::MergeReaderBuilder;
|
||||||
use crate::read::range::RangeBuilderList;
|
use crate::read::range::RangeBuilderList;
|
||||||
@@ -216,6 +216,7 @@ impl SeqScan {
|
|||||||
let compaction = self.compaction;
|
let compaction = self.compaction;
|
||||||
let distinguish_range = self.properties.distinguish_partition_range;
|
let distinguish_range = self.properties.distinguish_partition_range;
|
||||||
let part_metrics = self.new_partition_metrics(partition);
|
let part_metrics = self.new_partition_metrics(partition);
|
||||||
|
let tag_only = self.stream_ctx.input.tag_only_distinct;
|
||||||
|
|
||||||
let stream = try_stream! {
|
let stream = try_stream! {
|
||||||
part_metrics.on_first_poll();
|
part_metrics.on_first_poll();
|
||||||
@@ -241,6 +242,9 @@ impl SeqScan {
|
|||||||
.await
|
.await
|
||||||
.map_err(BoxedError::new)
|
.map_err(BoxedError::new)
|
||||||
.context(ExternalSnafu)?;
|
.context(ExternalSnafu)?;
|
||||||
|
if tag_only {
|
||||||
|
reader = Box::new(TagOnlyReader::new(reader));
|
||||||
|
}
|
||||||
let cache = &stream_ctx.input.cache_strategy;
|
let cache = &stream_ctx.input.cache_strategy;
|
||||||
let mut metrics = ScannerMetrics::default();
|
let mut metrics = ScannerMetrics::default();
|
||||||
let mut fetch_start = Instant::now();
|
let mut fetch_start = Instant::now();
|
||||||
|
|||||||
@@ -96,7 +96,13 @@ impl Categorizer {
|
|||||||
LogicalPlan::Extension(extension) => {
|
LogicalPlan::Extension(extension) => {
|
||||||
Self::check_extension_plan(extension.node.as_ref() as _)
|
Self::check_extension_plan(extension.node.as_ref() as _)
|
||||||
}
|
}
|
||||||
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
|
LogicalPlan::Distinct(_) => {
|
||||||
|
if partition_cols.is_empty() {
|
||||||
|
return Commutativity::Commutative;
|
||||||
|
}
|
||||||
|
|
||||||
|
Commutativity::Unimplemented
|
||||||
|
}
|
||||||
LogicalPlan::Unnest(_) => Commutativity::Commutative,
|
LogicalPlan::Unnest(_) => Commutativity::Commutative,
|
||||||
LogicalPlan::Statement(_) => Commutativity::Unsupported,
|
LogicalPlan::Statement(_) => Commutativity::Unsupported,
|
||||||
LogicalPlan::Values(_) => Commutativity::Unsupported,
|
LogicalPlan::Values(_) => Commutativity::Unsupported,
|
||||||
|
|||||||
@@ -247,6 +247,10 @@ impl DummyTableProvider {
|
|||||||
self.scan_request.lock().unwrap().sequence = Some(sequence);
|
self.scan_request.lock().unwrap().sequence = Some(sequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
|
||||||
|
self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
|
||||||
|
}
|
||||||
|
|
||||||
/// Gets the scan request of the provider.
|
/// Gets the scan request of the provider.
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn scan_request(&self) -> ScanRequest {
|
pub fn scan_request(&self) -> ScanRequest {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
|
|||||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
|
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
|
||||||
use datafusion_common::{Column, Result};
|
use datafusion_common::{Column, Result};
|
||||||
use datafusion_expr::expr::Sort;
|
use datafusion_expr::expr::Sort;
|
||||||
use datafusion_expr::{utils, Expr, LogicalPlan};
|
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
|
||||||
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
|
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
|
||||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||||
|
|
||||||
@@ -92,6 +92,15 @@ impl ScanHintRule {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set distinct columns hint
|
||||||
|
if !visitor.distinct_columns.is_empty() {
|
||||||
|
Self::set_distinct_columns_hint(
|
||||||
|
adapter,
|
||||||
|
&visitor.distinct_columns,
|
||||||
|
&visitor.distinct_filter_columns,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
transformed = true;
|
transformed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -185,6 +194,43 @@ impl ScanHintRule {
|
|||||||
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
|
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn set_distinct_columns_hint(
|
||||||
|
adapter: &DummyTableProvider,
|
||||||
|
distinct_columns: &HashSet<Column>,
|
||||||
|
distinct_filter_columns: &HashSet<Column>,
|
||||||
|
) {
|
||||||
|
let region_metadata = adapter.region_metadata();
|
||||||
|
let mut should_set_distinct_hint = true;
|
||||||
|
// check if all group_by columns are primary key
|
||||||
|
for col in distinct_columns {
|
||||||
|
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||||
|
should_set_distinct_hint = false;
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
if column_metadata.semantic_type != SemanticType::Tag {
|
||||||
|
should_set_distinct_hint = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// check if all filter columns are primary key columns or time index.
|
||||||
|
for col in distinct_filter_columns {
|
||||||
|
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||||
|
should_set_distinct_hint = false;
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
if column_metadata.semantic_type != SemanticType::Tag
|
||||||
|
&& column_metadata.semantic_type != SemanticType::Timestamp
|
||||||
|
{
|
||||||
|
should_set_distinct_hint = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if should_set_distinct_hint {
|
||||||
|
adapter.with_tag_only_distinct(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Traverse and fetch hints.
|
/// Traverse and fetch hints.
|
||||||
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
|
|||||||
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
|
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
|
||||||
/// and the `order_by` column which should be time index.
|
/// and the `order_by` column which should be time index.
|
||||||
ts_row_selector: Option<(HashSet<Column>, Column)>,
|
ts_row_selector: Option<(HashSet<Column>, Column)>,
|
||||||
|
/// Distinct columns for select distinct operation.
|
||||||
|
distinct_columns: HashSet<Column>,
|
||||||
|
/// Distinct filter column.
|
||||||
|
distinct_filter_columns: HashSet<Column>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||||
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
|||||||
self.ts_row_selector = Some((group_by_cols, order_by_col));
|
self.ts_row_selector = Some((group_by_cols, order_by_col));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check distinct.
|
||||||
|
if !is_all_last_value {
|
||||||
|
self.collect_distinct_columns(aggregate);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.ts_row_selector.is_some()
|
if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
|
||||||
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
|
|
||||||
{
|
|
||||||
// clean previous time series selector hint when encounter subqueries or join
|
// clean previous time series selector hint when encounter subqueries or join
|
||||||
self.ts_row_selector = None;
|
self.ts_row_selector = None;
|
||||||
|
self.distinct_columns.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
if let LogicalPlan::Filter(filter) = node
|
if let LogicalPlan::Filter(filter) = node {
|
||||||
&& let Some(group_by_exprs) = &self.ts_row_selector
|
if let Some(group_by_exprs) = &self.ts_row_selector {
|
||||||
{
|
let mut filter_referenced_cols = HashSet::default();
|
||||||
let mut filter_referenced_cols = HashSet::default();
|
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
||||||
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
// ensure only group_by columns are used in filter
|
||||||
// ensure only group_by columns are used in filter
|
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
||||||
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
self.ts_row_selector = None;
|
||||||
self.ts_row_selector = None;
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !self.distinct_columns.is_empty() {
|
||||||
|
utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
|||||||
|
|
||||||
impl ScanHintVisitor {
|
impl ScanHintVisitor {
|
||||||
fn need_rewrite(&self) -> bool {
|
fn need_rewrite(&self) -> bool {
|
||||||
self.order_expr.is_some() || self.ts_row_selector.is_some()
|
self.order_expr.is_some()
|
||||||
|
|| self.ts_row_selector.is_some()
|
||||||
|
|| !self.distinct_columns.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns select distinct columns.
|
||||||
|
fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
|
||||||
|
if !aggregate.aggr_expr.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut is_all_distinct = true;
|
||||||
|
// make sure all the exprs are DIRECT `col` and collect them
|
||||||
|
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
|
||||||
|
for expr in &aggregate.group_expr {
|
||||||
|
if let Expr::Column(col) = expr {
|
||||||
|
group_by_cols.insert(col.clone());
|
||||||
|
} else {
|
||||||
|
is_all_distinct = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if is_all_distinct {
|
||||||
|
self.distinct_columns = group_by_cols;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,4 +58,6 @@ pub struct ScanRequest {
|
|||||||
pub sequence: Option<SequenceNumber>,
|
pub sequence: Option<SequenceNumber>,
|
||||||
/// Optional hint for the distribution of time-series data.
|
/// Optional hint for the distribution of time-series data.
|
||||||
pub distribution: Option<TimeSeriesDistribution>,
|
pub distribution: Option<TimeSeriesDistribution>,
|
||||||
|
/// Optional hint for the tag-only distinct operation.
|
||||||
|
pub tag_only_distinct: bool,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user