diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs index 678c03fbf6..d4653ce5ec 100644 --- a/src/mito2/src/read/dedup.rs +++ b/src/mito2/src/read/dedup.rs @@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector; use crate::error::Result; use crate::metrics::MERGE_FILTER_ROWS_TOTAL; -use crate::read::{Batch, BatchColumn, BatchReader}; +use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader}; /// A reader that dedup sorted batches from a source based on the /// dedup strategy. @@ -581,6 +581,46 @@ impl>> Iterator for LastNonNullIter { } } +/// A reader that only returns tags for select distinct. +pub(crate) struct TagsOnlyReader { + source: BoxedBatchReader, + /// Batch to return. + to_return: Option, +} + +impl TagsOnlyReader { + /// Creates a new tags only reader. + pub(crate) fn new(source: BoxedBatchReader) -> Self { + Self { + source, + to_return: None, + } + } +} + +#[async_trait] +impl BatchReader for TagsOnlyReader { + async fn next_batch(&mut self) -> Result> { + while let Some(batch) = self.source.next_batch().await? { + if let Some(to_return) = self.to_return.take() { + if to_return.primary_key() != batch.primary_key() { + self.to_return = Some(batch); + // A new key, store the batch and returns the previous one. + return Ok(Some(to_return)); + } else { + // The same key, override the batch. + self.to_return = Some(batch); + } + } else { + // No batch to return, store the current batch. + self.to_return = Some(batch); + } + } + + Ok(self.to_return.take()) + } +} + #[cfg(test)] mod tests { use std::sync::Arc;