feat: Add caching for last row reader and expose cache manager (#4375)

* Add caching for last row reader and expose cache manager
 - Implement `RowGroupLastRowCachedReader` to handle cache hits and misses for last row reads.

* Add projection field to SelectorResultValue and refactor RowGroupLastRowReader

 - Introduce a `projection` field in `SelectorResultValue` to store projection indices (see the sketch below).
Author: Lei, HUANG
Date: 2024-07-16 15:13:39 +08:00 (committed by GitHub)
Parent: be3ea0fae7
Commit: 2010a2a33d
6 changed files with 179 additions and 18 deletions
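Before the per-file diffs, here is a minimal, self-contained sketch of the caching scheme the commit message describes. The key/value shapes mirror `SelectorResultKey` and `SelectorResultValue` from the diff below, but `SelectorResultCache`, its `HashMap` backing, and the `lookup`/`put` helpers are hypothetical stand-ins rather than the actual cache manager API; the point is that a cached last-row result is only reused when it was produced under the same projection.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the real mito2 types; only the shapes matter here.
type FileId = u64;
type Batch = Vec<u8>;

/// One cache entry per (file, row group) for the last-row selector.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct SelectorResultKey {
    file_id: FileId,
    row_group_idx: usize,
}

/// Cached batches plus the projection they were read with.
struct SelectorResultValue {
    result: Vec<Batch>,
    projection: Vec<usize>,
}

/// Hypothetical cache wrapper standing in for the real cache manager.
#[derive(Default)]
struct SelectorResultCache {
    inner: HashMap<SelectorResultKey, Arc<SelectorResultValue>>,
}

impl SelectorResultCache {
    /// A hit is only valid if the cached value was built under the same projection;
    /// otherwise the caller re-reads the row group and overwrites the entry on finish.
    fn lookup(
        &self,
        key: &SelectorResultKey,
        projection: &[usize],
    ) -> Option<Arc<SelectorResultValue>> {
        self.inner
            .get(key)
            .filter(|value| value.projection == projection)
            .cloned()
    }

    fn put(&mut self, key: SelectorResultKey, value: Arc<SelectorResultValue>) {
        self.inner.insert(key, value);
    }
}

fn main() {
    let mut cache = SelectorResultCache::default();
    let key = SelectorResultKey { file_id: 1, row_group_idx: 0 };
    let projection = vec![0, 2, 3];

    // Miss: nothing cached yet, so the caller would read the row group and populate it.
    assert!(cache.lookup(&key, &projection).is_none());
    cache.put(
        key,
        Arc::new(SelectorResultValue { result: vec![], projection: projection.clone() }),
    );

    // Hit only when the projection matches what the cached batches were read with.
    assert!(cache.lookup(&key, &projection).is_some());
    assert!(cache.lookup(&key, &[0, 1]).is_none());
}
```

In the actual change, this projection check is what decides between the `Hit` (`LastRowCacheReader`) and `Miss` (`RowGroupLastRowReader`) variants of `RowGroupLastRowCachedReader` when the reader is constructed.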


@@ -404,7 +404,7 @@ impl PageValue {
 }
 
 /// Cache key for time series row selector result.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct SelectorResultKey {
     /// Id of the SST file.
     pub file_id: FileId,
@@ -418,12 +418,14 @@ pub struct SelectorResultKey {
 pub struct SelectorResultValue {
     /// Batches of rows selected by the selector.
     pub result: Vec<Batch>,
+    /// Projection of rows.
+    pub projection: Vec<usize>,
 }
 
 impl SelectorResultValue {
     /// Creates a new selector result value.
-    pub fn new(result: Vec<Batch>) -> SelectorResultValue {
-        SelectorResultValue { result }
+    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
+        SelectorResultValue { result, projection }
     }
 
     /// Returns memory used by the value (estimated).
@@ -555,8 +557,8 @@ mod tests {
             selector: TimeSeriesRowSelector::LastRow,
         };
         assert!(cache.get_selector_result(&key).is_none());
-        let result = Arc::new(SelectorResultValue::new(Vec::new()));
-        cache.put_selector_result(key.clone(), result);
+        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
+        cache.put_selector_result(key, result);
         assert!(cache.get_selector_result(&key).is_some());
     }
 }


@@ -16,7 +16,7 @@
 pub mod compat;
 pub mod dedup;
-pub(crate) mod last_row;
+pub mod last_row;
 pub mod merge;
 pub mod projection;
 pub(crate) mod prune;


@@ -13,10 +13,17 @@
 // limitations under the License.
 
 //! Utilities to read the last row of each time series.
 
-use async_trait::async_trait;
+use std::sync::Arc;
 
+use async_trait::async_trait;
+use store_api::storage::TimeSeriesRowSelector;
+
+use crate::cache::{CacheManagerRef, SelectorResultKey, SelectorResultValue};
 use crate::error::Result;
 use crate::read::{Batch, BatchReader, BoxedBatchReader};
+use crate::sst::file::FileId;
+use crate::sst::parquet::reader::RowGroupReader;
 
 /// Reader to keep the last row for each time series.
 /// It assumes that batches from the input reader are
@@ -60,6 +67,151 @@ impl BatchReader for LastRowReader {
     }
 }
 
+/// Cached last row reader for a specific row group.
+/// If the last rows for the current row group are already cached, this reader returns the cached value.
+/// If the cache misses, [RowGroupLastRowReader] reads last rows from the row group and updates the cache
+/// upon finish.
+pub(crate) enum RowGroupLastRowCachedReader {
+    /// Cache hit, reads last rows from the cached value.
+    Hit(LastRowCacheReader),
+    /// Cache miss, reads from the row group reader and updates the cache.
+    Miss(RowGroupLastRowReader),
+}
+
+impl RowGroupLastRowCachedReader {
+    /// Creates a reader that resolves to a cache hit or miss for the given row group.
+    pub(crate) fn new(
+        file_id: FileId,
+        row_group_idx: usize,
+        cache_manager: Option<CacheManagerRef>,
+        row_group_reader: RowGroupReader,
+    ) -> Self {
+        let key = SelectorResultKey {
+            file_id,
+            row_group_idx,
+            selector: TimeSeriesRowSelector::LastRow,
+        };
+        let Some(cache_manager) = cache_manager else {
+            return Self::Miss(RowGroupLastRowReader::new(key, row_group_reader, None));
+        };
+        if let Some(value) = cache_manager.get_selector_result(&key) {
+            let schema_matches = value.projection
+                == row_group_reader
+                    .context()
+                    .read_format()
+                    .projection_indices();
+            if schema_matches {
+                // Schema matches, use cached batches.
+                Self::Hit(LastRowCacheReader { value, idx: 0 })
+            } else {
+                Self::Miss(RowGroupLastRowReader::new(
+                    key,
+                    row_group_reader,
+                    Some(cache_manager),
+                ))
+            }
+        } else {
+            Self::Miss(RowGroupLastRowReader::new(
+                key,
+                row_group_reader,
+                Some(cache_manager),
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl BatchReader for RowGroupLastRowCachedReader {
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        match self {
+            RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
+            RowGroupLastRowCachedReader::Miss(r) => r.next_batch().await,
+        }
+    }
+}
+
+/// Last row reader that returns the cached last rows for a row group.
+pub(crate) struct LastRowCacheReader {
+    value: Arc<SelectorResultValue>,
+    idx: usize,
+}
+
+impl LastRowCacheReader {
+    /// Iterates cached last rows.
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        if self.idx < self.value.result.len() {
+            let res = Ok(Some(self.value.result[self.idx].clone()));
+            self.idx += 1;
+            res
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Reader that reads last rows from a row group and updates the selector result
+/// cache once the whole row group has been read.
+pub(crate) struct RowGroupLastRowReader {
+    key: SelectorResultKey,
+    reader: RowGroupReader,
+    selector: LastRowSelector,
+    yielded_batches: Vec<Batch>,
+    cache_manager: Option<CacheManagerRef>,
+}
+
+impl RowGroupLastRowReader {
+    fn new(
+        key: SelectorResultKey,
+        reader: RowGroupReader,
+        cache_manager: Option<CacheManagerRef>,
+    ) -> Self {
+        Self {
+            key,
+            reader,
+            selector: LastRowSelector::default(),
+            yielded_batches: vec![],
+            cache_manager,
+        }
+    }
+
+    async fn next_batch(&mut self) -> Result<Option<Batch>> {
+        while let Some(batch) = self.reader.next_batch().await? {
+            if let Some(yielded) = self.selector.on_next(batch) {
+                if self.cache_manager.is_some() {
+                    self.yielded_batches.push(yielded.clone());
+                }
+                return Ok(Some(yielded));
+            }
+        }
+        let last_batch = if let Some(last_batch) = self.selector.finish() {
+            if self.cache_manager.is_some() {
+                self.yielded_batches.push(last_batch.clone());
+            }
+            Some(last_batch)
+        } else {
+            None
+        };
+        // All last rows in the row group are yielded, update the cache.
+        self.maybe_update_cache();
+        Ok(last_batch)
+    }
+
+    /// Updates the row group's last row cache if a cache manager is present.
+    fn maybe_update_cache(&mut self) {
+        if let Some(cache) = &self.cache_manager {
+            let value = Arc::new(SelectorResultValue {
+                result: std::mem::take(&mut self.yielded_batches),
+                projection: self
+                    .reader
+                    .context()
+                    .read_format()
+                    .projection_indices()
+                    .to_vec(),
+            });
+            cache.put_selector_result(self.key, value)
+        }
+    }
+}
+
 /// Common struct that selects only the last row of each time series.
 #[derive(Default)]
 pub struct LastRowSelector {


@@ -13,14 +13,14 @@
 // limitations under the License.
 
 use crate::error::Result;
-use crate::read::last_row::LastRowReader;
+use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::{Batch, BatchReader};
 use crate::sst::parquet::file_range::FileRangeContextRef;
 use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
 
 pub enum Source {
     RowGroup(RowGroupReader),
-    LastRow(LastRowReader),
+    LastRow(RowGroupLastRowCachedReader),
 }
 
 impl Source {
@@ -53,7 +53,7 @@ impl PruneReader {
     pub(crate) fn new_with_last_row_reader(
         ctx: FileRangeContextRef,
-        reader: LastRowReader,
+        reader: RowGroupLastRowCachedReader,
     ) -> Self {
         Self {
             context: ctx,


@@ -30,7 +30,7 @@ use crate::error::{
     DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
 };
 use crate::read::compat::CompatBatch;
-use crate::read::last_row::LastRowReader;
+use crate::read::last_row::RowGroupLastRowCachedReader;
 use crate::read::prune::PruneReader;
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec};
@@ -111,13 +111,13 @@ impl FileRange {
         let prune_reader = if use_last_row_reader {
             // Row group is PUT only, use LastRowReader to skip unnecessary rows.
-            PruneReader::new_with_last_row_reader(
-                self.context.clone(),
-                LastRowReader::new(Box::new(RowGroupReader::new(
-                    self.context.clone(),
-                    parquet_reader,
-                )) as _),
-            )
+            let reader = RowGroupLastRowCachedReader::new(
+                self.file_handle().file_id(),
+                self.row_group_idx,
+                self.context.reader_builder.cache_manager().clone(),
+                RowGroupReader::new(self.context.clone(), parquet_reader),
+            );
+            PruneReader::new_with_last_row_reader(self.context.clone(), reader)
         } else {
             // Row group contains DELETE, fallback to default reader.
             PruneReader::new_with_row_group_reader(


@@ -781,6 +781,10 @@ impl RowGroupReaderBuilder {
         &self.parquet_meta
     }
 
+    pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
+        &self.cache_manager
+    }
+
     /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
     pub(crate) async fn build(
         &self,
@@ -1081,6 +1085,9 @@ impl RowGroupReader {
     pub(crate) fn metrics(&self) -> &ReaderMetrics {
         &self.metrics
     }
 
+    pub(crate) fn context(&self) -> &FileRangeContextRef {
+        &self.context
+    }
+
     /// Tries to fetch next [RecordBatch] from the reader.
     fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {