diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index c89e5afb56..7a9f879669 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -404,7 +404,7 @@ impl PageValue {
 }
 
 /// Cache key for time series row selector result.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct SelectorResultKey {
     /// Id of the SST file.
     pub file_id: FileId,
@@ -418,12 +418,14 @@ pub struct SelectorResultKey {
 pub struct SelectorResultValue {
     /// Batches of rows selected by the selector.
     pub result: Vec<Batch>,
+    /// Projection (column indices) of the batches.
+    pub projection: Vec<usize>,
 }
 
 impl SelectorResultValue {
     /// Creates a new selector result value.
-    pub fn new(result: Vec<Batch>) -> SelectorResultValue {
-        SelectorResultValue { result }
+    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
+        SelectorResultValue { result, projection }
     }
 
     /// Returns memory used by the value (estimated).
@@ -555,8 +557,8 @@ mod tests {
             selector: TimeSeriesRowSelector::LastRow,
         };
         assert!(cache.get_selector_result(&key).is_none());
-        let result = Arc::new(SelectorResultValue::new(Vec::new()));
-        cache.put_selector_result(key.clone(), result);
+        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
+        cache.put_selector_result(key, result);
         assert!(cache.get_selector_result(&key).is_some());
     }
 }
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 3233e868a1..6496c80f85 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -16,7 +16,7 @@
 
 pub mod compat;
 pub mod dedup;
-pub(crate) mod last_row;
+pub mod last_row;
 pub mod merge;
 pub mod projection;
 pub(crate) mod prune;
diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs
index f91f20b4f8..f75a8094d8 100644
--- a/src/mito2/src/read/last_row.rs
+++ b/src/mito2/src/read/last_row.rs
@@ -13,10 +13,17 @@
 // limitations under the License.
 
 //! Utilities to read the last row of each time series.
 
-use async_trait::async_trait;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use store_api::storage::TimeSeriesRowSelector;
+
+use crate::cache::{CacheManagerRef, SelectorResultKey, SelectorResultValue};
 use crate::error::Result;
 use crate::read::{Batch, BatchReader, BoxedBatchReader};
+use crate::sst::file::FileId;
+use crate::sst::parquet::reader::RowGroupReader;
 
 /// Reader to keep the last row for each time series.
 /// It assumes that batches from the input reader are
@@ -60,6 +67,151 @@ impl BatchReader for LastRowReader {
     }
 }
 
+/// Cached last row reader for a specific row group.
+/// If the last rows for the current row group are already cached, this reader returns the cached value.
+/// If the cache misses, [RowGroupLastRowReader] reads last rows from the row group and updates the cache
+/// upon finish.
+pub(crate) enum RowGroupLastRowCachedReader {
+    /// Cache hit, reads last rows from the cached value.
+    Hit(LastRowCacheReader),
+    /// Cache miss, reads from the row group reader and updates the cache.
+    Miss(RowGroupLastRowReader),
+}
+
+impl RowGroupLastRowCachedReader {
+    pub(crate) fn new(
+        file_id: FileId,
+        row_group_idx: usize,
+        cache_manager: Option<CacheManagerRef>,
+        row_group_reader: RowGroupReader,
+    ) -> Self {
+        let key = SelectorResultKey {
+            file_id,
+            row_group_idx,
+            selector: TimeSeriesRowSelector::LastRow,
+        };
+
+        let Some(cache_manager) = cache_manager else {
+            return Self::Miss(RowGroupLastRowReader::new(key, row_group_reader, None));
+        };
+        if let Some(value) = cache_manager.get_selector_result(&key) {
+            let schema_matches = value.projection
+                == row_group_reader
+                    .context()
+                    .read_format()
+                    .projection_indices();
+            if schema_matches {
+                // Schema matches, use cached batches.
+                Self::Hit(LastRowCacheReader { value, idx: 0 })
+            } else {
+                Self::Miss(RowGroupLastRowReader::new(
+                    key,
+                    row_group_reader,
+                    Some(cache_manager),
+                ))
+            }
+        } else {
+            Self::Miss(RowGroupLastRowReader::new(
+                key,
+                row_group_reader,
+                Some(cache_manager),
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl BatchReader for RowGroupLastRowCachedReader {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        match self {
+            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
+            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
+        }
+    }
+}
+
+/// Last row reader that returns the cached last rows for a row group.
+pub(crate) struct LastRowCacheReader {
+    value: Arc<SelectorResultValue>,
+    idx: usize,
+}
+
+impl LastRowCacheReader {
+    /// Iterates cached last rows.
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        if self.idx < self.value.result.len() {
+            let res = Ok(Some(self.value.result[self.idx].clone()));
+            self.idx += 1;
+            res
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+pub(crate) struct RowGroupLastRowReader {
+    key: SelectorResultKey,
+    reader: RowGroupReader,
+    selector: LastRowSelector,
+    yielded_batches: Vec<Batch>,
+    cache_manager: Option<CacheManagerRef>,
+}
+
+impl RowGroupLastRowReader {
+    fn new(
+        key: SelectorResultKey,
+        reader: RowGroupReader,
+        cache_manager: Option<CacheManagerRef>,
+    ) -> Self {
+        Self {
+            key,
+            reader,
+            selector: LastRowSelector::default(),
+            yielded_batches: vec![],
+            cache_manager,
+        }
+    }
+
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        while let Some(batch) = self.reader.next_batch().await? {
+            if let Some(yielded) = self.selector.on_next(batch) {
+                if self.cache_manager.is_some() {
+                    self.yielded_batches.push(yielded.clone());
+                }
+                return Ok(Some(yielded));
+            }
+        }
+        let last_batch = if let Some(last_batch) = self.selector.finish() {
+            if self.cache_manager.is_some() {
+                self.yielded_batches.push(last_batch.clone());
+            }
+            Some(last_batch)
+        } else {
+            None
+        };
+
+        // All last rows in the row group are yielded, update the cache.
+        self.maybe_update_cache();
+        Ok(last_batch)
+    }
+
+    /// Updates the row group's last row cache if the cache manager is present.
+    fn maybe_update_cache(&mut self) {
+        if let Some(cache) = &self.cache_manager {
+            let value = Arc::new(SelectorResultValue {
+                result: std::mem::take(&mut self.yielded_batches),
+                projection: self
+                    .reader
+                    .context()
+                    .read_format()
+                    .projection_indices()
+                    .to_vec(),
+            });
+            cache.put_selector_result(self.key, value)
+        }
+    }
+}
+
 /// Common struct that selects only the last row of each time series.
 #[derive(Default)]
 pub struct LastRowSelector {
diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs
index 6fd790f543..58c81a1815 100644
--- a/src/mito2/src/read/prune.rs
+++ b/src/mito2/src/read/prune.rs
@@ -13,14 +13,14 @@
 // limitations under the License.
 
 use crate::error::Result;
-use crate::read::last_row::LastRowReader;
+use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::{Batch, BatchReader};
 use crate::sst::parquet::file_range::FileRangeContextRef;
 use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
 
 pub enum Source {
     RowGroup(RowGroupReader),
-    LastRow(LastRowReader),
+    LastRow(RowGroupLastRowCachedReader),
 }
 
 impl Source {
@@ -53,7 +53,7 @@ impl PruneReader {
 
     pub(crate) fn new_with_last_row_reader(
         ctx: FileRangeContextRef,
-        reader: LastRowReader,
+        reader: RowGroupLastRowCachedReader,
     ) -> Self {
         Self {
             context: ctx,
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index 6c51f8258c..0976c2402f 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -30,7 +30,7 @@ use crate::error::{
     DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
 };
 use crate::read::compat::CompatBatch;
-use crate::read::last_row::LastRowReader;
+use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::prune::PruneReader;
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec};
@@ -111,13 +111,13 @@ impl FileRange {
 
         let prune_reader = if use_last_row_reader {
             // Row group is PUT only, use LastRowReader to skip unnecessary rows.
-            PruneReader::new_with_last_row_reader(
-                self.context.clone(),
-                LastRowReader::new(Box::new(RowGroupReader::new(
-                    self.context.clone(),
-                    parquet_reader,
-                )) as _),
-            )
+            let reader = RowGroupLastRowCachedReader::new(
+                self.file_handle().file_id(),
+                self.row_group_idx,
+                self.context.reader_builder.cache_manager().clone(),
+                RowGroupReader::new(self.context.clone(), parquet_reader),
+            );
+            PruneReader::new_with_last_row_reader(self.context.clone(), reader)
         } else {
             // Row group contains DELETE, fallback to default reader.
             PruneReader::new_with_row_group_reader(
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 6b207a4bd0..4e4956b7d1 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -781,6 +781,10 @@ impl RowGroupReaderBuilder {
         &self.parquet_meta
     }
 
+    pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
+        &self.cache_manager
+    }
+
     /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
     pub(crate) async fn build(
         &self,
@@ -1081,6 +1085,9 @@ impl RowGroupReader {
     pub(crate) fn metrics(&self) -> &ReaderMetrics {
         &self.metrics
     }
+    pub(crate) fn context(&self) -> &FileRangeContextRef {
+        &self.context
+    }
 
     /// Tries to fetch next [RecordBatch] from the reader.
     fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
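
The sketch below is an illustrative, simplified model of the caching decision this patch introduces: last rows are cached per (file id, row group index), and a cached entry is reused only when it was produced with the same projection; otherwise the reader falls back to re-reading the row group and refreshing the cache. The Key, Value, Reader, and open_reader names, the u64 file id, and the Vec<String> batches are stand-ins invented for this example, not the crate's actual FileId, Batch, or CacheManager types.

// Illustrative sketch only (not the crate's API): a per-row-group last-row
// cache keyed by (file id, row group index), reused only when the projection
// it was built with matches the current read.
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for SelectorResultKey: file id plus row group index.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Key {
    file_id: u64,
    row_group_idx: usize,
}

/// Simplified stand-in for SelectorResultValue: cached last rows plus the
/// projection (column indices) they were read with.
struct Value {
    result: Vec<String>, // stands in for Vec<Batch>
    projection: Vec<usize>,
}

/// Mirrors the Hit/Miss split of RowGroupLastRowCachedReader.
enum Reader {
    /// Cache hit: replay the cached last rows.
    Hit(Arc<Value>),
    /// Cache miss: read the row group and refresh the cache afterwards.
    Miss(Key),
}

fn open_reader(cache: &HashMap<Key, Arc<Value>>, key: Key, projection: &[usize]) -> Reader {
    match cache.get(&key) {
        // Reuse cached last rows only if they were produced with the same
        // projection; otherwise their schema differs from the current read.
        Some(value) if value.projection == projection => Reader::Hit(value.clone()),
        _ => Reader::Miss(key),
    }
}

fn main() {
    let mut cache = HashMap::new();
    let key = Key { file_id: 1, row_group_idx: 0 };
    let value = Value { result: vec!["last row".to_string()], projection: vec![0, 2] };
    cache.insert(key, Arc::new(value));

    // Same projection: hit, the cached batches are replayed.
    assert!(matches!(open_reader(&cache, key, &[0, 2]), Reader::Hit(_)));
    // Different projection: miss, the row group is read again and re-cached.
    assert!(matches!(open_reader(&cache, key, &[0, 1]), Reader::Miss(_)));
}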