diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index d4653ce5ec..563b590b7f 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -582,13 +582,13 @@ impl>> Iterator for LastNonNullIter { } /// A reader that only returns tags for select distinct. -pub(crate) struct TagsOnlyReader { +pub(crate) struct TagOnlyReader { source: BoxedBatchReader, /// Batch to return. to_return: Option, } -impl TagsOnlyReader { +impl TagOnlyReader { /// Creates a new tags only reader. pub(crate) fn new(source: BoxedBatchReader) -> Self { Self { @@ -599,7 +599,7 @@ impl TagsOnlyReader { } #[async_trait] -impl BatchReader for TagsOnlyReader { +impl BatchReader for TagOnlyReader { async fn next_batch(&mut self) -> Result> { while let Some(batch) = self.source.next_batch().await? { if let Some(to_return) = self.to_return.take() { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 69c12c709c..054a6b60f3 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector}; use tokio::sync::Semaphore; use crate::error::{PartitionOutOfRangeSnafu, Result}; -use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; +use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader}; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; use crate::read::range::RangeBuilderList; @@ -216,6 +216,7 @@ impl SeqScan { let compaction = self.compaction; let distinguish_range = self.properties.distinguish_partition_range; let part_metrics = self.new_partition_metrics(partition); + let tag_only = self.stream_ctx.input.tag_only_distinct; let stream = try_stream! { part_metrics.on_first_poll(); @@ -241,6 +242,9 @@ impl SeqScan { .await .map_err(BoxedError::new) .context(ExternalSnafu)?; + if tag_only { + reader = Box::new(TagOnlyReader::new(reader)); + } let cache = &stream_ctx.input.cache_strategy; let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now();