feat: use tag only reader

This commit is contained in:
evenyag
2025-03-13 23:07:37 +08:00
parent f935921831
commit fa57df9dc2
2 changed files with 8 additions and 4 deletions

View File

@@ -582,13 +582,13 @@ impl<I: Iterator<Item = Result<Batch>>> Iterator for LastNonNullIter<I> {
}
/// A reader that only returns tags for select distinct.
pub(crate) struct TagsOnlyReader {
pub(crate) struct TagOnlyReader {
source: BoxedBatchReader,
/// Batch to return.
to_return: Option<Batch>,
}
impl TagsOnlyReader {
impl TagOnlyReader {
/// Creates a new tags only reader.
pub(crate) fn new(source: BoxedBatchReader) -> Self {
Self {
@@ -599,7 +599,7 @@ impl TagsOnlyReader {
}
#[async_trait]
impl BatchReader for TagsOnlyReader {
impl BatchReader for TagOnlyReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if let Some(to_return) = self.to_return.take() {

View File

@@ -33,7 +33,7 @@ use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow, TagOnlyReader};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
@@ -216,6 +216,7 @@ impl SeqScan {
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(partition);
let tag_only = self.stream_ctx.input.tag_only_distinct;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -241,6 +242,9 @@ impl SeqScan {
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if tag_only {
reader = Box::new(TagOnlyReader::new(reader));
}
let cache = &stream_ctx.input.cache_strategy;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();