Skip to main content

mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::parquet::reader::MetadataCacheMetrics;
62
63/// Metrics type key for sst meta.
64const SST_META_TYPE: &str = "sst_meta";
65/// Metrics type key for vector.
66const VECTOR_TYPE: &str = "vector";
67/// Metrics type key for pages.
68const PAGE_TYPE: &str = "page";
69/// Metrics type key for files on the local store.
70const FILE_TYPE: &str = "file";
71/// Metrics type key for index files (puffin) on the local store.
72const INDEX_TYPE: &str = "index";
73/// Metrics type key for selector result cache.
74const SELECTOR_RESULT_TYPE: &str = "selector_result";
75/// Metrics type key for range scan result cache.
76const RANGE_RESULT_TYPE: &str = "range_result";
77const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
78const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
79
80#[derive(Debug)]
81pub(crate) struct RangeResultMemoryLimiter {
82    semaphore: Arc<tokio::sync::Semaphore>,
83    permit_bytes: usize,
84    total_permits: usize,
85}
86
87impl Default for RangeResultMemoryLimiter {
88    fn default() -> Self {
89        Self::new(
90            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
91            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
92        )
93    }
94}
95
96impl RangeResultMemoryLimiter {
97    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
98        let permit_bytes = permit_bytes.max(1);
99        let total_permits = limit_bytes
100            .div_ceil(permit_bytes)
101            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
102        Self {
103            semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
104            permit_bytes,
105            total_permits,
106        }
107    }
108
109    #[cfg(test)]
110    pub(crate) fn permit_bytes(&self) -> usize {
111        self.permit_bytes
112    }
113
114    #[cfg(test)]
115    pub(crate) fn available_permits(&self) -> usize {
116        self.semaphore.available_permits()
117    }
118
119    pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
120        let permits = bytes.div_ceil(self.permit_bytes).max(1);
121        if permits > self.total_permits {
122            return UnexpectedSnafu {
123                reason: format!(
124                    "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
125                    self.total_permits.saturating_mul(self.permit_bytes)
126                ),
127            }
128            .fail();
129        }
130        self.semaphore
131            .acquire_many(permits as u32)
132            .await
133            .map_err(|_| {
134                UnexpectedSnafu {
135                    reason: "range result memory limiter is unexpectedly closed",
136                }
137                .build()
138            })
139    }
140}
141
142/// Cached SST metadata combines the parquet footer with the decoded region metadata.
143///
144/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
145/// [RegionMetadata] separately so readers can skip repeated deserialization work.
146#[derive(Debug)]
147pub(crate) struct CachedSstMeta {
148    parquet_metadata: Arc<ParquetMetaData>,
149    region_metadata: RegionMetadataRef,
150    region_metadata_weight: usize,
151    page_index_policy: PageIndexPolicy,
152}
153
154impl CachedSstMeta {
155    #[cfg(test)]
156    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
157        let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
158        Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
159    }
160
161    pub(crate) fn try_new_with_region_metadata(
162        file_path: &str,
163        parquet_metadata: ParquetMetaData,
164        region_metadata: Option<RegionMetadataRef>,
165    ) -> Result<Self> {
166        let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
167        Self::try_new_with_page_index_policy(
168            file_path,
169            parquet_metadata,
170            region_metadata,
171            page_index_policy,
172        )
173    }
174
175    pub(crate) fn try_new_with_page_index_policy(
176        file_path: &str,
177        parquet_metadata: ParquetMetaData,
178        region_metadata: Option<RegionMetadataRef>,
179        page_index_policy: PageIndexPolicy,
180    ) -> Result<Self> {
181        let file_metadata = parquet_metadata.file_metadata();
182        let key_values = file_metadata
183            .key_value_metadata()
184            .context(InvalidParquetSnafu {
185                file: file_path,
186                reason: "missing key value meta",
187            })?;
188        let meta_value = key_values
189            .iter()
190            .find(|kv| kv.key == PARQUET_METADATA_KEY)
191            .with_context(|| InvalidParquetSnafu {
192                file: file_path,
193                reason: format!("key {} not found", PARQUET_METADATA_KEY),
194            })?;
195        let json = meta_value
196            .value
197            .as_ref()
198            .with_context(|| InvalidParquetSnafu {
199                file: file_path,
200                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
201            })?;
202        let region_metadata = match region_metadata {
203            Some(region_metadata) => region_metadata,
204            None => Arc::new(
205                store_api::metadata::RegionMetadata::from_json(json)
206                    .context(InvalidMetadataSnafu)?,
207            ),
208        };
209        // Keep the previous JSON-byte floor and charge the decoded structures as well.
210        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
211        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
212
213        Ok(Self {
214            parquet_metadata,
215            region_metadata,
216            region_metadata_weight,
217            page_index_policy,
218        })
219    }
220
221    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
222        self.parquet_metadata.clone()
223    }
224
225    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
226        self.region_metadata.clone()
227    }
228
229    fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
230        match requested {
231            PageIndexPolicy::Skip => true,
232            PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
233            PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
234        }
235    }
236}
237
238fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
239    if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
240        PageIndexPolicy::Optional
241    } else {
242        PageIndexPolicy::Skip
243    }
244}
245
246fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
247    let file_metadata = parquet_metadata.file_metadata();
248    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
249        let filtered = key_values
250            .iter()
251            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
252            .cloned()
253            .collect::<Vec<_>>();
254        (!filtered.is_empty()).then_some(filtered)
255    });
256    let stripped_file_metadata = FileMetaData::new(
257        file_metadata.version(),
258        file_metadata.num_rows(),
259        file_metadata.created_by().map(ToString::to_string),
260        filtered_key_values,
261        file_metadata.schema_descr_ptr(),
262        file_metadata.column_orders().cloned(),
263    );
264
265    let mut builder = parquet_metadata.into_builder();
266    let row_groups = builder.take_row_groups();
267    let column_index = builder.take_column_index();
268    let offset_index = builder.take_offset_index();
269
270    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
271        .set_row_groups(row_groups)
272        .set_column_index(column_index)
273        .set_offset_index(offset_index)
274        .build()
275}
276
277/// Cache strategies that may only enable a subset of caches.
278#[derive(Clone)]
279pub enum CacheStrategy {
280    /// Strategy for normal operations.
281    /// Doesn't disable any cache.
282    EnableAll(CacheManagerRef),
283    /// Strategy for compaction.
284    /// Disables some caches during compaction to avoid affecting queries.
285    /// Enables the write cache so that the compaction can read files cached
286    /// in the write cache and write the compacted files back to the write cache.
287    Compaction(CacheManagerRef),
288    /// Do not use any cache.
289    Disabled,
290}
291
292impl CacheStrategy {
293    /// Gets fused SST metadata with cache metrics tracking.
294    pub(crate) async fn get_sst_meta_data(
295        &self,
296        file_id: RegionFileId,
297        metrics: &mut MetadataCacheMetrics,
298        page_index_policy: PageIndexPolicy,
299    ) -> Option<Arc<CachedSstMeta>> {
300        match self {
301            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
302                cache_manager
303                    .get_sst_meta_data(file_id, metrics, page_index_policy)
304                    .await
305            }
306            CacheStrategy::Disabled => {
307                metrics.cache_miss += 1;
308                None
309            }
310        }
311    }
312
313    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
314    pub(crate) fn get_sst_meta_data_from_mem_cache(
315        &self,
316        file_id: RegionFileId,
317        page_index_policy: PageIndexPolicy,
318    ) -> Option<Arc<CachedSstMeta>> {
319        match self {
320            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
321                cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
322            }
323            CacheStrategy::Disabled => None,
324        }
325    }
326
327    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
328    pub fn get_parquet_meta_data_from_mem_cache(
329        &self,
330        file_id: RegionFileId,
331    ) -> Option<Arc<ParquetMetaData>> {
332        self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
333            .map(|metadata| metadata.parquet_metadata())
334    }
335
336    /// Calls [CacheManager::put_sst_meta_data()].
337    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
338        match self {
339            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
340                cache_manager.put_sst_meta_data(file_id, metadata);
341            }
342            CacheStrategy::Disabled => {}
343        }
344    }
345
346    /// Calls [CacheManager::put_parquet_meta_data()].
347    pub fn put_parquet_meta_data(
348        &self,
349        file_id: RegionFileId,
350        metadata: Arc<ParquetMetaData>,
351        region_metadata: Option<RegionMetadataRef>,
352    ) {
353        match self {
354            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
355                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
356            }
357            CacheStrategy::Disabled => {}
358        }
359    }
360
361    /// Calls [CacheManager::remove_parquet_meta_data()].
362    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
363        match self {
364            CacheStrategy::EnableAll(cache_manager) => {
365                cache_manager.remove_parquet_meta_data(file_id);
366            }
367            CacheStrategy::Compaction(cache_manager) => {
368                cache_manager.remove_parquet_meta_data(file_id);
369            }
370            CacheStrategy::Disabled => {}
371        }
372    }
373
374    /// Calls [CacheManager::get_repeated_vector()].
375    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
376    pub fn get_repeated_vector(
377        &self,
378        data_type: &ConcreteDataType,
379        value: &Value,
380    ) -> Option<VectorRef> {
381        match self {
382            CacheStrategy::EnableAll(cache_manager) => {
383                cache_manager.get_repeated_vector(data_type, value)
384            }
385            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
386        }
387    }
388
389    /// Calls [CacheManager::put_repeated_vector()].
390    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
391    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
392        if let CacheStrategy::EnableAll(cache_manager) = self {
393            cache_manager.put_repeated_vector(value, vector);
394        }
395    }
396
397    /// Calls [CacheManager::get_pages()].
398    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
399    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
400        match self {
401            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
402            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
403        }
404    }
405
406    /// Calls [CacheManager::put_pages()].
407    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
408    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
409        if let CacheStrategy::EnableAll(cache_manager) = self {
410            cache_manager.put_pages(page_key, pages);
411        }
412    }
413
414    /// Calls [CacheManager::evict_puffin_cache()].
415    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
416        match self {
417            CacheStrategy::EnableAll(cache_manager) => {
418                cache_manager.evict_puffin_cache(file_id).await
419            }
420            CacheStrategy::Compaction(cache_manager) => {
421                cache_manager.evict_puffin_cache(file_id).await
422            }
423            CacheStrategy::Disabled => {}
424        }
425    }
426
427    /// Calls [CacheManager::get_selector_result()].
428    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
429    pub fn get_selector_result(
430        &self,
431        selector_key: &SelectorResultKey,
432    ) -> Option<Arc<SelectorResultValue>> {
433        match self {
434            CacheStrategy::EnableAll(cache_manager) => {
435                cache_manager.get_selector_result(selector_key)
436            }
437            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
438        }
439    }
440
441    /// Calls [CacheManager::put_selector_result()].
442    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
443    pub fn put_selector_result(
444        &self,
445        selector_key: SelectorResultKey,
446        result: Arc<SelectorResultValue>,
447    ) {
448        if let CacheStrategy::EnableAll(cache_manager) = self {
449            cache_manager.put_selector_result(selector_key, result);
450        }
451    }
452
453    /// Calls [CacheManager::get_range_result()].
454    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
455    #[allow(dead_code)]
456    pub(crate) fn get_range_result(
457        &self,
458        key: &RangeScanCacheKey,
459    ) -> Option<Arc<RangeScanCacheValue>> {
460        match self {
461            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
462            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
463        }
464    }
465
466    /// Calls [CacheManager::put_range_result()].
467    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
468    pub(crate) fn put_range_result(
469        &self,
470        key: RangeScanCacheKey,
471        result: Arc<RangeScanCacheValue>,
472    ) {
473        if let CacheStrategy::EnableAll(cache_manager) = self {
474            cache_manager.put_range_result(key, result);
475        }
476    }
477
478    /// Returns true if the range result cache is enabled.
479    pub(crate) fn has_range_result_cache(&self) -> bool {
480        match self {
481            CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
482            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
483        }
484    }
485
486    pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
487        match self {
488            CacheStrategy::EnableAll(cache_manager) => {
489                Some(cache_manager.range_result_memory_limiter())
490            }
491            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
492        }
493    }
494
495    pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
496        match self {
497            CacheStrategy::EnableAll(cache_manager) => {
498                Some(cache_manager.range_result_cache_size())
499            }
500            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
501        }
502    }
503
504    /// Calls [CacheManager::write_cache()].
505    /// It returns None if the strategy is [CacheStrategy::Disabled].
506    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
507        match self {
508            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
509            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
510            CacheStrategy::Disabled => None,
511        }
512    }
513
514    /// Calls [CacheManager::index_cache()].
515    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
516    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
517        match self {
518            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
519            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520        }
521    }
522
523    /// Calls [CacheManager::bloom_filter_index_cache()].
524    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
525    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
526        match self {
527            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
528            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
529        }
530    }
531
532    /// Calls [CacheManager::vector_index_cache()].
533    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
534    #[cfg(feature = "vector_index")]
535    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
536        match self {
537            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
538            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
539        }
540    }
541
542    /// Calls [CacheManager::puffin_metadata_cache()].
543    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
544    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
545        match self {
546            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
547            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
548        }
549    }
550
551    /// Calls [CacheManager::index_result_cache()].
552    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
553    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
554        match self {
555            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
556            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
557        }
558    }
559
560    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
561    pub fn maybe_download_background(
562        &self,
563        index_key: IndexKey,
564        remote_path: String,
565        remote_store: ObjectStore,
566        file_size: u64,
567    ) {
568        if let CacheStrategy::EnableAll(cache_manager) = self
569            && let Some(write_cache) = cache_manager.write_cache()
570        {
571            write_cache.file_cache().maybe_download_background(
572                index_key,
573                remote_path,
574                remote_store,
575                file_size,
576            );
577        }
578    }
579}
580
581/// Manages cached data for the engine.
582///
583/// All caches are disabled by default.
584#[derive(Default)]
585pub struct CacheManager {
586    /// Cache for SST metadata.
587    sst_meta_cache: Option<SstMetaCache>,
588    /// Cache for vectors.
589    vector_cache: Option<VectorCache>,
590    /// Cache for SST pages.
591    page_cache: Option<PageCache>,
592    /// A Cache for writing files to object stores.
593    write_cache: Option<WriteCacheRef>,
594    /// Cache for inverted index.
595    inverted_index_cache: Option<InvertedIndexCacheRef>,
596    /// Cache for bloom filter index.
597    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
598    /// Cache for vector index.
599    #[cfg(feature = "vector_index")]
600    vector_index_cache: Option<VectorIndexCacheRef>,
601    /// Puffin metadata cache.
602    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
603    /// Cache for time series selectors.
604    selector_result_cache: Option<SelectorResultCache>,
605    /// Cache for range scan outputs in flat format.
606    range_result_cache: Option<RangeResultCache>,
607    /// Configured capacity for range scan outputs in flat format.
608    range_result_cache_size: u64,
609    /// Shared memory limiter for async range-result cache tasks.
610    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
611    /// Cache for index result.
612    index_result_cache: Option<IndexResultCache>,
613}
614
615pub type CacheManagerRef = Arc<CacheManager>;
616
617impl CacheManager {
618    /// Returns a builder to build the cache.
619    pub fn builder() -> CacheManagerBuilder {
620        CacheManagerBuilder::default()
621    }
622
623    /// Gets fused SST metadata with metrics tracking.
624    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
625    pub(crate) async fn get_sst_meta_data(
626        &self,
627        file_id: RegionFileId,
628        metrics: &mut MetadataCacheMetrics,
629        page_index_policy: PageIndexPolicy,
630    ) -> Option<Arc<CachedSstMeta>> {
631        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
632            metrics.mem_cache_hit += 1;
633            return Some(metadata);
634        }
635
636        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
637        if let Some(write_cache) = &self.write_cache
638            && let Some(metadata) = write_cache
639                .file_cache()
640                .get_sst_meta_data(key, metrics, page_index_policy)
641                .await
642        {
643            metrics.file_cache_hit += 1;
644            self.put_sst_meta_data(file_id, metadata.clone());
645            return Some(metadata);
646        }
647
648        metrics.cache_miss += 1;
649        None
650    }
651
652    /// Gets cached [ParquetMetaData] with metrics tracking.
653    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
654    pub(crate) async fn get_parquet_meta_data(
655        &self,
656        file_id: RegionFileId,
657        metrics: &mut MetadataCacheMetrics,
658        page_index_policy: PageIndexPolicy,
659    ) -> Option<Arc<ParquetMetaData>> {
660        self.get_sst_meta_data(file_id, metrics, page_index_policy)
661            .await
662            .map(|metadata| metadata.parquet_metadata())
663    }
664
665    /// Gets cached fused SST metadata from in-memory cache.
666    /// This method does not perform I/O.
667    pub(crate) fn get_sst_meta_data_from_mem_cache(
668        &self,
669        file_id: RegionFileId,
670        page_index_policy: PageIndexPolicy,
671    ) -> Option<Arc<CachedSstMeta>> {
672        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
673            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
674            let value =
675                value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
676            update_hit_miss(value, SST_META_TYPE)
677        })
678    }
679
680    /// Gets cached [ParquetMetaData] from in-memory cache.
681    /// This method does not perform I/O.
682    pub fn get_parquet_meta_data_from_mem_cache(
683        &self,
684        file_id: RegionFileId,
685    ) -> Option<Arc<ParquetMetaData>> {
686        self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
687            .map(|metadata| metadata.parquet_metadata())
688    }
689
690    /// Puts fused SST metadata into the cache.
691    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
692        if let Some(cache) = &self.sst_meta_cache {
693            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
694            CACHE_BYTES
695                .with_label_values(&[SST_META_TYPE])
696                .add(meta_cache_weight(&key, &metadata).into());
697            cache.insert(key, metadata);
698        }
699    }
700
701    /// Puts [ParquetMetaData] into the cache.
702    pub fn put_parquet_meta_data(
703        &self,
704        file_id: RegionFileId,
705        metadata: Arc<ParquetMetaData>,
706        region_metadata: Option<RegionMetadataRef>,
707    ) {
708        if self.sst_meta_cache.is_some() {
709            let file_path = format!(
710                "region_id={}, file_id={}",
711                file_id.region_id(),
712                file_id.file_id()
713            );
714            match CachedSstMeta::try_new_with_region_metadata(
715                &file_path,
716                Arc::unwrap_or_clone(metadata),
717                region_metadata,
718            ) {
719                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
720                Err(err) => warn!(
721                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
722                    file_id.region_id(),
723                    file_id.file_id()
724                ),
725            }
726        }
727    }
728
729    /// Removes [ParquetMetaData] from the cache.
730    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
731        if let Some(cache) = &self.sst_meta_cache {
732            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
733        }
734    }
735
736    /// Returns the total weighted size of the in-memory SST meta cache.
737    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
738        self.sst_meta_cache
739            .as_ref()
740            .map(|cache| cache.weighted_size())
741            .unwrap_or(0)
742    }
743
744    /// Returns true if the in-memory SST meta cache is enabled.
745    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
746        self.sst_meta_cache.is_some()
747    }
748
749    /// Gets a vector with repeated value for specific `key`.
750    pub fn get_repeated_vector(
751        &self,
752        data_type: &ConcreteDataType,
753        value: &Value,
754    ) -> Option<VectorRef> {
755        self.vector_cache.as_ref().and_then(|vector_cache| {
756            let value = vector_cache.get(&(data_type.clone(), value.clone()));
757            update_hit_miss(value, VECTOR_TYPE)
758        })
759    }
760
761    /// Puts a vector with repeated value into the cache.
762    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
763        if let Some(cache) = &self.vector_cache {
764            let key = (vector.data_type(), value);
765            CACHE_BYTES
766                .with_label_values(&[VECTOR_TYPE])
767                .add(vector_cache_weight(&key, &vector).into());
768            cache.insert(key, vector);
769        }
770    }
771
772    /// Gets pages for the row group.
773    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
774        self.page_cache.as_ref().and_then(|page_cache| {
775            let value = page_cache.get(page_key);
776            update_hit_miss(value, PAGE_TYPE)
777        })
778    }
779
780    /// Puts pages of the row group into the cache.
781    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
782        if let Some(cache) = &self.page_cache {
783            CACHE_BYTES
784                .with_label_values(&[PAGE_TYPE])
785                .add(page_cache_weight(&page_key, &pages).into());
786            cache.insert(page_key, pages);
787        }
788    }
789
790    /// Evicts every puffin-related cache entry for the given file.
791    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
792        if let Some(cache) = &self.bloom_filter_index_cache {
793            cache.invalidate_file(file_id.file_id());
794        }
795
796        if let Some(cache) = &self.inverted_index_cache {
797            cache.invalidate_file(file_id.file_id());
798        }
799
800        if let Some(cache) = &self.index_result_cache {
801            cache.invalidate_file(file_id.file_id());
802        }
803
804        #[cfg(feature = "vector_index")]
805        if let Some(cache) = &self.vector_index_cache {
806            cache.invalidate_file(file_id.file_id());
807        }
808
809        if let Some(cache) = &self.puffin_metadata_cache {
810            cache.remove(&file_id.to_string());
811        }
812
813        if let Some(write_cache) = &self.write_cache {
814            write_cache
815                .remove(IndexKey::new(
816                    file_id.region_id(),
817                    file_id.file_id(),
818                    FileType::Puffin(file_id.version),
819                ))
820                .await;
821        }
822    }
823
824    /// Gets result of for the selector.
825    pub fn get_selector_result(
826        &self,
827        selector_key: &SelectorResultKey,
828    ) -> Option<Arc<SelectorResultValue>> {
829        self.selector_result_cache
830            .as_ref()
831            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
832    }
833
834    /// Puts result of the selector into the cache.
835    pub fn put_selector_result(
836        &self,
837        selector_key: SelectorResultKey,
838        result: Arc<SelectorResultValue>,
839    ) {
840        if let Some(cache) = &self.selector_result_cache {
841            CACHE_BYTES
842                .with_label_values(&[SELECTOR_RESULT_TYPE])
843                .add(selector_result_cache_weight(&selector_key, &result).into());
844            cache.insert(selector_key, result);
845        }
846    }
847
848    /// Gets cached result for range scan.
849    #[allow(dead_code)]
850    pub(crate) fn get_range_result(
851        &self,
852        key: &RangeScanCacheKey,
853    ) -> Option<Arc<RangeScanCacheValue>> {
854        self.range_result_cache
855            .as_ref()
856            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
857    }
858
859    /// Puts range scan result into cache.
860    pub(crate) fn put_range_result(
861        &self,
862        key: RangeScanCacheKey,
863        result: Arc<RangeScanCacheValue>,
864    ) {
865        if let Some(cache) = &self.range_result_cache {
866            CACHE_BYTES
867                .with_label_values(&[RANGE_RESULT_TYPE])
868                .add(range_result_cache_weight(&key, &result).into());
869            cache.insert(key, result);
870        }
871    }
872
873    /// Returns true if the range result cache is enabled.
874    pub(crate) fn has_range_result_cache(&self) -> bool {
875        self.range_result_cache.is_some()
876    }
877
878    pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
879        &self.range_result_memory_limiter
880    }
881
882    pub(crate) fn range_result_cache_size(&self) -> usize {
883        self.range_result_cache_size as usize
884    }
885
886    /// Gets the write cache.
887    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
888        self.write_cache.as_ref()
889    }
890
891    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
892        self.inverted_index_cache.as_ref()
893    }
894
895    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
896        self.bloom_filter_index_cache.as_ref()
897    }
898
899    #[cfg(feature = "vector_index")]
900    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
901        self.vector_index_cache.as_ref()
902    }
903
904    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
905        self.puffin_metadata_cache.as_ref()
906    }
907
908    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
909        self.index_result_cache.as_ref()
910    }
911}
912
913/// Increases selector cache miss metrics.
914pub fn selector_result_cache_miss() {
915    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
916}
917
918/// Increases selector cache hit metrics.
919pub fn selector_result_cache_hit() {
920    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
921}
922
/// Builder to construct a [CacheManager].
///
/// Sizes are set through setters taking `bytes: u64`. Every field starts at `0` / `None`
/// via `#[derive(Default)]`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the in-memory SST meta cache; `0` disables it.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache; `0` disables it.
    vector_cache_size: u64,
    /// Capacity of the page cache; `0` disables it.
    page_cache_size: u64,
    /// Metadata capacity passed to both the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Content capacity passed to the inverted index and bloom filter index caches. With the
    /// `vector_index` feature it also sizes the vector index cache, which `0` disables.
    index_content_size: u64,
    /// Index content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; `0` disables it.
    index_result_cache_size: u64,
    /// Capacity passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache, handed to the built [CacheManager] unchanged.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; `0` disables it.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache (`0` disables it); also sizes the range result
    /// memory limiter.
    range_result_cache_size: u64,
}
938
939impl CacheManagerBuilder {
940    /// Sets meta cache size.
941    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
942        self.sst_meta_cache_size = bytes;
943        self
944    }
945
946    /// Sets vector cache size.
947    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
948        self.vector_cache_size = bytes;
949        self
950    }
951
952    /// Sets page cache size.
953    pub fn page_cache_size(mut self, bytes: u64) -> Self {
954        self.page_cache_size = bytes;
955        self
956    }
957
958    /// Sets write cache.
959    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
960        self.write_cache = cache;
961        self
962    }
963
964    /// Sets cache size for index metadata.
965    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
966        self.index_metadata_size = bytes;
967        self
968    }
969
970    /// Sets cache size for index content.
971    pub fn index_content_size(mut self, bytes: u64) -> Self {
972        self.index_content_size = bytes;
973        self
974    }
975
976    /// Sets page size for index content.
977    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
978        self.index_content_page_size = bytes;
979        self
980    }
981
982    /// Sets cache size for index result.
983    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
984        self.index_result_cache_size = bytes;
985        self
986    }
987
988    /// Sets cache size for puffin metadata.
989    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
990        self.puffin_metadata_size = bytes;
991        self
992    }
993
994    /// Sets selector result cache size.
995    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
996        self.selector_result_cache_size = bytes;
997        self
998    }
999
1000    /// Sets range result cache size.
1001    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
1002        self.range_result_cache_size = bytes;
1003        self
1004    }
1005
1006    /// Builds the [CacheManager].
1007    pub fn build(self) -> CacheManager {
1008        fn to_str(cause: RemovalCause) -> &'static str {
1009            match cause {
1010                RemovalCause::Expired => "expired",
1011                RemovalCause::Explicit => "explicit",
1012                RemovalCause::Replaced => "replaced",
1013                RemovalCause::Size => "size",
1014            }
1015        }
1016
1017        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
1018            Cache::builder()
1019                .max_capacity(self.sst_meta_cache_size)
1020                .weigher(meta_cache_weight)
1021                .eviction_listener(|k, v, cause| {
1022                    let size = meta_cache_weight(&k, &v);
1023                    CACHE_BYTES
1024                        .with_label_values(&[SST_META_TYPE])
1025                        .sub(size.into());
1026                    CACHE_EVICTION
1027                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
1028                        .inc();
1029                })
1030                .build()
1031        });
1032        let vector_cache = (self.vector_cache_size != 0).then(|| {
1033            Cache::builder()
1034                .max_capacity(self.vector_cache_size)
1035                .weigher(vector_cache_weight)
1036                .eviction_listener(|k, v, cause| {
1037                    let size = vector_cache_weight(&k, &v);
1038                    CACHE_BYTES
1039                        .with_label_values(&[VECTOR_TYPE])
1040                        .sub(size.into());
1041                    CACHE_EVICTION
1042                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
1043                        .inc();
1044                })
1045                .build()
1046        });
1047        let page_cache = (self.page_cache_size != 0).then(|| {
1048            Cache::builder()
1049                .max_capacity(self.page_cache_size)
1050                .weigher(page_cache_weight)
1051                .eviction_listener(|k, v, cause| {
1052                    let size = page_cache_weight(&k, &v);
1053                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
1054                    CACHE_EVICTION
1055                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
1056                        .inc();
1057                })
1058                .build()
1059        });
1060        let inverted_index_cache = InvertedIndexCache::new(
1061            self.index_metadata_size,
1062            self.index_content_size,
1063            self.index_content_page_size,
1064        );
1065        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
1066        let bloom_filter_index_cache = BloomFilterIndexCache::new(
1067            self.index_metadata_size,
1068            self.index_content_size,
1069            self.index_content_page_size,
1070        );
1071        #[cfg(feature = "vector_index")]
1072        let vector_index_cache = (self.index_content_size != 0)
1073            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1074        let index_result_cache = (self.index_result_cache_size != 0)
1075            .then(|| IndexResultCache::new(self.index_result_cache_size));
1076        let puffin_metadata_cache =
1077            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1078        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1079            Cache::builder()
1080                .max_capacity(self.selector_result_cache_size)
1081                .weigher(selector_result_cache_weight)
1082                .eviction_listener(|k, v, cause| {
1083                    let size = selector_result_cache_weight(&k, &v);
1084                    CACHE_BYTES
1085                        .with_label_values(&[SELECTOR_RESULT_TYPE])
1086                        .sub(size.into());
1087                    CACHE_EVICTION
1088                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1089                        .inc();
1090                })
1091                .build()
1092        });
1093        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1094            Cache::builder()
1095                .max_capacity(self.range_result_cache_size)
1096                .weigher(range_result_cache_weight)
1097                .eviction_listener(move |k, v, cause| {
1098                    let size = range_result_cache_weight(&k, &v);
1099                    CACHE_BYTES
1100                        .with_label_values(&[RANGE_RESULT_TYPE])
1101                        .sub(size.into());
1102                    CACHE_EVICTION
1103                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1104                        .inc();
1105                })
1106                .build()
1107        });
1108        CacheManager {
1109            sst_meta_cache,
1110            vector_cache,
1111            page_cache,
1112            write_cache: self.write_cache,
1113            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1114            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1115            #[cfg(feature = "vector_index")]
1116            vector_index_cache,
1117            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1118            selector_result_cache,
1119            range_result_cache,
1120            range_result_cache_size: self.range_result_cache_size,
1121            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1122                self.range_result_cache_size as usize,
1123                RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1124            )),
1125            index_result_cache,
1126        }
1127    }
1128}
1129
1130fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1131    // We ignore the size of `Arc`.
1132    (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1133}
1134
1135fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1136    // We ignore the heap size of `Value`.
1137    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1138}
1139
1140fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1141    (k.estimated_size() + v.estimated_size()) as u32
1142}
1143
1144fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1145    (mem::size_of_val(k) + v.estimated_size()) as u32
1146}
1147
1148fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1149    (k.estimated_size() + v.estimated_size()) as u32
1150}
1151
1152/// Updates cache hit/miss metrics.
1153fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1154    if value.is_some() {
1155        CACHE_HIT.with_label_values(&[cache_type]).inc();
1156    } else {
1157        CACHE_MISS.with_label_values(&[cache_type]).inc();
1158    }
1159    value
1160}
1161
/// Cache key (region id, file id) for SST meta.
///
/// Key type of the `SstMetaCache` alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
1165
1166impl SstMetaKey {
1167    /// Returns memory used by the key (estimated).
1168    fn estimated_size(&self) -> usize {
1169        mem::size_of::<Self>()
1170    }
1171}
1172
/// Path to column pages in the SST file.
///
/// Identifies one column chunk by region, SST file, row group index and column index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1185
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
///
/// The key holds no region id; the SST is identified by `file_id` alone. Equality and
/// hashing are derived, so `ranges` is compared element-wise and in order: the same ranges
/// listed in a different order form a different key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
1199
1200impl PageKey {
1201    /// Creates a key for a list of pages.
1202    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1203        PageKey {
1204            file_id,
1205            row_group_idx,
1206            ranges,
1207        }
1208    }
1209
1210    /// Returns memory used by the key (estimated).
1211    fn estimated_size(&self) -> usize {
1212        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1213    }
1214}
1215
/// Cached row group pages for a column.
///
/// Value type of the page cache, keyed by [`PageKey`].
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    ///
    /// This is the figure charged for page data when estimating the entry's cache weight.
    pub page_size: u64,
}
1225
1226impl PageValue {
1227    /// Creates a new value from a range of compressed pages.
1228    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1229        PageValue {
1230            compressed: bytes,
1231            page_size,
1232        }
1233    }
1234
1235    /// Returns memory used by the value (estimated).
1236    fn estimated_size(&self) -> usize {
1237        mem::size_of::<Self>()
1238            + self.page_size as usize
1239            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1240    }
1241}
1242
/// Cache key for time series row selector result.
///
/// Identifies the selector result of one row group in one SST file. The key derives `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1253
/// Result stored in the selector result cache.
///
/// The variant records which read format produced the rows; each variant's size is
/// estimated with its own method.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1261
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: SelectorResult,
    /// The read columns of rows.
    ///
    /// Not included in the value's estimated size.
    pub read_cols: ParquetReadColumns,
}
1269
1270impl SelectorResultValue {
1271    /// Creates a new selector result value with primary key format.
1272    pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1273        SelectorResultValue {
1274            result: SelectorResult::PrimaryKey(result),
1275            read_cols,
1276        }
1277    }
1278
1279    /// Creates a new selector result value with flat format.
1280    pub fn new_flat(
1281        result: Vec<RecordBatch>,
1282        read_cols: ParquetReadColumns,
1283    ) -> SelectorResultValue {
1284        SelectorResultValue {
1285            result: SelectorResult::Flat(result),
1286            read_cols,
1287        }
1288    }
1289
1290    /// Returns memory used by the value (estimated).
1291    fn estimated_size(&self) -> usize {
1292        match &self.result {
1293            SelectorResult::PrimaryKey(batches) => {
1294                batches.iter().map(|batch| batch.memory_size()).sum()
1295            }
1296            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1297        }
1298    }
1299}
1300
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a (data type, [Value]) pair to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group index, page byte ranges) — see [PageKey] — to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1313
1314#[cfg(test)]
1315mod tests {
1316    use std::sync::Arc;
1317
1318    use api::v1::SemanticType;
1319    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1320    use datatypes::schema::ColumnSchema;
1321    use datatypes::vectors::Int64Vector;
1322    use puffin::file_metadata::FileMetadata;
1323    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1324    use store_api::storage::ColumnId;
1325
1326    use super::*;
1327    use crate::cache::index::bloom_filter_index::Tag;
1328    use crate::cache::index::result_cache::PredicateKey;
1329    use crate::cache::test_util::{
1330        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1331    };
1332    use crate::read::range_cache::{
1333        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1334    };
1335    use crate::read::read_columns::ReadColumns;
1336    use crate::sst::parquet::row_selection::RowGroupSelection;
1337
1338    #[tokio::test]
1339    async fn test_disable_cache() {
1340        let cache = CacheManager::default();
1341        assert!(cache.sst_meta_cache.is_none());
1342        assert!(cache.vector_cache.is_none());
1343        assert!(cache.page_cache.is_none());
1344
1345        let region_id = RegionId::new(1, 1);
1346        let file_id = RegionFileId::new(region_id, FileId::random());
1347        let metadata = parquet_meta();
1348        let mut metrics = MetadataCacheMetrics::default();
1349        cache.put_parquet_meta_data(file_id, metadata, None);
1350        assert!(
1351            cache
1352                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1353                .await
1354                .is_none()
1355        );
1356
1357        let value = Value::Int64(10);
1358        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1359        cache.put_repeated_vector(value.clone(), vector.clone());
1360        assert!(
1361            cache
1362                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1363                .is_none()
1364        );
1365
1366        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
1367        let pages = Arc::new(PageValue::default());
1368        cache.put_pages(key.clone(), pages);
1369        assert!(cache.get_pages(&key).is_none());
1370
1371        assert!(cache.write_cache().is_none());
1372    }
1373
1374    #[tokio::test]
1375    async fn test_parquet_meta_cache() {
1376        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1377        let mut metrics = MetadataCacheMetrics::default();
1378        let region_id = RegionId::new(1, 1);
1379        let file_id = RegionFileId::new(region_id, FileId::random());
1380        assert!(
1381            cache
1382                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1383                .await
1384                .is_none()
1385        );
1386        let (metadata, region_metadata) = sst_parquet_meta();
1387        cache.put_parquet_meta_data(file_id, metadata, None);
1388        let cached = cache
1389            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1390            .await
1391            .unwrap();
1392        assert_eq!(region_metadata, cached.region_metadata());
1393        assert!(
1394            cached
1395                .parquet_metadata()
1396                .file_metadata()
1397                .key_value_metadata()
1398                .is_none_or(|key_values| {
1399                    key_values
1400                        .iter()
1401                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1402                })
1403        );
1404        cache.remove_parquet_meta_data(file_id);
1405        assert!(
1406            cache
1407                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1408                .await
1409                .is_none()
1410        );
1411    }
1412
1413    #[tokio::test]
1414    async fn test_parquet_meta_cache_with_provided_region_metadata() {
1415        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1416        let mut metrics = MetadataCacheMetrics::default();
1417        let region_id = RegionId::new(1, 1);
1418        let file_id = RegionFileId::new(region_id, FileId::random());
1419        let (metadata, region_metadata) = sst_parquet_meta();
1420
1421        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1422
1423        let cached = cache
1424            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1425            .await
1426            .unwrap();
1427        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
1428    }
1429
1430    #[tokio::test]
1431    async fn test_parquet_meta_cache_respects_page_index_policy() {
1432        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1433        let region_id = RegionId::new(1, 1);
1434        let file_id = RegionFileId::new(region_id, FileId::random());
1435        let (metadata, _) = sst_parquet_meta();
1436
1437        let skip_metadata = Arc::new(
1438            CachedSstMeta::try_new_with_page_index_policy(
1439                "test.parquet",
1440                Arc::unwrap_or_clone(metadata.clone()),
1441                None,
1442                PageIndexPolicy::Skip,
1443            )
1444            .unwrap(),
1445        );
1446        cache.put_sst_meta_data(file_id, skip_metadata);
1447
1448        let mut metrics = MetadataCacheMetrics::default();
1449        assert!(
1450            cache
1451                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1452                .await
1453                .is_none()
1454        );
1455        assert_eq!(1, metrics.cache_miss);
1456
1457        let optional_metadata = Arc::new(
1458            CachedSstMeta::try_new_with_page_index_policy(
1459                "test.parquet",
1460                Arc::unwrap_or_clone(metadata),
1461                None,
1462                PageIndexPolicy::Optional,
1463            )
1464            .unwrap(),
1465        );
1466        cache.put_sst_meta_data(file_id, optional_metadata);
1467
1468        let mut metrics = MetadataCacheMetrics::default();
1469        assert!(
1470            cache
1471                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1472                .await
1473                .is_some()
1474        );
1475        assert_eq!(1, metrics.mem_cache_hit);
1476
1477        let mut metrics = MetadataCacheMetrics::default();
1478        assert!(
1479            cache
1480                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
1481                .await
1482                .is_some()
1483        );
1484        assert_eq!(1, metrics.mem_cache_hit);
1485    }
1486
1487    #[test]
1488    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
1489        let region_metadata = Arc::new(wide_region_metadata(128));
1490        let json_len = region_metadata.to_json().unwrap().len();
1491        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1492        let cached = Arc::new(
1493            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
1494        );
1495        let key = SstMetaKey(region_metadata.region_id, FileId::random());
1496
1497        assert!(cached.region_metadata_weight > json_len);
1498        assert_eq!(
1499            meta_cache_weight(&key, &cached) as usize,
1500            key.estimated_size()
1501                + parquet_meta_size(&cached.parquet_metadata)
1502                + cached.region_metadata_weight
1503        );
1504    }
1505
1506    #[test]
1507    fn test_repeated_vector_cache() {
1508        let cache = CacheManager::builder().vector_cache_size(4096).build();
1509        let value = Value::Int64(10);
1510        assert!(
1511            cache
1512                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1513                .is_none()
1514        );
1515        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1516        cache.put_repeated_vector(value.clone(), vector.clone());
1517        let cached = cache
1518            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1519            .unwrap();
1520        assert_eq!(vector, cached);
1521    }
1522
1523    #[test]
1524    fn test_page_cache() {
1525        let cache = CacheManager::builder().page_cache_size(1000).build();
1526        let file_id = FileId::random();
1527        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
1528        assert!(cache.get_pages(&key).is_none());
1529        let pages = Arc::new(PageValue::default());
1530        cache.put_pages(key.clone(), pages);
1531        assert!(cache.get_pages(&key).is_some());
1532    }
1533
1534    #[test]
1535    fn test_selector_result_cache() {
1536        let cache = CacheManager::builder()
1537            .selector_result_cache_size(1000)
1538            .build();
1539        let file_id = FileId::random();
1540        let key = SelectorResultKey {
1541            file_id,
1542            row_group_idx: 0,
1543            selector: TimeSeriesRowSelector::LastRow,
1544        };
1545        assert!(cache.get_selector_result(&key).is_none());
1546        let result = Arc::new(SelectorResultValue::new(
1547            Vec::new(),
1548            ParquetReadColumns::from_deduped(Vec::new()),
1549        ));
1550        cache.put_selector_result(key, result);
1551        assert!(cache.get_selector_result(&key).is_some());
1552    }
1553
1554    #[test]
1555    fn test_range_result_cache() {
1556        let cache = Arc::new(
1557            CacheManager::builder()
1558                .range_result_cache_size(1024 * 1024)
1559                .build(),
1560        );
1561
1562        let key = RangeScanCacheKey {
1563            region_id: RegionId::new(1, 1),
1564            row_groups: vec![(FileId::random(), 0)],
1565            scan: ScanRequestFingerprintBuilder {
1566                read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
1567                read_column_types: vec![],
1568                filters: vec!["tag_0 = 1".to_string()],
1569                time_filters: vec![],
1570                series_row_selector: None,
1571                append_mode: false,
1572                filter_deleted: true,
1573                merge_mode: crate::region::options::MergeMode::LastRow,
1574                partition_expr_version: 0,
1575            }
1576            .build(),
1577        };
1578        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));
1579
1580        assert!(cache.get_range_result(&key).is_none());
1581        cache.put_range_result(key.clone(), value.clone());
1582        assert!(cache.get_range_result(&key).is_some());
1583
1584        let enable_all = CacheStrategy::EnableAll(cache.clone());
1585        assert!(enable_all.get_range_result(&key).is_some());
1586
1587        let compaction = CacheStrategy::Compaction(cache.clone());
1588        assert!(compaction.get_range_result(&key).is_none());
1589        compaction.put_range_result(key.clone(), value.clone());
1590        assert!(cache.get_range_result(&key).is_some());
1591
1592        let disabled = CacheStrategy::Disabled;
1593        assert!(disabled.get_range_result(&key).is_none());
1594        disabled.put_range_result(key.clone(), value);
1595        assert!(cache.get_range_result(&key).is_some());
1596    }
1597
1598    #[test]
1599    fn test_range_result_cache_size_configures_limiter() {
1600        let cache_size = 3 * 1024_u64;
1601        let cache = CacheManager::builder()
1602            .range_result_cache_size(cache_size)
1603            .build();
1604
1605        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
1606        assert_eq!(
1607            cache.range_result_memory_limiter().permit_bytes(),
1608            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
1609        );
1610        assert_eq!(
1611            cache.range_result_memory_limiter().available_permits(),
1612            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
1613        );
1614    }
1615
1616    #[tokio::test]
1617    async fn range_result_memory_limiter_rejects_oversized_request() {
1618        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
1619        assert_eq!(limiter.available_permits(), 2);
1620
1621        let err = limiter.acquire(10 * 1024).await.unwrap_err();
1622        assert!(
1623            err.to_string().contains("exceeds limiter capacity"),
1624            "unexpected error: {err}"
1625        );
1626        assert_eq!(limiter.available_permits(), 2);
1627    }
1628
1629    #[tokio::test]
1630    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
1631        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
1632        let permit = limiter.acquire(2 * 1024).await.unwrap();
1633        assert_eq!(limiter.available_permits(), 0);
1634        drop(permit);
1635        assert_eq!(limiter.available_permits(), 2);
1636    }
1637
1638    #[tokio::test]
1639    async fn test_evict_puffin_cache_clears_all_entries() {
1640        use std::collections::{BTreeMap, HashMap};
1641
1642        let cache = CacheManager::builder()
1643            .index_metadata_size(128)
1644            .index_content_size(128)
1645            .index_content_page_size(64)
1646            .index_result_cache_size(128)
1647            .puffin_metadata_size(128)
1648            .build();
1649        let cache = Arc::new(cache);
1650
1651        let region_id = RegionId::new(1, 1);
1652        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
1653        let column_id: ColumnId = 1;
1654
1655        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
1656        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
1657        let result_cache = cache.index_result_cache().unwrap();
1658        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
1659
1660        let bloom_key = (
1661            index_id.file_id(),
1662            index_id.version,
1663            column_id,
1664            Tag::Skipping,
1665        );
1666        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1667        inverted_cache.put_metadata(
1668            (index_id.file_id(), index_id.version),
1669            Arc::new(InvertedIndexMetas::default()),
1670        );
1671        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
1672        let selection = Arc::new(RowGroupSelection::default());
1673        result_cache.put(predicate.clone(), index_id.file_id(), selection);
1674        let file_id_str = index_id.to_string();
1675        let metadata = Arc::new(FileMetadata {
1676            blobs: Vec::new(),
1677            properties: HashMap::new(),
1678        });
1679        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1680
1681        assert!(bloom_cache.get_metadata(bloom_key).is_some());
1682        assert!(
1683            inverted_cache
1684                .get_metadata((index_id.file_id(), index_id.version))
1685                .is_some()
1686        );
1687        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1688        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1689
1690        cache.evict_puffin_cache(index_id).await;
1691
1692        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1693        assert!(
1694            inverted_cache
1695                .get_metadata((index_id.file_id(), index_id.version))
1696                .is_none()
1697        );
1698        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1699        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1700
1701        // Refill caches and evict via CacheStrategy to ensure delegation works.
1702        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1703        inverted_cache.put_metadata(
1704            (index_id.file_id(), index_id.version),
1705            Arc::new(InvertedIndexMetas::default()),
1706        );
1707        result_cache.put(
1708            predicate.clone(),
1709            index_id.file_id(),
1710            Arc::new(RowGroupSelection::default()),
1711        );
1712        puffin_metadata_cache.put_metadata(
1713            file_id_str.clone(),
1714            Arc::new(FileMetadata {
1715                blobs: Vec::new(),
1716                properties: HashMap::new(),
1717            }),
1718        );
1719
1720        let strategy = CacheStrategy::EnableAll(cache.clone());
1721        strategy.evict_puffin_cache(index_id).await;
1722
1723        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1724        assert!(
1725            inverted_cache
1726                .get_metadata((index_id.file_id(), index_id.version))
1727                .is_none()
1728        );
1729        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1730        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1731    }
1732
1733    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
1734        let region_id = RegionId::new(1024, 7);
1735        let mut builder = RegionMetadataBuilder::new(region_id);
1736        let mut primary_key = Vec::new();
1737
1738        for column_id in 0..column_count {
1739            let semantic_type = if column_id < 32 {
1740                primary_key.push(column_id);
1741                SemanticType::Tag
1742            } else {
1743                SemanticType::Field
1744            };
1745            let mut column_schema = ColumnSchema::new(
1746                format!("wide_column_{column_id}"),
1747                ConcreteDataType::string_datatype(),
1748                true,
1749            );
1750            column_schema
1751                .mut_metadata()
1752                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
1753            builder.push_column_metadata(ColumnMetadata {
1754                column_schema,
1755                semantic_type,
1756                column_id,
1757            });
1758        }
1759
1760        builder.push_column_metadata(ColumnMetadata {
1761            column_schema: ColumnSchema::new(
1762                "ts",
1763                ConcreteDataType::timestamp_millisecond_datatype(),
1764                false,
1765            ),
1766            semantic_type: SemanticType::Timestamp,
1767            column_id: column_count,
1768        });
1769        builder.primary_key(primary_key);
1770
1771        builder.build().unwrap()
1772    }
1773}