feat: tags only reader

This commit is contained in:
evenyag
2025-03-13 23:01:39 +08:00
parent 7f7d431cd8
commit f935921831

View File

@@ -25,7 +25,7 @@ use datatypes::vectors::MutableVector;
use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::read::{Batch, BatchColumn, BatchReader, BoxedBatchReader};
/// A reader that deduplicates sorted batches from a source based on the
/// dedup strategy.
@@ -581,6 +581,46 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
}
}
/// A reader that only returns tags for select distinct.
///
/// It wraps another batch reader and collapses every run of consecutive
/// batches that share the same primary key into a single batch (the last
/// batch of the run). Batches are forwarded as-is; no columns are removed
/// by this reader itself.
///
/// NOTE(review): only adjacent batches are compared, so this presumably
/// relies on the source yielding batches ordered by primary key — confirm
/// against the caller.
pub(crate) struct TagsOnlyReader {
/// The underlying reader that batches are pulled from.
source: BoxedBatchReader,
/// The most recent batch read from `source` that has not been returned yet.
/// It is held back until a batch with a different primary key arrives or
/// the source is exhausted.
to_return: Option<Batch>,
}
impl TagsOnlyReader {
    /// Builds a [`TagsOnlyReader`] on top of `source`.
    ///
    /// The reader starts without any pending batch; nothing is read from
    /// `source` until the reader is polled.
    pub(crate) fn new(source: BoxedBatchReader) -> Self {
        let to_return = None;
        Self { source, to_return }
    }
}
#[async_trait]
impl BatchReader for TagsOnlyReader {
    /// Returns the next batch whose primary key differs from the one
    /// returned before it.
    ///
    /// Batches are pulled from the source one at a time. The newest batch is
    /// always kept as the pending one; the previously pending batch is
    /// returned only when the incoming batch has a different primary key,
    /// otherwise it is dropped. Once the source reports no more batches, the
    /// pending batch (if any) is returned, and `None` afterwards.
    ///
    /// Errors from the source are propagated unchanged.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(incoming) = self.source.next_batch().await? {
            // Compare keys before moving `incoming` into the pending slot.
            let key_changed = self
                .to_return
                .as_ref()
                .is_some_and(|pending| pending.primary_key() != incoming.primary_key());

            // The incoming batch always becomes the pending one.
            let previous = self.to_return.replace(incoming);

            if key_changed {
                // A new key started, so the previous key's batch is complete.
                return Ok(previous);
            }
            // Same key (or nothing pending): `previous` is dropped here.
        }

        // Source exhausted: flush whatever is still pending.
        Ok(self.to_return.take())
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;