mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
5 Commits
v0.16.0
...
test/scan-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d78113c22 | ||
|
|
9ee50dae6d | ||
|
|
fa57df9dc2 | ||
|
|
f935921831 | ||
|
|
7f7d431cd8 |
@@ -339,6 +339,7 @@ impl MetadataRegion {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -529,6 +530,7 @@ impl MetadataRegion {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
};
|
||||
let record_batch_stream = self
|
||||
.mito
|
||||
|
||||
@@ -81,6 +81,7 @@ async fn test_scan_projection() {
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
};
|
||||
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
|
||||
@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
|
||||
use crate::read::{Batch, BatchColumn, BatchReader};
|
||||
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};
|
||||
|
||||
/// A reader that dedup sorted batches from a source based on the
|
||||
/// dedup strategy.
|
||||
@@ -581,6 +581,52 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
|
||||
}
|
||||
}
|
||||
|
||||
/// A reader that only returns tags for select distinct.
|
||||
pub(crate) struct TagOnlyReader {
|
||||
source: BoxedBatchReader,
|
||||
/// Batch to return.
|
||||
to_return: Option<Batch>,
|
||||
}
|
||||
|
||||
impl TagOnlyReader {
|
||||
/// Creates a new tags only reader.
|
||||
pub(crate) fn new(source: BoxedBatchReader) -> Self {
|
||||
Self {
|
||||
source,
|
||||
to_return: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BatchReader for TagOnlyReader {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
while let Some(batch) = self.source.next_batch().await? {
|
||||
if batch.is_empty() {
|
||||
// Ensure that the batch is not empty before proceeding.
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(to_return) = self.to_return.take() {
|
||||
if to_return.primary_key() != batch.primary_key() {
|
||||
self.to_return = Some(batch);
|
||||
// A new key, store the batch and returns the previous one.
|
||||
// Safety: The batch is not empty, so it has at least one row.
|
||||
return Ok(Some(to_return.slice(0, 1)));
|
||||
} else {
|
||||
// The same key, override the batch.
|
||||
self.to_return = Some(batch);
|
||||
}
|
||||
} else {
|
||||
// No batch to return, store the current batch.
|
||||
self.to_return = Some(batch);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.to_return.take())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -357,6 +357,7 @@ impl ScanRegion {
|
||||
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
|
||||
None => ProjectionMapper::all(&self.version.metadata)?,
|
||||
};
|
||||
|
||||
// Get memtable ranges to scan.
|
||||
let memtables = memtables
|
||||
.into_iter()
|
||||
@@ -385,7 +386,8 @@ impl ScanRegion {
|
||||
.with_filter_deleted(filter_deleted)
|
||||
.with_merge_mode(self.version.options.merge_mode())
|
||||
.with_series_row_selector(self.request.series_row_selector)
|
||||
.with_distribution(self.request.distribution);
|
||||
.with_distribution(self.request.distribution)
|
||||
.with_tag_only_distinct(self.request.tag_only_distinct);
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
@@ -567,6 +569,8 @@ pub(crate) struct ScanInput {
|
||||
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
|
||||
/// Hint for the required distribution of the scanner.
|
||||
pub(crate) distribution: Option<TimeSeriesDistribution>,
|
||||
/// Hint for tag-only distinct scan.
|
||||
pub(crate) tag_only_distinct: bool,
|
||||
}
|
||||
|
||||
impl ScanInput {
|
||||
@@ -592,6 +596,7 @@ impl ScanInput {
|
||||
merge_mode: MergeMode::default(),
|
||||
series_row_selector: None,
|
||||
distribution: None,
|
||||
tag_only_distinct: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -724,6 +729,13 @@ impl ScanInput {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the tag-only distinct scan hint.
|
||||
#[must_use]
|
||||
pub(crate) fn with_tag_only_distinct(mut self, tag_only_distinct: bool) -> Self {
|
||||
self.tag_only_distinct = tag_only_distinct;
|
||||
self
|
||||
}
|
||||
|
||||
/// Scans sources in parallel.
|
||||
///
|
||||
/// # Panics if the input doesn't allow parallel scan.
|
||||
|
||||
@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
||||
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
|
||||
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
|
||||
use crate::read::last_row::LastRowReader;
|
||||
use crate::read::merge::MergeReaderBuilder;
|
||||
use crate::read::range::RangeBuilderList;
|
||||
@@ -216,6 +216,7 @@ impl SeqScan {
|
||||
let compaction = self.compaction;
|
||||
let distinguish_range = self.properties.distinguish_partition_range;
|
||||
let part_metrics = self.new_partition_metrics(partition);
|
||||
let tag_only = self.stream_ctx.input.tag_only_distinct;
|
||||
|
||||
let stream = try_stream! {
|
||||
part_metrics.on_first_poll();
|
||||
@@ -241,6 +242,9 @@ impl SeqScan {
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
if tag_only {
|
||||
reader = Box::new(TagOnlyReader::new(reader));
|
||||
}
|
||||
let cache = &stream_ctx.input.cache_strategy;
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
|
||||
@@ -96,7 +96,13 @@ impl Categorizer {
|
||||
LogicalPlan::Extension(extension) => {
|
||||
Self::check_extension_plan(extension.node.as_ref() as _)
|
||||
}
|
||||
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::Distinct(_) => {
|
||||
if partition_cols.is_empty() {
|
||||
return Commutativity::Commutative;
|
||||
}
|
||||
|
||||
Commutativity::Unimplemented
|
||||
}
|
||||
LogicalPlan::Unnest(_) => Commutativity::Commutative,
|
||||
LogicalPlan::Statement(_) => Commutativity::Unsupported,
|
||||
LogicalPlan::Values(_) => Commutativity::Unsupported,
|
||||
|
||||
@@ -247,6 +247,10 @@ impl DummyTableProvider {
|
||||
self.scan_request.lock().unwrap().sequence = Some(sequence);
|
||||
}
|
||||
|
||||
pub fn with_tag_only_distinct(&self, tag_only_distinct: bool) {
|
||||
self.scan_request.lock().unwrap().tag_only_distinct = tag_only_distinct;
|
||||
}
|
||||
|
||||
/// Gets the scan request of the provider.
|
||||
#[cfg(test)]
|
||||
pub fn scan_request(&self) -> ScanRequest {
|
||||
|
||||
@@ -21,7 +21,7 @@ use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
|
||||
use datafusion_common::{Column, Result};
|
||||
use datafusion_expr::expr::Sort;
|
||||
use datafusion_expr::{utils, Expr, LogicalPlan};
|
||||
use datafusion_expr::{utils, Aggregate, Expr, LogicalPlan};
|
||||
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
|
||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
|
||||
@@ -92,6 +92,15 @@ impl ScanHintRule {
|
||||
);
|
||||
}
|
||||
|
||||
// set distinct columns hint
|
||||
if !visitor.distinct_columns.is_empty() {
|
||||
Self::set_distinct_columns_hint(
|
||||
adapter,
|
||||
&visitor.distinct_columns,
|
||||
&visitor.distinct_filter_columns,
|
||||
);
|
||||
}
|
||||
|
||||
transformed = true;
|
||||
}
|
||||
}
|
||||
@@ -185,6 +194,43 @@ impl ScanHintRule {
|
||||
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
|
||||
}
|
||||
}
|
||||
|
||||
fn set_distinct_columns_hint(
|
||||
adapter: &DummyTableProvider,
|
||||
distinct_columns: &HashSet<Column>,
|
||||
distinct_filter_columns: &HashSet<Column>,
|
||||
) {
|
||||
let region_metadata = adapter.region_metadata();
|
||||
let mut should_set_distinct_hint = true;
|
||||
// check if all group_by columns are primary key
|
||||
for col in distinct_columns {
|
||||
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
};
|
||||
if column_metadata.semantic_type != SemanticType::Tag {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// check if all filter columns are primary key columns or time index.
|
||||
for col in distinct_filter_columns {
|
||||
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
};
|
||||
if column_metadata.semantic_type != SemanticType::Tag
|
||||
&& column_metadata.semantic_type != SemanticType::Timestamp
|
||||
{
|
||||
should_set_distinct_hint = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if should_set_distinct_hint {
|
||||
adapter.with_tag_only_distinct(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Traverse and fetch hints.
|
||||
@@ -196,6 +242,10 @@ struct ScanHintVisitor {
|
||||
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
|
||||
/// and the `order_by` column which should be time index.
|
||||
ts_row_selector: Option<(HashSet<Column>, Column)>,
|
||||
/// Distinct columns for select distinct operation.
|
||||
distinct_columns: HashSet<Column>,
|
||||
/// Distinct filter column.
|
||||
distinct_filter_columns: HashSet<Column>,
|
||||
}
|
||||
|
||||
impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
@@ -263,23 +313,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
self.ts_row_selector = Some((group_by_cols, order_by_col));
|
||||
}
|
||||
}
|
||||
|
||||
// Check distinct.
|
||||
if !is_all_last_value {
|
||||
self.collect_distinct_columns(aggregate);
|
||||
}
|
||||
}
|
||||
|
||||
if self.ts_row_selector.is_some()
|
||||
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
|
||||
{
|
||||
if matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1 {
|
||||
// clean previous time series selector hint when encounter subqueries or join
|
||||
self.ts_row_selector = None;
|
||||
self.distinct_columns.clear();
|
||||
}
|
||||
|
||||
if let LogicalPlan::Filter(filter) = node
|
||||
&& let Some(group_by_exprs) = &self.ts_row_selector
|
||||
{
|
||||
let mut filter_referenced_cols = HashSet::default();
|
||||
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
||||
// ensure only group_by columns are used in filter
|
||||
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
||||
self.ts_row_selector = None;
|
||||
if let LogicalPlan::Filter(filter) = node {
|
||||
if let Some(group_by_exprs) = &self.ts_row_selector {
|
||||
let mut filter_referenced_cols = HashSet::default();
|
||||
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
|
||||
// ensure only group_by columns are used in filter
|
||||
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
|
||||
self.ts_row_selector = None;
|
||||
}
|
||||
}
|
||||
|
||||
if !self.distinct_columns.is_empty() {
|
||||
utils::expr_to_columns(&filter.predicate, &mut self.distinct_filter_columns)?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,7 +347,31 @@ impl TreeNodeVisitor<'_> for ScanHintVisitor {
|
||||
|
||||
impl ScanHintVisitor {
|
||||
fn need_rewrite(&self) -> bool {
|
||||
self.order_expr.is_some() || self.ts_row_selector.is_some()
|
||||
self.order_expr.is_some()
|
||||
|| self.ts_row_selector.is_some()
|
||||
|| !self.distinct_columns.is_empty()
|
||||
}
|
||||
|
||||
/// Returns select distinct columns.
|
||||
fn collect_distinct_columns(&mut self, aggregate: &Aggregate) {
|
||||
if !aggregate.aggr_expr.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut is_all_distinct = true;
|
||||
// make sure all the exprs are DIRECT `col` and collect them
|
||||
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
|
||||
for expr in &aggregate.group_expr {
|
||||
if let Expr::Column(col) = expr {
|
||||
group_by_cols.insert(col.clone());
|
||||
} else {
|
||||
is_all_distinct = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if is_all_distinct {
|
||||
self.distinct_columns = group_by_cols;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,4 +58,6 @@ pub struct ScanRequest {
|
||||
pub sequence: Option<SequenceNumber>,
|
||||
/// Optional hint for the distribution of time-series data.
|
||||
pub distribution: Option<TimeSeriesDistribution>,
|
||||
/// Optional hint for the tag-only distinct operation.
|
||||
pub tag_only_distinct: bool,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user