mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Cache for the engine.
//!
//! This module defines [CacheStrategy], [CacheManager] and
//! [CacheManagerBuilder], and declares the cache submodules listed below.

pub(crate) mod cache_size;

pub(crate) mod file_cache;
pub(crate) mod index;
pub(crate) mod manifest_cache;
// Only compiled for test builds.
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::arrow::record_batch::RecordBatch;
32use datatypes::value::Value;
33use datatypes::vectors::VectorRef;
34use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
35use index::result_cache::IndexResultCache;
36use moka::notification::RemovalCause;
37use moka::sync::Cache;
38use object_store::ObjectStore;
39use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
40use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
41use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
42
43use crate::cache::cache_size::parquet_meta_size;
44use crate::cache::file_cache::{FileType, IndexKey};
45use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
46#[cfg(feature = "vector_index")]
47use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
48use crate::cache::write_cache::WriteCacheRef;
49use crate::memtable::record_batch_estimated_size;
50use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
51use crate::read::Batch;
52use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
53use crate::sst::file::{RegionFileId, RegionIndexId};
54use crate::sst::parquet::reader::MetadataCacheMetrics;
55
/// Metrics type key for sst meta.
/// Used as the label value for the in-memory SST metadata cache metrics.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
/// Used as the label value for the repeated vector cache metrics.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
/// Used as the label value for the page cache metrics.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
/// Also used by [selector_result_cache_hit] and [selector_result_cache_miss].
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
70
/// Cache strategies that may only enable a subset of caches.
///
/// The [CacheManagerRef] held by a variant is reference counted, so cloning a
/// strategy shares the same underlying [CacheManager].
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    ///
    /// With this variant the parquet metadata methods, puffin cache eviction
    /// and the write cache accessor still reach the [CacheManager]; the
    /// other accessors on [CacheStrategy] return `None` or do nothing.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    /// Every lookup returns `None` and every put/remove/evict is a no-op.
    Disabled,
}
85
86impl CacheStrategy {
87    /// Gets parquet metadata with cache metrics tracking.
88    /// Returns the metadata and updates the provided metrics.
89    pub(crate) async fn get_parquet_meta_data(
90        &self,
91        file_id: RegionFileId,
92        metrics: &mut MetadataCacheMetrics,
93        page_index_policy: PageIndexPolicy,
94    ) -> Option<Arc<ParquetMetaData>> {
95        match self {
96            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
97                cache_manager
98                    .get_parquet_meta_data(file_id, metrics, page_index_policy)
99                    .await
100            }
101            CacheStrategy::Disabled => {
102                metrics.cache_miss += 1;
103                None
104            }
105        }
106    }
107
108    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
109    pub fn get_parquet_meta_data_from_mem_cache(
110        &self,
111        file_id: RegionFileId,
112    ) -> Option<Arc<ParquetMetaData>> {
113        match self {
114            CacheStrategy::EnableAll(cache_manager) => {
115                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
116            }
117            CacheStrategy::Compaction(cache_manager) => {
118                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
119            }
120            CacheStrategy::Disabled => None,
121        }
122    }
123
124    /// Calls [CacheManager::put_parquet_meta_data()].
125    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
126        match self {
127            CacheStrategy::EnableAll(cache_manager) => {
128                cache_manager.put_parquet_meta_data(file_id, metadata);
129            }
130            CacheStrategy::Compaction(cache_manager) => {
131                cache_manager.put_parquet_meta_data(file_id, metadata);
132            }
133            CacheStrategy::Disabled => {}
134        }
135    }
136
137    /// Calls [CacheManager::remove_parquet_meta_data()].
138    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
139        match self {
140            CacheStrategy::EnableAll(cache_manager) => {
141                cache_manager.remove_parquet_meta_data(file_id);
142            }
143            CacheStrategy::Compaction(cache_manager) => {
144                cache_manager.remove_parquet_meta_data(file_id);
145            }
146            CacheStrategy::Disabled => {}
147        }
148    }
149
150    /// Calls [CacheManager::get_repeated_vector()].
151    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
152    pub fn get_repeated_vector(
153        &self,
154        data_type: &ConcreteDataType,
155        value: &Value,
156    ) -> Option<VectorRef> {
157        match self {
158            CacheStrategy::EnableAll(cache_manager) => {
159                cache_manager.get_repeated_vector(data_type, value)
160            }
161            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
162        }
163    }
164
165    /// Calls [CacheManager::put_repeated_vector()].
166    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
167    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
168        if let CacheStrategy::EnableAll(cache_manager) = self {
169            cache_manager.put_repeated_vector(value, vector);
170        }
171    }
172
173    /// Calls [CacheManager::get_pages()].
174    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
175    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
176        match self {
177            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
178            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
179        }
180    }
181
182    /// Calls [CacheManager::put_pages()].
183    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
184    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
185        if let CacheStrategy::EnableAll(cache_manager) = self {
186            cache_manager.put_pages(page_key, pages);
187        }
188    }
189
190    /// Calls [CacheManager::evict_puffin_cache()].
191    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
192        match self {
193            CacheStrategy::EnableAll(cache_manager) => {
194                cache_manager.evict_puffin_cache(file_id).await
195            }
196            CacheStrategy::Compaction(cache_manager) => {
197                cache_manager.evict_puffin_cache(file_id).await
198            }
199            CacheStrategy::Disabled => {}
200        }
201    }
202
203    /// Calls [CacheManager::get_selector_result()].
204    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
205    pub fn get_selector_result(
206        &self,
207        selector_key: &SelectorResultKey,
208    ) -> Option<Arc<SelectorResultValue>> {
209        match self {
210            CacheStrategy::EnableAll(cache_manager) => {
211                cache_manager.get_selector_result(selector_key)
212            }
213            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
214        }
215    }
216
217    /// Calls [CacheManager::put_selector_result()].
218    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
219    pub fn put_selector_result(
220        &self,
221        selector_key: SelectorResultKey,
222        result: Arc<SelectorResultValue>,
223    ) {
224        if let CacheStrategy::EnableAll(cache_manager) = self {
225            cache_manager.put_selector_result(selector_key, result);
226        }
227    }
228
229    /// Calls [CacheManager::get_range_result()].
230    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
231    #[cfg_attr(not(test), allow(dead_code))]
232    pub(crate) fn get_range_result(
233        &self,
234        key: &RangeScanCacheKey,
235    ) -> Option<Arc<RangeScanCacheValue>> {
236        match self {
237            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
238            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
239        }
240    }
241
242    /// Calls [CacheManager::put_range_result()].
243    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
244    #[cfg_attr(not(test), allow(dead_code))]
245    pub(crate) fn put_range_result(
246        &self,
247        key: RangeScanCacheKey,
248        result: Arc<RangeScanCacheValue>,
249    ) {
250        if let CacheStrategy::EnableAll(cache_manager) = self {
251            cache_manager.put_range_result(key, result);
252        }
253    }
254
255    /// Calls [CacheManager::write_cache()].
256    /// It returns None if the strategy is [CacheStrategy::Disabled].
257    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
258        match self {
259            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
260            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
261            CacheStrategy::Disabled => None,
262        }
263    }
264
265    /// Calls [CacheManager::index_cache()].
266    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
267    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
268        match self {
269            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
270            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
271        }
272    }
273
274    /// Calls [CacheManager::bloom_filter_index_cache()].
275    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
276    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
277        match self {
278            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
279            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
280        }
281    }
282
283    /// Calls [CacheManager::vector_index_cache()].
284    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
285    #[cfg(feature = "vector_index")]
286    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
287        match self {
288            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
289            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
290        }
291    }
292
293    /// Calls [CacheManager::puffin_metadata_cache()].
294    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
295    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
296        match self {
297            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
298            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
299        }
300    }
301
302    /// Calls [CacheManager::index_result_cache()].
303    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
304    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
305        match self {
306            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
307            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
308        }
309    }
310
311    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
312    pub fn maybe_download_background(
313        &self,
314        index_key: IndexKey,
315        remote_path: String,
316        remote_store: ObjectStore,
317        file_size: u64,
318    ) {
319        if let CacheStrategy::EnableAll(cache_manager) = self
320            && let Some(write_cache) = cache_manager.write_cache()
321        {
322            write_cache.file_cache().maybe_download_background(
323                index_key,
324                remote_path,
325                remote_store,
326                file_size,
327            );
328        }
329    }
330}
331
/// Manages cached data for the engine.
///
/// All caches are disabled by default: every field is an `Option` and the
/// derived `Default` leaves each of them as `None`. Use
/// [CacheManager::builder()] to enable caches.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index.
    /// Only present when the `vector_index` feature is enabled.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan outputs in flat format.
    #[cfg_attr(not(test), allow(dead_code))]
    range_result_cache: Option<RangeResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
364
365impl CacheManager {
366    /// Returns a builder to build the cache.
367    pub fn builder() -> CacheManagerBuilder {
368        CacheManagerBuilder::default()
369    }
370
371    /// Gets cached [ParquetMetaData] with metrics tracking.
372    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
373    pub(crate) async fn get_parquet_meta_data(
374        &self,
375        file_id: RegionFileId,
376        metrics: &mut MetadataCacheMetrics,
377        page_index_policy: PageIndexPolicy,
378    ) -> Option<Arc<ParquetMetaData>> {
379        // Try to get metadata from sst meta cache
380        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
381            metrics.mem_cache_hit += 1;
382            return Some(metadata);
383        }
384
385        // Try to get metadata from write cache
386        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
387        if let Some(write_cache) = &self.write_cache
388            && let Some(metadata) = write_cache
389                .file_cache()
390                .get_parquet_meta_data(key, metrics, page_index_policy)
391                .await
392        {
393            metrics.file_cache_hit += 1;
394            let metadata = Arc::new(metadata);
395            // Put metadata into sst meta cache
396            self.put_parquet_meta_data(file_id, metadata.clone());
397            return Some(metadata);
398        };
399        metrics.cache_miss += 1;
400
401        None
402    }
403
404    /// Gets cached [ParquetMetaData] from in-memory cache.
405    /// This method does not perform I/O.
406    pub fn get_parquet_meta_data_from_mem_cache(
407        &self,
408        file_id: RegionFileId,
409    ) -> Option<Arc<ParquetMetaData>> {
410        // Try to get metadata from sst meta cache
411        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
412            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
413            update_hit_miss(value, SST_META_TYPE)
414        })
415    }
416
417    /// Puts [ParquetMetaData] into the cache.
418    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
419        if let Some(cache) = &self.sst_meta_cache {
420            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
421            CACHE_BYTES
422                .with_label_values(&[SST_META_TYPE])
423                .add(meta_cache_weight(&key, &metadata).into());
424            cache.insert(key, metadata);
425        }
426    }
427
428    /// Removes [ParquetMetaData] from the cache.
429    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
430        if let Some(cache) = &self.sst_meta_cache {
431            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
432        }
433    }
434
435    /// Returns the total weighted size of the in-memory SST meta cache.
436    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
437        self.sst_meta_cache
438            .as_ref()
439            .map(|cache| cache.weighted_size())
440            .unwrap_or(0)
441    }
442
443    /// Returns true if the in-memory SST meta cache is enabled.
444    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
445        self.sst_meta_cache.is_some()
446    }
447
448    /// Gets a vector with repeated value for specific `key`.
449    pub fn get_repeated_vector(
450        &self,
451        data_type: &ConcreteDataType,
452        value: &Value,
453    ) -> Option<VectorRef> {
454        self.vector_cache.as_ref().and_then(|vector_cache| {
455            let value = vector_cache.get(&(data_type.clone(), value.clone()));
456            update_hit_miss(value, VECTOR_TYPE)
457        })
458    }
459
460    /// Puts a vector with repeated value into the cache.
461    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
462        if let Some(cache) = &self.vector_cache {
463            let key = (vector.data_type(), value);
464            CACHE_BYTES
465                .with_label_values(&[VECTOR_TYPE])
466                .add(vector_cache_weight(&key, &vector).into());
467            cache.insert(key, vector);
468        }
469    }
470
471    /// Gets pages for the row group.
472    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
473        self.page_cache.as_ref().and_then(|page_cache| {
474            let value = page_cache.get(page_key);
475            update_hit_miss(value, PAGE_TYPE)
476        })
477    }
478
479    /// Puts pages of the row group into the cache.
480    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
481        if let Some(cache) = &self.page_cache {
482            CACHE_BYTES
483                .with_label_values(&[PAGE_TYPE])
484                .add(page_cache_weight(&page_key, &pages).into());
485            cache.insert(page_key, pages);
486        }
487    }
488
489    /// Evicts every puffin-related cache entry for the given file.
490    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
491        if let Some(cache) = &self.bloom_filter_index_cache {
492            cache.invalidate_file(file_id.file_id());
493        }
494
495        if let Some(cache) = &self.inverted_index_cache {
496            cache.invalidate_file(file_id.file_id());
497        }
498
499        if let Some(cache) = &self.index_result_cache {
500            cache.invalidate_file(file_id.file_id());
501        }
502
503        #[cfg(feature = "vector_index")]
504        if let Some(cache) = &self.vector_index_cache {
505            cache.invalidate_file(file_id.file_id());
506        }
507
508        if let Some(cache) = &self.puffin_metadata_cache {
509            cache.remove(&file_id.to_string());
510        }
511
512        if let Some(write_cache) = &self.write_cache {
513            write_cache
514                .remove(IndexKey::new(
515                    file_id.region_id(),
516                    file_id.file_id(),
517                    FileType::Puffin(file_id.version),
518                ))
519                .await;
520        }
521    }
522
523    /// Gets result of for the selector.
524    pub fn get_selector_result(
525        &self,
526        selector_key: &SelectorResultKey,
527    ) -> Option<Arc<SelectorResultValue>> {
528        self.selector_result_cache
529            .as_ref()
530            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
531    }
532
533    /// Puts result of the selector into the cache.
534    pub fn put_selector_result(
535        &self,
536        selector_key: SelectorResultKey,
537        result: Arc<SelectorResultValue>,
538    ) {
539        if let Some(cache) = &self.selector_result_cache {
540            CACHE_BYTES
541                .with_label_values(&[SELECTOR_RESULT_TYPE])
542                .add(selector_result_cache_weight(&selector_key, &result).into());
543            cache.insert(selector_key, result);
544        }
545    }
546
547    /// Gets cached result for range scan.
548    #[cfg_attr(not(test), allow(dead_code))]
549    pub(crate) fn get_range_result(
550        &self,
551        key: &RangeScanCacheKey,
552    ) -> Option<Arc<RangeScanCacheValue>> {
553        self.range_result_cache
554            .as_ref()
555            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
556    }
557
558    /// Puts range scan result into the cache.
559    #[cfg_attr(not(test), allow(dead_code))]
560    pub(crate) fn put_range_result(
561        &self,
562        key: RangeScanCacheKey,
563        result: Arc<RangeScanCacheValue>,
564    ) {
565        if let Some(cache) = &self.range_result_cache {
566            CACHE_BYTES
567                .with_label_values(&[RANGE_RESULT_TYPE])
568                .add(range_result_cache_weight(&key, &result).into());
569            cache.insert(key, result);
570        }
571    }
572
573    /// Gets the write cache.
574    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
575        self.write_cache.as_ref()
576    }
577
578    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
579        self.inverted_index_cache.as_ref()
580    }
581
582    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
583        self.bloom_filter_index_cache.as_ref()
584    }
585
586    #[cfg(feature = "vector_index")]
587    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
588        self.vector_index_cache.as_ref()
589    }
590
591    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
592        self.puffin_metadata_cache.as_ref()
593    }
594
595    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
596        self.index_result_cache.as_ref()
597    }
598}
599
600/// Increases selector cache miss metrics.
601pub fn selector_result_cache_miss() {
602    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
603}
604
605/// Increases selector cache hit metrics.
606pub fn selector_result_cache_hit() {
607    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
608}
609
/// Builder to construct a [CacheManager].
///
/// Every size defaults to 0 and the write cache defaults to `None`.
/// In [CacheManagerBuilder::build()], a size of 0 disables the SST meta,
/// vector, page, selector result, range result and index result caches.
#[derive(Default)]
pub struct CacheManagerBuilder {
    // Capacity of the SST metadata cache.
    sst_meta_cache_size: u64,
    // Capacity of the repeated vector cache.
    vector_cache_size: u64,
    // Capacity of the page cache.
    page_cache_size: u64,
    // Index metadata size, passed to both the inverted index cache and the
    // bloom filter index cache.
    index_metadata_size: u64,
    // Index content size, passed to the inverted index cache, the bloom filter
    // index cache and (with the `vector_index` feature) the vector index cache.
    index_content_size: u64,
    // Page size for index content, passed to the inverted index cache and the
    // bloom filter index cache.
    index_content_page_size: u64,
    // Capacity of the index result cache.
    index_result_cache_size: u64,
    // Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    // Optional write cache handed to the manager as is.
    write_cache: Option<WriteCacheRef>,
    // Capacity of the selector result cache.
    selector_result_cache_size: u64,
    // Capacity of the range scan result cache.
    range_result_cache_size: u64,
}
625
626impl CacheManagerBuilder {
627    /// Sets meta cache size.
628    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
629        self.sst_meta_cache_size = bytes;
630        self
631    }
632
633    /// Sets vector cache size.
634    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
635        self.vector_cache_size = bytes;
636        self
637    }
638
639    /// Sets page cache size.
640    pub fn page_cache_size(mut self, bytes: u64) -> Self {
641        self.page_cache_size = bytes;
642        self
643    }
644
645    /// Sets write cache.
646    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
647        self.write_cache = cache;
648        self
649    }
650
651    /// Sets cache size for index metadata.
652    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
653        self.index_metadata_size = bytes;
654        self
655    }
656
657    /// Sets cache size for index content.
658    pub fn index_content_size(mut self, bytes: u64) -> Self {
659        self.index_content_size = bytes;
660        self
661    }
662
663    /// Sets page size for index content.
664    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
665        self.index_content_page_size = bytes;
666        self
667    }
668
669    /// Sets cache size for index result.
670    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
671        self.index_result_cache_size = bytes;
672        self
673    }
674
675    /// Sets cache size for puffin metadata.
676    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
677        self.puffin_metadata_size = bytes;
678        self
679    }
680
681    /// Sets selector result cache size.
682    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
683        self.selector_result_cache_size = bytes;
684        self
685    }
686
687    /// Sets range result cache size.
688    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
689        self.range_result_cache_size = bytes;
690        self
691    }
692
693    /// Builds the [CacheManager].
694    pub fn build(self) -> CacheManager {
695        fn to_str(cause: RemovalCause) -> &'static str {
696            match cause {
697                RemovalCause::Expired => "expired",
698                RemovalCause::Explicit => "explicit",
699                RemovalCause::Replaced => "replaced",
700                RemovalCause::Size => "size",
701            }
702        }
703
704        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
705            Cache::builder()
706                .max_capacity(self.sst_meta_cache_size)
707                .weigher(meta_cache_weight)
708                .eviction_listener(|k, v, cause| {
709                    let size = meta_cache_weight(&k, &v);
710                    CACHE_BYTES
711                        .with_label_values(&[SST_META_TYPE])
712                        .sub(size.into());
713                    CACHE_EVICTION
714                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
715                        .inc();
716                })
717                .build()
718        });
719        let vector_cache = (self.vector_cache_size != 0).then(|| {
720            Cache::builder()
721                .max_capacity(self.vector_cache_size)
722                .weigher(vector_cache_weight)
723                .eviction_listener(|k, v, cause| {
724                    let size = vector_cache_weight(&k, &v);
725                    CACHE_BYTES
726                        .with_label_values(&[VECTOR_TYPE])
727                        .sub(size.into());
728                    CACHE_EVICTION
729                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
730                        .inc();
731                })
732                .build()
733        });
734        let page_cache = (self.page_cache_size != 0).then(|| {
735            Cache::builder()
736                .max_capacity(self.page_cache_size)
737                .weigher(page_cache_weight)
738                .eviction_listener(|k, v, cause| {
739                    let size = page_cache_weight(&k, &v);
740                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
741                    CACHE_EVICTION
742                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
743                        .inc();
744                })
745                .build()
746        });
747        let inverted_index_cache = InvertedIndexCache::new(
748            self.index_metadata_size,
749            self.index_content_size,
750            self.index_content_page_size,
751        );
752        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
753        let bloom_filter_index_cache = BloomFilterIndexCache::new(
754            self.index_metadata_size,
755            self.index_content_size,
756            self.index_content_page_size,
757        );
758        #[cfg(feature = "vector_index")]
759        let vector_index_cache = (self.index_content_size != 0)
760            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
761        let index_result_cache = (self.index_result_cache_size != 0)
762            .then(|| IndexResultCache::new(self.index_result_cache_size));
763        let puffin_metadata_cache =
764            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
765        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
766            Cache::builder()
767                .max_capacity(self.selector_result_cache_size)
768                .weigher(selector_result_cache_weight)
769                .eviction_listener(|k, v, cause| {
770                    let size = selector_result_cache_weight(&k, &v);
771                    CACHE_BYTES
772                        .with_label_values(&[SELECTOR_RESULT_TYPE])
773                        .sub(size.into());
774                    CACHE_EVICTION
775                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
776                        .inc();
777                })
778                .build()
779        });
780        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
781            Cache::builder()
782                .max_capacity(self.range_result_cache_size)
783                .weigher(range_result_cache_weight)
784                .eviction_listener(|k, v, cause| {
785                    let size = range_result_cache_weight(&k, &v);
786                    CACHE_BYTES
787                        .with_label_values(&[RANGE_RESULT_TYPE])
788                        .sub(size.into());
789                    CACHE_EVICTION
790                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
791                        .inc();
792                })
793                .build()
794        });
795        CacheManager {
796            sst_meta_cache,
797            vector_cache,
798            page_cache,
799            write_cache: self.write_cache,
800            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
801            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
802            #[cfg(feature = "vector_index")]
803            vector_index_cache,
804            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
805            selector_result_cache,
806            range_result_cache,
807            index_result_cache,
808        }
809    }
810}
811
812fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
813    // We ignore the size of `Arc`.
814    (k.estimated_size() + parquet_meta_size(v)) as u32
815}
816
817fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
818    // We ignore the heap size of `Value`.
819    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
820}
821
822fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
823    (k.estimated_size() + v.estimated_size()) as u32
824}
825
826fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
827    (mem::size_of_val(k) + v.estimated_size()) as u32
828}
829
830fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
831    (k.estimated_size() + v.estimated_size()) as u32
832}
833
834/// Updates cache hit/miss metrics.
835fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
836    if value.is_some() {
837        CACHE_HIT.with_label_values(&[cache_type]).inc();
838    } else {
839        CACHE_MISS.with_label_values(&[cache_type]).inc();
840    }
841    value
842}
843
844/// Cache key (region id, file id) for SST meta.
845#[derive(Debug, Clone, PartialEq, Eq, Hash)]
846struct SstMetaKey(RegionId, FileId);
847
848impl SstMetaKey {
849    /// Returns memory used by the key (estimated).
850    fn estimated_size(&self) -> usize {
851        mem::size_of::<Self>()
852    }
853}
854
/// Path to column pages in the SST file.
///
/// Addresses one column of one row group inside an SST file of a region.
// NOTE(review): all fields are private and nothing in this part of the file
// constructs the type — confirm it is still used elsewhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
867
868/// Cache key to pages in a row group (after projection).
869///
870/// Different projections will have different cache keys.
871/// We cache all ranges together because they may refer to the same `Bytes`.
872#[derive(Debug, Clone, PartialEq, Eq, Hash)]
873pub struct PageKey {
874    /// Id of the SST file to cache.
875    file_id: FileId,
876    /// Index of the row group.
877    row_group_idx: usize,
878    /// Byte ranges of the pages to cache.
879    ranges: Vec<Range<u64>>,
880}
881
882impl PageKey {
883    /// Creates a key for a list of pages.
884    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
885        PageKey {
886            file_id,
887            row_group_idx,
888            ranges,
889        }
890    }
891
892    /// Returns memory used by the key (estimated).
893    fn estimated_size(&self) -> usize {
894        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
895    }
896}
897
898/// Cached row group pages for a column.
899// We don't use enum here to make it easier to mock and use the struct.
900#[derive(Default)]
901pub struct PageValue {
902    /// Compressed page in the row group.
903    pub compressed: Vec<Bytes>,
904    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
905    pub page_size: u64,
906}
907
908impl PageValue {
909    /// Creates a new value from a range of compressed pages.
910    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
911        PageValue {
912            compressed: bytes,
913            page_size,
914        }
915    }
916
917    /// Returns memory used by the value (estimated).
918    fn estimated_size(&self) -> usize {
919        mem::size_of::<Self>()
920            + self.page_size as usize
921            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
922    }
923}
924
/// Cache key for time series row selector result.
///
/// Identifies a result by SST file, row group and the selector applied.
/// The key is `Copy`, so it can be passed by value and reused afterwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
935
/// Result stored in the selector result cache.
///
/// The variant records which batch representation the cached rows use.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
943
944/// Cached result for time series row selector.
945pub struct SelectorResultValue {
946    /// Batches of rows selected by the selector.
947    pub result: SelectorResult,
948    /// Projection of rows.
949    pub projection: Vec<usize>,
950}
951
952impl SelectorResultValue {
953    /// Creates a new selector result value with primary key format.
954    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
955        SelectorResultValue {
956            result: SelectorResult::PrimaryKey(result),
957            projection,
958        }
959    }
960
961    /// Creates a new selector result value with flat format.
962    pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
963        SelectorResultValue {
964            result: SelectorResult::Flat(result),
965            projection,
966        }
967    }
968
969    /// Returns memory used by the value (estimated).
970    fn estimated_size(&self) -> usize {
971        match &self.result {
972            SelectorResult::PrimaryKey(batches) => {
973                batches.iter().map(|batch| batch.memory_size()).sum()
974            }
975            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
976        }
977    }
978}
979
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// The key also carries the [ConcreteDataType] of the value.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [PageKey] (file id, row group index, byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
992
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::parquet_meta;
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// With a default manager every put is dropped and every get misses.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // SST metadata is not retained.
        let sst_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());
        let mut metrics = MetadataCacheMetrics::default();
        manager.put_parquet_meta_data(sst_id, parquet_meta());
        let meta_cached = manager
            .get_parquet_meta_data(sst_id, &mut metrics, Default::default())
            .await
            .is_some();
        assert!(!meta_cached);

        // Repeated vectors are not retained.
        let value = Value::Int64(10);
        let repeated: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), repeated);
        let data_type = ConcreteDataType::int64_datatype();
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        // Pages are not retained.
        let page_key = PageKey::new(sst_id.file_id(), 1, vec![0..5]);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// Parquet metadata misses, hits after a put, and misses again after removal.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let sst_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());
        let mut metrics = MetadataCacheMetrics::default();

        let hit_before_put = manager
            .get_parquet_meta_data(sst_id, &mut metrics, Default::default())
            .await
            .is_some();
        assert!(!hit_before_put);

        manager.put_parquet_meta_data(sst_id, parquet_meta());
        let hit_after_put = manager
            .get_parquet_meta_data(sst_id, &mut metrics, Default::default())
            .await
            .is_some();
        assert!(hit_after_put);

        manager.remove_parquet_meta_data(sst_id);
        let hit_after_remove = manager
            .get_parquet_meta_data(sst_id, &mut metrics, Default::default())
            .await
            .is_some();
        assert!(!hit_after_remove);
    }

    /// A repeated vector can be read back after it is put.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        let expected: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), expected.clone());
        let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
        assert_eq!(expected, cached);
    }

    /// Pages miss before a put and hit afterwards.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new(FileId::random(), 0, vec![0..10, 10..20]);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results miss before a put and hit afterwards.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let selector_key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&selector_key).is_none());

        let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
        manager.put_selector_result(selector_key, Arc::new(empty_result));
        assert!(manager.get_selector_result(&selector_key).is_some());
    }

    /// Range results are visible through the manager and `EnableAll`, but not
    /// through the `Compaction` or `Disabled` strategies.
    #[test]
    fn test_range_result_cache() {
        let manager = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let fingerprint = ScanRequestFingerprintBuilder {
            read_column_ids: vec![],
            read_column_types: vec![],
            filters: vec!["tag_0 = 1".to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: crate::region::options::MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build();
        let range_key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: fingerprint,
        };
        let cached_value = Arc::new(RangeScanCacheValue::new(Vec::new()));

        // Direct access through the manager.
        assert!(manager.get_range_result(&range_key).is_none());
        manager.put_range_result(range_key.clone(), cached_value.clone());
        assert!(manager.get_range_result(&range_key).is_some());

        // `EnableAll` sees the entry held by the manager.
        let enable_all = CacheStrategy::EnableAll(manager.clone());
        assert!(enable_all.get_range_result(&range_key).is_some());

        // `Compaction` reports a miss; the manager still holds the entry after its put.
        let compaction = CacheStrategy::Compaction(manager.clone());
        assert!(compaction.get_range_result(&range_key).is_none());
        compaction.put_range_result(range_key.clone(), cached_value.clone());
        assert!(manager.get_range_result(&range_key).is_some());

        // `Disabled` reports a miss; the manager still holds the entry after its put.
        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&range_key).is_none());
        disabled.put_range_result(range_key.clone(), cached_value);
        assert!(manager.get_range_result(&range_key).is_some());
    }

    /// Evicting the puffin cache for an index clears the bloom filter, inverted
    /// index, index result and puffin metadata entries, both directly and via
    /// `CacheStrategy`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let manager = Arc::new(
            CacheManager::builder()
                .index_metadata_size(128)
                .index_content_size(128)
                .index_content_page_size(64)
                .index_result_cache_size(128)
                .puffin_metadata_size(128)
                .build(),
        );

        let sst_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());
        let index_id = RegionIndexId::new(sst_id, 0);
        let column_id: ColumnId = 1;

        let bloom_cache = manager.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = manager.inverted_index_cache().unwrap().clone();
        let result_cache = manager.index_result_cache().unwrap();
        let puffin_metadata_cache = manager.puffin_metadata_cache().unwrap().clone();

        // Keys used by each of the four caches.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        let inverted_key = (index_id.file_id(), index_id.version);
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let puffin_key = index_id.to_string();

        // Populate every cache.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(inverted_key, Arc::new(InvertedIndexMetas::default()));
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            puffin_key.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(inverted_cache.get_metadata(inverted_key).is_some());
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&puffin_key).is_some());

        // Evict directly through the manager.
        manager.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(inverted_cache.get_metadata(inverted_key).is_none());
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&puffin_key).is_none());

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(inverted_key, Arc::new(InvertedIndexMetas::default()));
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            puffin_key.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(manager.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(inverted_cache.get_metadata(inverted_key).is_none());
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&puffin_key).is_none());
    }
}