1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::parquet::reader::MetadataCacheMetrics;
62
/// Metrics label for the in-memory SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics label for file entries.
// NOTE(review): not referenced in this file; presumably used by a child module
// (e.g. the file cache) via `super::` — confirm.
const FILE_TYPE: &str = "file";
/// Metrics label for index entries.
// NOTE(review): not referenced in this file; presumably used by a child module — confirm.
const INDEX_TYPE: &str = "index";
/// Metrics label for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label for the range result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Total budget used by `RangeResultMemoryLimiter::default`.
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes represented by a single limiter permit.
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
79
/// Limits the memory reserved for range scan results.
///
/// Memory is accounted in whole permits of `permit_bytes` bytes each, backed by a tokio
/// semaphore created with `total_permits` permits.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Semaphore whose permits represent reserved memory.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Bytes represented by one permit; always at least 1.
    permit_bytes: usize,
    /// Number of permits the semaphore was created with; the largest single request allowed.
    total_permits: usize,
}
86
87impl Default for RangeResultMemoryLimiter {
88 fn default() -> Self {
89 Self::new(
90 RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
91 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
92 )
93 }
94}
95
96impl RangeResultMemoryLimiter {
97 pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
98 let permit_bytes = permit_bytes.max(1);
99 let total_permits = limit_bytes
100 .div_ceil(permit_bytes)
101 .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
102 Self {
103 semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
104 permit_bytes,
105 total_permits,
106 }
107 }
108
109 #[cfg(test)]
110 pub(crate) fn permit_bytes(&self) -> usize {
111 self.permit_bytes
112 }
113
114 #[cfg(test)]
115 pub(crate) fn available_permits(&self) -> usize {
116 self.semaphore.available_permits()
117 }
118
119 pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
120 let permits = bytes.div_ceil(self.permit_bytes).max(1);
121 if permits > self.total_permits {
122 return UnexpectedSnafu {
123 reason: format!(
124 "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
125 self.total_permits.saturating_mul(self.permit_bytes)
126 ),
127 }
128 .fail();
129 }
130 self.semaphore
131 .acquire_many(permits as u32)
132 .await
133 .map_err(|_| {
134 UnexpectedSnafu {
135 reason: "range result memory limiter is unexpectedly closed",
136 }
137 .build()
138 })
139 }
140}
141
/// SST metadata held in the in-memory cache.
///
/// Holds the parquet metadata with the `PARQUET_METADATA_KEY` key-value entry stripped,
/// together with the region metadata that entry encoded (or that the caller supplied).
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
    /// Parquet metadata without the `PARQUET_METADATA_KEY` key-value entry.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata of the SST.
    region_metadata: RegionMetadataRef,
    /// Cache weight charged for `region_metadata`: the larger of its estimated size and the
    /// length of its JSON encoding.
    region_metadata_weight: usize,
    /// Page index policy the cached parquet metadata is known to satisfy.
    page_index_policy: PageIndexPolicy,
}
153
154impl CachedSstMeta {
155 #[cfg(test)]
156 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
157 let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
158 Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
159 }
160
161 pub(crate) fn try_new_with_region_metadata(
162 file_path: &str,
163 parquet_metadata: ParquetMetaData,
164 region_metadata: Option<RegionMetadataRef>,
165 ) -> Result<Self> {
166 let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
167 Self::try_new_with_page_index_policy(
168 file_path,
169 parquet_metadata,
170 region_metadata,
171 page_index_policy,
172 )
173 }
174
175 pub(crate) fn try_new_with_page_index_policy(
176 file_path: &str,
177 parquet_metadata: ParquetMetaData,
178 region_metadata: Option<RegionMetadataRef>,
179 page_index_policy: PageIndexPolicy,
180 ) -> Result<Self> {
181 let file_metadata = parquet_metadata.file_metadata();
182 let key_values = file_metadata
183 .key_value_metadata()
184 .context(InvalidParquetSnafu {
185 file: file_path,
186 reason: "missing key value meta",
187 })?;
188 let meta_value = key_values
189 .iter()
190 .find(|kv| kv.key == PARQUET_METADATA_KEY)
191 .with_context(|| InvalidParquetSnafu {
192 file: file_path,
193 reason: format!("key {} not found", PARQUET_METADATA_KEY),
194 })?;
195 let json = meta_value
196 .value
197 .as_ref()
198 .with_context(|| InvalidParquetSnafu {
199 file: file_path,
200 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
201 })?;
202 let region_metadata = match region_metadata {
203 Some(region_metadata) => region_metadata,
204 None => Arc::new(
205 store_api::metadata::RegionMetadata::from_json(json)
206 .context(InvalidMetadataSnafu)?,
207 ),
208 };
209 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
211 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
212
213 Ok(Self {
214 parquet_metadata,
215 region_metadata,
216 region_metadata_weight,
217 page_index_policy,
218 })
219 }
220
221 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
222 self.parquet_metadata.clone()
223 }
224
225 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
226 self.region_metadata.clone()
227 }
228
229 fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
230 match requested {
231 PageIndexPolicy::Skip => true,
232 PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
233 PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
234 }
235 }
236}
237
238fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
239 if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
240 PageIndexPolicy::Optional
241 } else {
242 PageIndexPolicy::Skip
243 }
244}
245
246fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
247 let file_metadata = parquet_metadata.file_metadata();
248 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
249 let filtered = key_values
250 .iter()
251 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
252 .cloned()
253 .collect::<Vec<_>>();
254 (!filtered.is_empty()).then_some(filtered)
255 });
256 let stripped_file_metadata = FileMetaData::new(
257 file_metadata.version(),
258 file_metadata.num_rows(),
259 file_metadata.created_by().map(ToString::to_string),
260 filtered_key_values,
261 file_metadata.schema_descr_ptr(),
262 file_metadata.column_orders().cloned(),
263 );
264
265 let mut builder = parquet_metadata.into_builder();
266 let row_groups = builder.take_row_groups();
267 let column_index = builder.take_column_index();
268 let offset_index = builder.take_offset_index();
269
270 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
271 .set_row_groups(row_groups)
272 .set_column_index(column_index)
273 .set_offset_index(offset_index)
274 .build()
275}
276
/// Selects which caches of a [`CacheManager`] a reader may use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Uses every cache of the manager.
    EnableAll(CacheManagerRef),
    /// Restricted access for compaction.
    ///
    /// Only SST metadata, the write cache, and puffin/metadata eviction go through the manager.
    /// Page, vector, selector/range result, and index caches are bypassed.
    Compaction(CacheManagerRef),
    /// Uses no cache at all.
    Disabled,
}
291
292impl CacheStrategy {
293 pub(crate) async fn get_sst_meta_data(
295 &self,
296 file_id: RegionFileId,
297 metrics: &mut MetadataCacheMetrics,
298 page_index_policy: PageIndexPolicy,
299 ) -> Option<Arc<CachedSstMeta>> {
300 match self {
301 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
302 cache_manager
303 .get_sst_meta_data(file_id, metrics, page_index_policy)
304 .await
305 }
306 CacheStrategy::Disabled => {
307 metrics.cache_miss += 1;
308 None
309 }
310 }
311 }
312
313 pub(crate) fn get_sst_meta_data_from_mem_cache(
315 &self,
316 file_id: RegionFileId,
317 page_index_policy: PageIndexPolicy,
318 ) -> Option<Arc<CachedSstMeta>> {
319 match self {
320 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
321 cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
322 }
323 CacheStrategy::Disabled => None,
324 }
325 }
326
327 pub fn get_parquet_meta_data_from_mem_cache(
329 &self,
330 file_id: RegionFileId,
331 ) -> Option<Arc<ParquetMetaData>> {
332 self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
333 .map(|metadata| metadata.parquet_metadata())
334 }
335
336 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
338 match self {
339 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
340 cache_manager.put_sst_meta_data(file_id, metadata);
341 }
342 CacheStrategy::Disabled => {}
343 }
344 }
345
346 pub fn put_parquet_meta_data(
348 &self,
349 file_id: RegionFileId,
350 metadata: Arc<ParquetMetaData>,
351 region_metadata: Option<RegionMetadataRef>,
352 ) {
353 match self {
354 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
355 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
356 }
357 CacheStrategy::Disabled => {}
358 }
359 }
360
361 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
363 match self {
364 CacheStrategy::EnableAll(cache_manager) => {
365 cache_manager.remove_parquet_meta_data(file_id);
366 }
367 CacheStrategy::Compaction(cache_manager) => {
368 cache_manager.remove_parquet_meta_data(file_id);
369 }
370 CacheStrategy::Disabled => {}
371 }
372 }
373
374 pub fn get_repeated_vector(
377 &self,
378 data_type: &ConcreteDataType,
379 value: &Value,
380 ) -> Option<VectorRef> {
381 match self {
382 CacheStrategy::EnableAll(cache_manager) => {
383 cache_manager.get_repeated_vector(data_type, value)
384 }
385 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
386 }
387 }
388
389 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
392 if let CacheStrategy::EnableAll(cache_manager) = self {
393 cache_manager.put_repeated_vector(value, vector);
394 }
395 }
396
397 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
400 match self {
401 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
402 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
403 }
404 }
405
406 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
409 if let CacheStrategy::EnableAll(cache_manager) = self {
410 cache_manager.put_pages(page_key, pages);
411 }
412 }
413
414 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
416 match self {
417 CacheStrategy::EnableAll(cache_manager) => {
418 cache_manager.evict_puffin_cache(file_id).await
419 }
420 CacheStrategy::Compaction(cache_manager) => {
421 cache_manager.evict_puffin_cache(file_id).await
422 }
423 CacheStrategy::Disabled => {}
424 }
425 }
426
427 pub fn get_selector_result(
430 &self,
431 selector_key: &SelectorResultKey,
432 ) -> Option<Arc<SelectorResultValue>> {
433 match self {
434 CacheStrategy::EnableAll(cache_manager) => {
435 cache_manager.get_selector_result(selector_key)
436 }
437 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
438 }
439 }
440
441 pub fn put_selector_result(
444 &self,
445 selector_key: SelectorResultKey,
446 result: Arc<SelectorResultValue>,
447 ) {
448 if let CacheStrategy::EnableAll(cache_manager) = self {
449 cache_manager.put_selector_result(selector_key, result);
450 }
451 }
452
453 #[allow(dead_code)]
456 pub(crate) fn get_range_result(
457 &self,
458 key: &RangeScanCacheKey,
459 ) -> Option<Arc<RangeScanCacheValue>> {
460 match self {
461 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
462 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
463 }
464 }
465
466 pub(crate) fn put_range_result(
469 &self,
470 key: RangeScanCacheKey,
471 result: Arc<RangeScanCacheValue>,
472 ) {
473 if let CacheStrategy::EnableAll(cache_manager) = self {
474 cache_manager.put_range_result(key, result);
475 }
476 }
477
478 pub(crate) fn has_range_result_cache(&self) -> bool {
480 match self {
481 CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
482 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
483 }
484 }
485
486 pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
487 match self {
488 CacheStrategy::EnableAll(cache_manager) => {
489 Some(cache_manager.range_result_memory_limiter())
490 }
491 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
492 }
493 }
494
495 pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
496 match self {
497 CacheStrategy::EnableAll(cache_manager) => {
498 Some(cache_manager.range_result_cache_size())
499 }
500 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
501 }
502 }
503
504 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
507 match self {
508 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
509 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
510 CacheStrategy::Disabled => None,
511 }
512 }
513
514 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
517 match self {
518 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
519 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520 }
521 }
522
523 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
526 match self {
527 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
528 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
529 }
530 }
531
532 #[cfg(feature = "vector_index")]
535 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
536 match self {
537 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
538 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
539 }
540 }
541
542 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
545 match self {
546 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
547 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
548 }
549 }
550
551 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
554 match self {
555 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
556 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
557 }
558 }
559
560 pub fn maybe_download_background(
562 &self,
563 index_key: IndexKey,
564 remote_path: String,
565 remote_store: ObjectStore,
566 file_size: u64,
567 ) {
568 if let CacheStrategy::EnableAll(cache_manager) = self
569 && let Some(write_cache) = cache_manager.write_cache()
570 {
571 write_cache.file_cache().maybe_download_background(
572 index_key,
573 remote_path,
574 remote_store,
575 file_size,
576 );
577 }
578 }
579}
580
/// Owns the caches used by the storage engine.
///
/// A cache field that is `None` is not configured. A default-constructed manager has no
/// caches at all.
#[derive(Default)]
pub struct CacheManager {
    /// In-memory cache of SST metadata, keyed by region and file id.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache of vectors keyed by `(data type, value)`.
    vector_cache: Option<VectorCache>,
    /// Cache of parquet pages keyed by [`PageKey`].
    page_cache: Option<PageCache>,
    /// Optional write cache; its file cache also serves as a second-level SST metadata cache.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index data.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index data.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index data.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Cache of puffin file metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache of selector results keyed by [`SelectorResultKey`].
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache of range scan results.
    range_result_cache: Option<RangeResultCache>,
    /// Configured capacity of the range result cache in bytes (0 means disabled).
    range_result_cache_size: u64,
    /// Limits memory reserved for range results; sized from `range_result_cache_size` by the builder.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache of index application results.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared handle to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
616
617impl CacheManager {
618 pub fn builder() -> CacheManagerBuilder {
620 CacheManagerBuilder::default()
621 }
622
623 pub(crate) async fn get_sst_meta_data(
626 &self,
627 file_id: RegionFileId,
628 metrics: &mut MetadataCacheMetrics,
629 page_index_policy: PageIndexPolicy,
630 ) -> Option<Arc<CachedSstMeta>> {
631 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
632 metrics.mem_cache_hit += 1;
633 return Some(metadata);
634 }
635
636 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
637 if let Some(write_cache) = &self.write_cache
638 && let Some(metadata) = write_cache
639 .file_cache()
640 .get_sst_meta_data(key, metrics, page_index_policy)
641 .await
642 {
643 metrics.file_cache_hit += 1;
644 self.put_sst_meta_data(file_id, metadata.clone());
645 return Some(metadata);
646 }
647
648 metrics.cache_miss += 1;
649 None
650 }
651
652 pub(crate) async fn get_parquet_meta_data(
655 &self,
656 file_id: RegionFileId,
657 metrics: &mut MetadataCacheMetrics,
658 page_index_policy: PageIndexPolicy,
659 ) -> Option<Arc<ParquetMetaData>> {
660 self.get_sst_meta_data(file_id, metrics, page_index_policy)
661 .await
662 .map(|metadata| metadata.parquet_metadata())
663 }
664
665 pub(crate) fn get_sst_meta_data_from_mem_cache(
668 &self,
669 file_id: RegionFileId,
670 page_index_policy: PageIndexPolicy,
671 ) -> Option<Arc<CachedSstMeta>> {
672 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
673 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
674 let value =
675 value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
676 update_hit_miss(value, SST_META_TYPE)
677 })
678 }
679
680 pub fn get_parquet_meta_data_from_mem_cache(
683 &self,
684 file_id: RegionFileId,
685 ) -> Option<Arc<ParquetMetaData>> {
686 self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
687 .map(|metadata| metadata.parquet_metadata())
688 }
689
690 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
692 if let Some(cache) = &self.sst_meta_cache {
693 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
694 CACHE_BYTES
695 .with_label_values(&[SST_META_TYPE])
696 .add(meta_cache_weight(&key, &metadata).into());
697 cache.insert(key, metadata);
698 }
699 }
700
701 pub fn put_parquet_meta_data(
703 &self,
704 file_id: RegionFileId,
705 metadata: Arc<ParquetMetaData>,
706 region_metadata: Option<RegionMetadataRef>,
707 ) {
708 if self.sst_meta_cache.is_some() {
709 let file_path = format!(
710 "region_id={}, file_id={}",
711 file_id.region_id(),
712 file_id.file_id()
713 );
714 match CachedSstMeta::try_new_with_region_metadata(
715 &file_path,
716 Arc::unwrap_or_clone(metadata),
717 region_metadata,
718 ) {
719 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
720 Err(err) => warn!(
721 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
722 file_id.region_id(),
723 file_id.file_id()
724 ),
725 }
726 }
727 }
728
729 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
731 if let Some(cache) = &self.sst_meta_cache {
732 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
733 }
734 }
735
736 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
738 self.sst_meta_cache
739 .as_ref()
740 .map(|cache| cache.weighted_size())
741 .unwrap_or(0)
742 }
743
744 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
746 self.sst_meta_cache.is_some()
747 }
748
749 pub fn get_repeated_vector(
751 &self,
752 data_type: &ConcreteDataType,
753 value: &Value,
754 ) -> Option<VectorRef> {
755 self.vector_cache.as_ref().and_then(|vector_cache| {
756 let value = vector_cache.get(&(data_type.clone(), value.clone()));
757 update_hit_miss(value, VECTOR_TYPE)
758 })
759 }
760
761 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
763 if let Some(cache) = &self.vector_cache {
764 let key = (vector.data_type(), value);
765 CACHE_BYTES
766 .with_label_values(&[VECTOR_TYPE])
767 .add(vector_cache_weight(&key, &vector).into());
768 cache.insert(key, vector);
769 }
770 }
771
772 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
774 self.page_cache.as_ref().and_then(|page_cache| {
775 let value = page_cache.get(page_key);
776 update_hit_miss(value, PAGE_TYPE)
777 })
778 }
779
780 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
782 if let Some(cache) = &self.page_cache {
783 CACHE_BYTES
784 .with_label_values(&[PAGE_TYPE])
785 .add(page_cache_weight(&page_key, &pages).into());
786 cache.insert(page_key, pages);
787 }
788 }
789
790 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
792 if let Some(cache) = &self.bloom_filter_index_cache {
793 cache.invalidate_file(file_id.file_id());
794 }
795
796 if let Some(cache) = &self.inverted_index_cache {
797 cache.invalidate_file(file_id.file_id());
798 }
799
800 if let Some(cache) = &self.index_result_cache {
801 cache.invalidate_file(file_id.file_id());
802 }
803
804 #[cfg(feature = "vector_index")]
805 if let Some(cache) = &self.vector_index_cache {
806 cache.invalidate_file(file_id.file_id());
807 }
808
809 if let Some(cache) = &self.puffin_metadata_cache {
810 cache.remove(&file_id.to_string());
811 }
812
813 if let Some(write_cache) = &self.write_cache {
814 write_cache
815 .remove(IndexKey::new(
816 file_id.region_id(),
817 file_id.file_id(),
818 FileType::Puffin(file_id.version),
819 ))
820 .await;
821 }
822 }
823
824 pub fn get_selector_result(
826 &self,
827 selector_key: &SelectorResultKey,
828 ) -> Option<Arc<SelectorResultValue>> {
829 self.selector_result_cache
830 .as_ref()
831 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
832 }
833
834 pub fn put_selector_result(
836 &self,
837 selector_key: SelectorResultKey,
838 result: Arc<SelectorResultValue>,
839 ) {
840 if let Some(cache) = &self.selector_result_cache {
841 CACHE_BYTES
842 .with_label_values(&[SELECTOR_RESULT_TYPE])
843 .add(selector_result_cache_weight(&selector_key, &result).into());
844 cache.insert(selector_key, result);
845 }
846 }
847
848 #[allow(dead_code)]
850 pub(crate) fn get_range_result(
851 &self,
852 key: &RangeScanCacheKey,
853 ) -> Option<Arc<RangeScanCacheValue>> {
854 self.range_result_cache
855 .as_ref()
856 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
857 }
858
859 pub(crate) fn put_range_result(
861 &self,
862 key: RangeScanCacheKey,
863 result: Arc<RangeScanCacheValue>,
864 ) {
865 if let Some(cache) = &self.range_result_cache {
866 CACHE_BYTES
867 .with_label_values(&[RANGE_RESULT_TYPE])
868 .add(range_result_cache_weight(&key, &result).into());
869 cache.insert(key, result);
870 }
871 }
872
873 pub(crate) fn has_range_result_cache(&self) -> bool {
875 self.range_result_cache.is_some()
876 }
877
878 pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
879 &self.range_result_memory_limiter
880 }
881
882 pub(crate) fn range_result_cache_size(&self) -> usize {
883 self.range_result_cache_size as usize
884 }
885
886 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
888 self.write_cache.as_ref()
889 }
890
891 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
892 self.inverted_index_cache.as_ref()
893 }
894
895 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
896 self.bloom_filter_index_cache.as_ref()
897 }
898
899 #[cfg(feature = "vector_index")]
900 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
901 self.vector_index_cache.as_ref()
902 }
903
904 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
905 self.puffin_metadata_cache.as_ref()
906 }
907
908 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
909 self.index_result_cache.as_ref()
910 }
911}
912
913pub fn selector_result_cache_miss() {
915 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
916}
917
918pub fn selector_result_cache_hit() {
920 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
921}
922
/// Builder for [`CacheManager`].
///
/// All sizes are in bytes and default to 0. For the SST metadata, vector, page, selector
/// result, range result, and index result caches, a size of 0 disables that cache.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache.
    vector_cache_size: u64,
    /// Capacity of the page cache.
    page_cache_size: u64,
    /// Metadata capacity passed to the inverted and bloom filter index caches.
    index_metadata_size: u64,
    /// Content capacity passed to the index caches; 0 also disables the vector index cache.
    index_content_size: u64,
    /// Page size passed to the inverted and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache.
    index_result_cache_size: u64,
    /// Capacity passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the manager unchanged.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; also the budget of the range result memory limiter.
    range_result_cache_size: u64,
}
938
939impl CacheManagerBuilder {
940 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
942 self.sst_meta_cache_size = bytes;
943 self
944 }
945
946 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
948 self.vector_cache_size = bytes;
949 self
950 }
951
952 pub fn page_cache_size(mut self, bytes: u64) -> Self {
954 self.page_cache_size = bytes;
955 self
956 }
957
958 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
960 self.write_cache = cache;
961 self
962 }
963
964 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
966 self.index_metadata_size = bytes;
967 self
968 }
969
970 pub fn index_content_size(mut self, bytes: u64) -> Self {
972 self.index_content_size = bytes;
973 self
974 }
975
976 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
978 self.index_content_page_size = bytes;
979 self
980 }
981
982 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
984 self.index_result_cache_size = bytes;
985 self
986 }
987
988 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
990 self.puffin_metadata_size = bytes;
991 self
992 }
993
994 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
996 self.selector_result_cache_size = bytes;
997 self
998 }
999
1000 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
1002 self.range_result_cache_size = bytes;
1003 self
1004 }
1005
1006 pub fn build(self) -> CacheManager {
1008 fn to_str(cause: RemovalCause) -> &'static str {
1009 match cause {
1010 RemovalCause::Expired => "expired",
1011 RemovalCause::Explicit => "explicit",
1012 RemovalCause::Replaced => "replaced",
1013 RemovalCause::Size => "size",
1014 }
1015 }
1016
1017 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
1018 Cache::builder()
1019 .max_capacity(self.sst_meta_cache_size)
1020 .weigher(meta_cache_weight)
1021 .eviction_listener(|k, v, cause| {
1022 let size = meta_cache_weight(&k, &v);
1023 CACHE_BYTES
1024 .with_label_values(&[SST_META_TYPE])
1025 .sub(size.into());
1026 CACHE_EVICTION
1027 .with_label_values(&[SST_META_TYPE, to_str(cause)])
1028 .inc();
1029 })
1030 .build()
1031 });
1032 let vector_cache = (self.vector_cache_size != 0).then(|| {
1033 Cache::builder()
1034 .max_capacity(self.vector_cache_size)
1035 .weigher(vector_cache_weight)
1036 .eviction_listener(|k, v, cause| {
1037 let size = vector_cache_weight(&k, &v);
1038 CACHE_BYTES
1039 .with_label_values(&[VECTOR_TYPE])
1040 .sub(size.into());
1041 CACHE_EVICTION
1042 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
1043 .inc();
1044 })
1045 .build()
1046 });
1047 let page_cache = (self.page_cache_size != 0).then(|| {
1048 Cache::builder()
1049 .max_capacity(self.page_cache_size)
1050 .weigher(page_cache_weight)
1051 .eviction_listener(|k, v, cause| {
1052 let size = page_cache_weight(&k, &v);
1053 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
1054 CACHE_EVICTION
1055 .with_label_values(&[PAGE_TYPE, to_str(cause)])
1056 .inc();
1057 })
1058 .build()
1059 });
1060 let inverted_index_cache = InvertedIndexCache::new(
1061 self.index_metadata_size,
1062 self.index_content_size,
1063 self.index_content_page_size,
1064 );
1065 let bloom_filter_index_cache = BloomFilterIndexCache::new(
1067 self.index_metadata_size,
1068 self.index_content_size,
1069 self.index_content_page_size,
1070 );
1071 #[cfg(feature = "vector_index")]
1072 let vector_index_cache = (self.index_content_size != 0)
1073 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1074 let index_result_cache = (self.index_result_cache_size != 0)
1075 .then(|| IndexResultCache::new(self.index_result_cache_size));
1076 let puffin_metadata_cache =
1077 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1078 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1079 Cache::builder()
1080 .max_capacity(self.selector_result_cache_size)
1081 .weigher(selector_result_cache_weight)
1082 .eviction_listener(|k, v, cause| {
1083 let size = selector_result_cache_weight(&k, &v);
1084 CACHE_BYTES
1085 .with_label_values(&[SELECTOR_RESULT_TYPE])
1086 .sub(size.into());
1087 CACHE_EVICTION
1088 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1089 .inc();
1090 })
1091 .build()
1092 });
1093 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1094 Cache::builder()
1095 .max_capacity(self.range_result_cache_size)
1096 .weigher(range_result_cache_weight)
1097 .eviction_listener(move |k, v, cause| {
1098 let size = range_result_cache_weight(&k, &v);
1099 CACHE_BYTES
1100 .with_label_values(&[RANGE_RESULT_TYPE])
1101 .sub(size.into());
1102 CACHE_EVICTION
1103 .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1104 .inc();
1105 })
1106 .build()
1107 });
1108 CacheManager {
1109 sst_meta_cache,
1110 vector_cache,
1111 page_cache,
1112 write_cache: self.write_cache,
1113 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1114 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1115 #[cfg(feature = "vector_index")]
1116 vector_index_cache,
1117 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1118 selector_result_cache,
1119 range_result_cache,
1120 range_result_cache_size: self.range_result_cache_size,
1121 range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1122 self.range_result_cache_size as usize,
1123 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1124 )),
1125 index_result_cache,
1126 }
1127 }
1128}
1129
1130fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1131 (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1133}
1134
1135fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1136 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1138}
1139
1140fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1141 (k.estimated_size() + v.estimated_size()) as u32
1142}
1143
1144fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1145 (mem::size_of_val(k) + v.estimated_size()) as u32
1146}
1147
1148fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1149 (k.estimated_size() + v.estimated_size()) as u32
1150}
1151
1152fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1154 if value.is_some() {
1155 CACHE_HIT.with_label_values(&[cache_type]).inc();
1156 } else {
1157 CACHE_MISS.with_label_values(&[cache_type]).inc();
1158 }
1159 value
1160}
1161
1162#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1164struct SstMetaKey(RegionId, FileId);
1165
1166impl SstMetaKey {
1167 fn estimated_size(&self) -> usize {
1169 mem::size_of::<Self>()
1170 }
1171}
1172
/// Identifies one column of one row group in an SST file of a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Id of the region the SST file belongs to.
    region_id: RegionId,
    /// Id of the SST file.
    file_id: FileId,
    /// Index of the row group within the file.
    row_group_idx: usize,
    /// Index of the column within the row group.
    column_idx: usize,
}
1185
1186#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1191pub struct PageKey {
1192 file_id: FileId,
1194 row_group_idx: usize,
1196 ranges: Vec<Range<u64>>,
1198}
1199
1200impl PageKey {
1201 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1203 PageKey {
1204 file_id,
1205 row_group_idx,
1206 ranges,
1207 }
1208 }
1209
1210 fn estimated_size(&self) -> usize {
1212 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1213 }
1214}
1215
1216#[derive(Default)]
1219pub struct PageValue {
1220 pub compressed: Vec<Bytes>,
1222 pub page_size: u64,
1224}
1225
1226impl PageValue {
1227 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1229 PageValue {
1230 compressed: bytes,
1231 page_size,
1232 }
1233 }
1234
1235 fn estimated_size(&self) -> usize {
1237 mem::size_of::<Self>()
1238 + self.page_size as usize
1239 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1240 }
1241}
1242
/// Key of the selector result cache: a row group of an SST file plus the
/// time-series row selector applied to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group within the file.
    pub row_group_idx: usize,
    /// Selector whose output is cached.
    pub selector: TimeSeriesRowSelector,
}
1253
/// Cached selector output in one of the two supported batch formats.
pub enum SelectorResult {
    /// Output stored as [`Batch`]es (primary-key format).
    PrimaryKey(Vec<Batch>),
    /// Output stored as flat [`RecordBatch`]es.
    Flat(Vec<RecordBatch>),
}
1261
/// Value of the selector result cache.
pub struct SelectorResultValue {
    /// The cached selector output.
    pub result: SelectorResult,
    /// Parquet columns associated with `result` (presumably the projection
    /// used to read it; confirm against callers).
    pub read_cols: ParquetReadColumns,
}
1269
1270impl SelectorResultValue {
1271 pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1273 SelectorResultValue {
1274 result: SelectorResult::PrimaryKey(result),
1275 read_cols,
1276 }
1277 }
1278
1279 pub fn new_flat(
1281 result: Vec<RecordBatch>,
1282 read_cols: ParquetReadColumns,
1283 ) -> SelectorResultValue {
1284 SelectorResultValue {
1285 result: SelectorResult::Flat(result),
1286 read_cols,
1287 }
1288 }
1289
1290 fn estimated_size(&self) -> usize {
1292 match &self.result {
1293 SelectorResult::PrimaryKey(batches) => {
1294 batches.iter().map(|batch| batch.memory_size()).sum()
1295 }
1296 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1297 }
1298 }
1299}
1300
/// Maps an SST file (region id, file id) to its cached parquet/region metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a (data type, value) pair to a cached vector (presumably one repeating
/// that value).
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps ranges of a row group to the cached compressed pages.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a row group plus a row selector to the cached selector output.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a range scan key to its cached scan result.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1313
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::{
        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
    };
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default `CacheManager` has no caches: puts are dropped and gets miss.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    /// Put/get/remove round trip of the SST metadata cache; the cached parquet
    /// metadata must not keep the serialized region metadata key-value entry.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    /// Region metadata supplied at put time is stored as-is (same `Arc`).
    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }

    /// An entry cached with `PageIndexPolicy::Skip` must not satisfy an
    /// `Optional` request, while an `Optional` entry satisfies both policies.
    #[tokio::test]
    async fn test_parquet_meta_cache_respects_page_index_policy() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, _) = sst_parquet_meta();

        let skip_metadata = Arc::new(
            CachedSstMeta::try_new_with_page_index_policy(
                "test.parquet",
                Arc::unwrap_or_clone(metadata.clone()),
                None,
                PageIndexPolicy::Skip,
            )
            .unwrap(),
        );
        cache.put_sst_meta_data(file_id, skip_metadata);

        let mut metrics = MetadataCacheMetrics::default();
        assert!(
            cache
                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
                .await
                .is_none()
        );
        assert_eq!(1, metrics.cache_miss);

        let optional_metadata = Arc::new(
            CachedSstMeta::try_new_with_page_index_policy(
                "test.parquet",
                Arc::unwrap_or_clone(metadata),
                None,
                PageIndexPolicy::Optional,
            )
            .unwrap(),
        );
        cache.put_sst_meta_data(file_id, optional_metadata);

        let mut metrics = MetadataCacheMetrics::default();
        assert!(
            cache
                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
                .await
                .is_some()
        );
        assert_eq!(1, metrics.mem_cache_hit);

        let mut metrics = MetadataCacheMetrics::default();
        assert!(
            cache
                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
                .await
                .is_some()
        );
        assert_eq!(1, metrics.mem_cache_hit);
    }

    /// The metadata cache weight includes the decoded region metadata, which
    /// is larger than its JSON encoding.
    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }

    /// Put/get round trip of the repeated-vector cache.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// Put/get round trip of the page cache.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    /// Put/get round trip of the selector result cache.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(
            Vec::new(),
            ParquetReadColumns::from_deduped(Vec::new()),
        ));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Range result cache round trip, and how each `CacheStrategy` exposes it:
    /// `EnableAll` reads it, while `Compaction` and `Disabled` neither read nor write it.
    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }

    /// The range result cache size determines how many concat-memory permits
    /// the limiter has (rounded up).
    #[test]
    fn test_range_result_cache_size_configures_limiter() {
        let cache_size = 3 * 1024_u64;
        let cache = CacheManager::builder()
            .range_result_cache_size(cache_size)
            .build();

        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
        assert_eq!(
            cache.range_result_memory_limiter().permit_bytes(),
            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
        );
        assert_eq!(
            cache.range_result_memory_limiter().available_permits(),
            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
        );
    }

    /// A request larger than the whole limiter fails immediately without
    /// consuming permits.
    #[tokio::test]
    async fn range_result_memory_limiter_rejects_oversized_request() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        assert_eq!(limiter.available_permits(), 2);

        let err = limiter.acquire(10 * 1024).await.unwrap_err();
        assert!(
            err.to_string().contains("exceeds limiter capacity"),
            "unexpected error: {err}"
        );
        assert_eq!(limiter.available_permits(), 2);
    }

    /// A request equal to the full capacity succeeds, and its permits are
    /// returned on drop.
    #[tokio::test]
    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        let permit = limiter.acquire(2 * 1024).await.unwrap();
        assert_eq!(limiter.available_permits(), 0);
        drop(permit);
        assert_eq!(limiter.available_permits(), 2);
    }

    /// Evicting an index file clears its entries from the bloom filter,
    /// inverted index, index result and puffin metadata caches, both via
    /// `CacheManager` and via `CacheStrategy::EnableAll`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Repopulate the caches and evict again, this time through the strategy.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }

    /// Builds region metadata with `column_count` string columns, plus a
    /// timestamp column with id `column_count`.
    ///
    /// The first 32 string columns are tags forming the primary key. Every
    /// string column carries extra schema metadata to inflate its size.
    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
        let region_id = RegionId::new(1024, 7);
        let mut builder = RegionMetadataBuilder::new(region_id);
        let mut primary_key = Vec::new();

        for column_id in 0..column_count {
            let semantic_type = if column_id < 32 {
                primary_key.push(column_id);
                SemanticType::Tag
            } else {
                SemanticType::Field
            };
            let mut column_schema = ColumnSchema::new(
                format!("wide_column_{column_id}"),
                ConcreteDataType::string_datatype(),
                true,
            );
            column_schema
                .mut_metadata()
                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: column_count,
        });
        builder.primary_key(primary_key);

        builder.build().unwrap()
    }
}