From 2e7b12c344c00a9f6ad073d39667eed35f6565bf Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 15 Jul 2024 20:33:36 +0800 Subject: [PATCH] feat: add a cache for last value result in row group (#4369) * feat: add selector result cache to cache manager * feat: expose config --- src/mito2/src/cache.rs | 118 ++++++++++++++++++++++++-- src/mito2/src/config.rs | 4 + src/mito2/src/read.rs | 13 +++ src/mito2/src/worker.rs | 2 + src/store-api/src/storage/requests.rs | 2 +- tests-integration/tests/http.rs | 5 +- 6 files changed, 135 insertions(+), 9 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index c36bcdbb83..c89e5afb56 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -30,23 +30,26 @@ use datatypes::vectors::VectorRef; use moka::sync::Cache; use parquet::column::page::Page; use parquet::file::metadata::ParquetMetaData; -use store_api::storage::{ConcreteDataType, RegionId}; +use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector}; use crate::cache::cache_size::parquet_meta_size; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::index::{InvertedIndexCache, InvertedIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; +use crate::read::Batch; use crate::sst::file::FileId; -// Metrics type key for sst meta. +/// Metrics type key for sst meta. const SST_META_TYPE: &str = "sst_meta"; -// Metrics type key for vector. +/// Metrics type key for vector. const VECTOR_TYPE: &str = "vector"; -// Metrics type key for pages. +/// Metrics type key for pages. const PAGE_TYPE: &str = "page"; -// Metrics type key for files on the local store. +/// Metrics type key for files on the local store. const FILE_TYPE: &str = "file"; +/// Metrics type key for selector result cache. +const SELECTOR_RESULT_TYPE: &str = "selector_result"; /// Manages cached data for the engine. 
/// @@ -63,6 +66,8 @@ pub struct CacheManager { write_cache: Option, /// Cache for inverted index. index_cache: Option, + /// Cache for time series selectors. + selector_result_cache: Option, } pub type CacheManagerRef = Arc; @@ -167,6 +172,33 @@ impl CacheManager { } } + /// Gets the cached result of the selector. + pub fn get_selector_result( + &self, + selector_key: &SelectorResultKey, + ) -> Option> { + self.selector_result_cache + .as_ref() + .and_then(|selector_result_cache| { + let value = selector_result_cache.get(selector_key); + update_hit_miss(value, SELECTOR_RESULT_TYPE) + }) + } + + /// Puts result of the selector into the cache. + pub fn put_selector_result( + &self, + selector_key: SelectorResultKey, + result: Arc, + ) { + if let Some(cache) = &self.selector_result_cache { + CACHE_BYTES + .with_label_values(&[SELECTOR_RESULT_TYPE]) + .add(selector_result_cache_weight(&selector_key, &result).into()); + cache.insert(selector_key, result); + } + } + /// Gets the write cache. pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { self.write_cache.as_ref() @@ -186,6 +218,7 @@ pub struct CacheManagerBuilder { index_metadata_size: u64, index_content_size: u64, write_cache: Option, + selector_result_cache_size: u64, } impl CacheManagerBuilder { @@ -225,6 +258,12 @@ impl CacheManagerBuilder { self } + /// Sets selector result cache size. + pub fn selector_result_cache_size(mut self, bytes: u64) -> Self { + self.selector_result_cache_size = bytes; + self + } + /// Builds the [CacheManager]. 
pub fn build(self) -> CacheManager { let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| { @@ -261,15 +300,27 @@ impl CacheManagerBuilder { }) .build() }); - let inverted_index_cache = InvertedIndexCache::new(self.index_metadata_size, self.index_content_size); + let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.selector_result_cache_size) + .weigher(selector_result_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = selector_result_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[SELECTOR_RESULT_TYPE]) + .sub(size.into()); + }) + .build() + }); CacheManager { sst_meta_cache, vector_cache, page_cache, write_cache: self.write_cache, index_cache: Some(Arc::new(inverted_index_cache)), + selector_result_cache, } } } @@ -288,6 +339,10 @@ fn page_cache_weight(k: &PageKey, v: &Arc) -> u32 { (k.estimated_size() + v.estimated_size()) as u32 } +fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc) -> u32 { + (mem::size_of_val(k) + v.estimated_size()) as u32 +} + /// Updates cache hit/miss metrics. fn update_hit_miss(value: Option, cache_type: &str) -> Option { if value.is_some() { @@ -348,6 +403,36 @@ impl PageValue { } } +/// Cache key for time series row selector result. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct SelectorResultKey { + /// Id of the SST file. + pub file_id: FileId, + /// Index of the row group. + pub row_group_idx: usize, + /// Time series row selector. + pub selector: TimeSeriesRowSelector, +} + +/// Cached result for time series row selector. +pub struct SelectorResultValue { + /// Batches of rows selected by the selector. + pub result: Vec, +} + +impl SelectorResultValue { + /// Creates a new selector result value. + pub fn new(result: Vec) -> SelectorResultValue { + SelectorResultValue { result } + } + + /// Returns memory used by the value (estimated). 
+ fn estimated_size(&self) -> usize { + // We only consider heap size of all batches. + self.result.iter().map(|batch| batch.memory_size()).sum() + } +} + /// Maps (region id, file id) to [ParquetMetaData]. type SstMetaCache = Cache>; /// Maps [Value] to a vector that holds this value repeatedly. @@ -356,9 +441,13 @@ type SstMetaCache = Cache>; type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>; /// Maps (region, file, row group, column) to [PageValue]. type PageCache = Cache>; +/// Maps (file id, row group id, time series row selector) to [SelectorResultValue]. +type SelectorResultCache = Cache>; #[cfg(test)] mod tests { + use std::sync::Arc; + use datatypes::vectors::Int64Vector; use super::*; @@ -453,4 +542,21 @@ mod tests { cache.put_pages(key.clone(), pages); assert!(cache.get_pages(&key).is_some()); } + + #[test] + fn test_selector_result_cache() { + let cache = CacheManager::builder() + .selector_result_cache_size(1000) + .build(); + let file_id = FileId::random(); + let key = SelectorResultKey { + file_id, + row_group_idx: 0, + selector: TimeSeriesRowSelector::LastRow, + }; + assert!(cache.get_selector_result(&key).is_none()); + let result = Arc::new(SelectorResultValue::new(Vec::new())); + cache.put_selector_result(key.clone(), result); + assert!(cache.get_selector_result(&key).is_some()); + } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 1533694bc9..012c31aaad 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -84,6 +84,8 @@ pub struct MitoConfig { pub vector_cache_size: ReadableSize, /// Cache size for pages of SST row groups. Setting it to 0 to disable the cache. pub page_cache_size: ReadableSize, + /// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache. + pub selector_result_cache_size: ReadableSize, /// Whether to enable the experimental write cache. 
pub enable_experimental_write_cache: bool, /// File system path for write cache, defaults to `{data_home}/write_cache`. @@ -133,6 +135,7 @@ impl Default for MitoConfig { sst_meta_cache_size: ReadableSize::mb(128), vector_cache_size: ReadableSize::mb(512), page_cache_size: ReadableSize::mb(512), + selector_result_cache_size: ReadableSize::mb(512), enable_experimental_write_cache: false, experimental_write_cache_path: String::new(), experimental_write_cache_size: ReadableSize::mb(512), @@ -237,6 +240,7 @@ impl MitoConfig { self.sst_meta_cache_size = sst_meta_cache_size; self.vector_cache_size = mem_cache_size; self.page_cache_size = mem_cache_size; + self.selector_result_cache_size = mem_cache_size; } /// Enable experimental write cache. diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 5c3d17119a..c008014510 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -375,6 +375,19 @@ impl Batch { self.take_in_place(&indices) } + /// Returns the estimated memory size of the batch. + pub fn memory_size(&self) -> usize { + let mut size = std::mem::size_of::(); + size += self.primary_key.len(); + size += self.timestamps.memory_size(); + size += self.sequences.memory_size(); + size += self.op_types.memory_size(); + for batch_column in &self.fields { + size += batch_column.data.memory_size(); + } + size + } + /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`. 
pub(crate) fn projected_fields( metadata: &RegionMetadata, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 617950cd06..35b65ad0d7 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -158,6 +158,7 @@ impl WorkerGroup { .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) + .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes()) .index_content_size(config.inverted_index.content_cache_size.as_bytes()) .write_cache(write_cache) @@ -292,6 +293,7 @@ impl WorkerGroup { .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) + .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index e1bf242468..8316afce2b 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -17,7 +17,7 @@ use datafusion_expr::expr::Expr; use strum::Display; /// A hint on how to select rows from a time-series. -#[derive(Clone, Debug, PartialEq, Eq, Display)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Display)] pub enum TimeSeriesRowSelector { /// Only keep the last row of each time-series. 
LastRow, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 06ee1a0221..95bd2aae04 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -888,6 +888,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "sst_meta_cache_size =", "vector_cache_size =", "page_cache_size =", + "selector_result_cache_size =", ]; input @@ -1141,10 +1142,10 @@ processors: - dissect: fields: - line - patterns: + patterns: - "%{+ts} %{+ts} %{content}" - date: - fields: + fields: - ts formats: - "%Y-%m-%d %H:%M:%S%.3f"