feat: add a cache for last value result in row group (#4369)

* feat: add selector result cache to cache manager

* feat: expose config
This commit is contained in:
Yingwen
2024-07-15 20:33:36 +08:00
committed by GitHub
parent 2b912d93fb
commit 2e7b12c344
6 changed files with 135 additions and 9 deletions

View File

@@ -30,23 +30,26 @@ use datatypes::vectors::VectorRef;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::{ConcreteDataType, RegionId};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::index::{InvertedIndexCache, InvertedIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::FileId;
// Metrics type key for sst meta.
/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
// Metrics type key for vector.
/// Metrics type key for vector.
const VECTOR_TYPE: &str = "vector";
// Metrics type key for pages.
/// Metrics type key for pages.
const PAGE_TYPE: &str = "page";
// Metrics type key for files on the local store.
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Manages cached data for the engine.
///
@@ -63,6 +66,8 @@ pub struct CacheManager {
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
}
pub type CacheManagerRef = Arc<CacheManager>;
@@ -167,6 +172,33 @@ impl CacheManager {
}
}
/// Gets the cached result for the given selector key.
///
/// Returns `None` when the selector result cache is disabled or the key
/// has no entry. Updates cache hit/miss metrics on every lookup.
pub fn get_selector_result(
    &self,
    selector_key: &SelectorResultKey,
) -> Option<Arc<SelectorResultValue>> {
    self.selector_result_cache
        .as_ref()
        .and_then(|selector_result_cache| {
            let value = selector_result_cache.get(selector_key);
            update_hit_miss(value, SELECTOR_RESULT_TYPE)
        })
}
/// Puts the selector `result` into the cache under `selector_key`.
///
/// No-op when the selector result cache is disabled. The cache byte
/// gauge is increased by the entry weight on insertion; the cache's
/// eviction listener decreases it again when the entry is evicted.
pub fn put_selector_result(
    &self,
    selector_key: SelectorResultKey,
    result: Arc<SelectorResultValue>,
) {
    if let Some(selector_cache) = &self.selector_result_cache {
        let weight = selector_result_cache_weight(&selector_key, &result);
        CACHE_BYTES
            .with_label_values(&[SELECTOR_RESULT_TYPE])
            .add(i64::from(weight));
        selector_cache.insert(selector_key, result);
    }
}
/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
@@ -186,6 +218,7 @@ pub struct CacheManagerBuilder {
index_metadata_size: u64,
index_content_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
}
impl CacheManagerBuilder {
@@ -225,6 +258,12 @@ impl CacheManagerBuilder {
self
}
/// Sets the capacity of the selector result cache, in bytes.
///
/// Passing 0 disables the cache entirely.
pub fn selector_result_cache_size(mut self, capacity_bytes: u64) -> Self {
    self.selector_result_cache_size = capacity_bytes;
    self
}
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
@@ -261,15 +300,27 @@ impl CacheManagerBuilder {
})
.build()
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.selector_result_cache_size)
.weigher(selector_result_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = selector_result_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[SELECTOR_RESULT_TYPE])
.sub(size.into());
})
.build()
});
CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
write_cache: self.write_cache,
index_cache: Some(Arc::new(inverted_index_cache)),
selector_result_cache,
}
}
}
@@ -288,6 +339,10 @@ fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
(k.estimated_size() + v.estimated_size()) as u32
}
/// Cache weight of a selector result entry: the key's stack size plus the
/// estimated heap size of the cached batches.
fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
    let key_size = mem::size_of_val(k);
    let value_size = v.estimated_size();
    // Truncating cast mirrors the other cache weighers in this module.
    (key_size + value_size) as u32
}
/// Updates cache hit/miss metrics.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
if value.is_some() {
@@ -348,6 +403,36 @@ impl PageValue {
}
}
/// Cache key for time series row selector result.
///
/// Identifies the selector output for one row group of one SST file;
/// all three fields participate in equality and hashing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group within the file.
    pub row_group_idx: usize,
    /// Time series row selector that produced the cached result.
    pub selector: TimeSeriesRowSelector,
}
/// Cached result for time series row selector.
///
/// Stored behind an `Arc` in the selector result cache; its weight is
/// derived from the batches' memory size (see `estimated_size`).
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: Vec<Batch>,
}
impl SelectorResultValue {
/// Creates a new selector result value.
pub fn new(result: Vec<Batch>) -> SelectorResultValue {
SelectorResultValue { result }
}
/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all batches.
self.result.iter().map(|batch| batch.memory_size()).sum()
}
}
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
@@ -356,9 +441,13 @@ type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::Int64Vector;
use super::*;
@@ -453,4 +542,21 @@ mod tests {
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
}
#[test]
fn test_selector_result_cache() {
    // Manager with only the selector result cache enabled.
    let cache = CacheManager::builder()
        .selector_result_cache_size(1000)
        .build();
    let key = SelectorResultKey {
        file_id: FileId::random(),
        row_group_idx: 0,
        selector: TimeSeriesRowSelector::LastRow,
    };
    // Miss before anything is inserted.
    assert!(cache.get_selector_result(&key).is_none());
    // Hit after inserting an (empty) result under the same key.
    cache.put_selector_result(key.clone(), Arc::new(SelectorResultValue::new(Vec::new())));
    assert!(cache.get_selector_result(&key).is_some());
}
}

View File

@@ -84,6 +84,8 @@ pub struct MitoConfig {
pub vector_cache_size: ReadableSize,
/// Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
pub page_cache_size: ReadableSize,
/// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache, defaults to `{data_home}/write_cache`.
@@ -133,6 +135,7 @@ impl Default for MitoConfig {
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
@@ -237,6 +240,7 @@ impl MitoConfig {
self.sst_meta_cache_size = sst_meta_cache_size;
self.vector_cache_size = mem_cache_size;
self.page_cache_size = mem_cache_size;
self.selector_result_cache_size = mem_cache_size;
}
/// Enable experimental write cache.

View File

@@ -375,6 +375,19 @@ impl Batch {
self.take_in_place(&indices)
}
/// Returns the estimated memory size of the batch.
///
/// Sums the struct itself, the primary key bytes, the
/// timestamp/sequence/op-type vectors, and every field column.
pub fn memory_size(&self) -> usize {
    let fields_size: usize = self
        .fields
        .iter()
        .map(|batch_column| batch_column.data.memory_size())
        .sum();
    std::mem::size_of::<Self>()
        + self.primary_key.len()
        + self.timestamps.memory_size()
        + self.sequences.memory_size()
        + self.op_types.memory_size()
        + fields_size
}
/// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
pub(crate) fn projected_fields(
metadata: &RegionMetadata,

View File

@@ -158,6 +158,7 @@ impl WorkerGroup {
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
.write_cache(write_cache)
@@ -292,6 +293,7 @@ impl WorkerGroup {
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);

View File

@@ -17,7 +17,7 @@ use datafusion_expr::expr::Expr;
use strum::Display;
/// A hint on how to select rows from a time-series.
#[derive(Clone, Debug, PartialEq, Eq, Display)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Display)]
pub enum TimeSeriesRowSelector {
/// Only keep the last row of each time-series.
LastRow,

View File

@@ -888,6 +888,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"sst_meta_cache_size =",
"vector_cache_size =",
"page_cache_size =",
"selector_result_cache_size =",
];
input
@@ -1141,10 +1142,10 @@ processors:
- dissect:
fields:
- line
patterns:
patterns:
- "%{+ts} %{+ts} %{content}"
- date:
fields:
fields:
- ts
formats:
- "%Y-%m-%d %H:%M:%S%.3f"