mito2/cache/file_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A cache for files.
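//!
//! A minimal usage sketch (illustrative only; it mirrors the unit tests at the
//! bottom of this file and assumes `local_store` is a filesystem-backed
//! `ObjectStore`):
//!
//! ```ignore
//! let cache = FileCache::new(local_store, ReadableSize::mb(10), None, None, true);
//! let key = IndexKey::new(region_id, file_id, FileType::Parquet);
//! // The caller first writes the file to `cache.cache_file_path(key)`,
//! // then registers it in the index:
//! cache.put(key, IndexValue { file_size: 5 }).await;
//! let reader = cache.reader(key).await; // `Some(_)` on a hit, `None` on a miss.
//! ```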

use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info, warn};
use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::util::join_path;
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};

use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
use crate::error::{self, OpenDalSnafu, Result};
use crate::metrics::{
    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
    WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;

/// Subdirectory of cached files for write.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage for index (puffin) cache (20% of total capacity).
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity for each cache (512MB).
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Channel capacity for background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;

/// A task to download a file in the background.
struct DownloadTask {
    index_key: IndexKey,
    remote_path: String,
    remote_store: ObjectStore,
    file_size: u64,
}

/// Inner struct for FileCache that can be used in spawned tasks.
#[derive(Debug)]
struct FileCacheInner {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
}

impl FileCacheInner {
    /// Returns the appropriate memory index for the given file type.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the cache file path for the key.
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;

        // Since files are large items, we run the pending tasks immediately.
        index.run_pending_tasks().await;
    }

    /// Recovers the index from the local store.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        // Use i64 for total_size to reduce the risk of overflow.
        // It is possible that the total size of the cache is larger than i32::MAX.
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            // Track sizes separately for each file type.
            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // The metric is a signed int gauge, so we can update it once at the end.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Run all pending tasks of the moka cache so that the cache size is updated
        // and the eviction policy is applied.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads a file without cleaning up on error.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
234            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file from the remote store to the local cache.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Checks if the key is in the file cache.
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}

/// A file cache that manages files on the local store and evicts files
/// based on size.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Inner cache state shared with background worker.
    inner: Arc<FileCacheInner>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
    /// Channel for background download tasks. None if background worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}

pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
    /// Creates a new file cache.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        // Validate and use the provided percent or default.
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        // Convert percent to ratio and calculate capacity for each cache.
        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Ensure both capacities are at least 512MB.
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
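        // Worked example: with a 10 GiB total capacity and the default 20% index
        // percent, puffin gets 2 GiB and parquet 8 GiB. With a 1 GiB total, the
        // puffin share (~205 MiB) is clamped up to the 512 MiB minimum, so the
        // combined capacity can exceed the configured total.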

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        // Create inner cache shared with background worker.
        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        // Only create channel and spawn worker if background download is enabled.
        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns a background worker to process download tasks.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Check if the file is already in the cache.
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Ignores background download errors.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        1, // Background downloads use concurrency=1
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds a cache for a specific file type.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // We only measure space on the local store.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                // Stores files under FILE_DIR.
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The entry was replaced by another file (perhaps downloaded again). We don't
                        // delete the file, since the user has already replaced it; we only update the metrics.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Reads a file from the cache.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // We must use `get()` to update the estimator of the cache.
        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // Remove the file from the index.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads ranges from the cache.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // In most cases this will be a blocking read, because FileCache is normally
        // backed by the local file system, which supports blocking reads.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // Remove the file from the index.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly.
    /// It always tries to delete the file from the local store, because the file may be missing
    /// from the memory index if the upload failed.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // Always delete the file from the local store.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from the local store.
    ///
    /// If `task_receiver` is provided, spawns a background task after recovery
    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Spawns a background task to process region load cache tasks after
            // recovery, so it won't block the recovery when `sync` is true.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store of the file cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Gets the parquet metadata from the file cache.
    /// Returns `None` if the file is not in the cache or the metadata fails to load.
    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
        // Check if the file cache contains the key.
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            // Load the metadata from the file cache.
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);

            match metadata_loader.load().await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // Remove the file from the index.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the capacity of the puffin (index) cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8) // Foreground uses concurrency=8
            .await
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`) in the background. Errors are logged but not returned.
    ///
    /// This method attempts to send a download task to the background worker.
    /// If the channel is full, the task is silently dropped.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // Do nothing if the background worker is disabled (channel is None).
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // Try to send the task; if the channel is full, just drop it.
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}

/// Key of the file cache index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    pub region_id: RegionId,
    pub file_id: FileId,
    pub file_type: FileType,
}

impl IndexKey {
    /// Creates a new index key.
    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
        IndexKey {
            region_id,
            file_id,
            file_type,
        }
    }
}

impl fmt::Display for IndexKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}.{}.{}",
            self.region_id.as_u64(),
            self.file_id,
            self.file_type
        )
    }
}

/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
    Puffin(u64),
}

impl fmt::Display for FileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileType::Parquet => write!(f, "parquet"),
            FileType::Puffin(version) => write!(f, "{}.puffin", version),
        }
    }
}

impl FileType {
    /// Parses the file type from string.
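    ///
    /// Examples (mirroring the unit tests below): `"parquet"` parses to
    /// `Parquet`, `"puffin"` to `Puffin(0)` (legacy, unversioned), and
    /// `"42.puffin"` to `Puffin(42)`.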
    pub(crate) fn parse(s: &str) -> Option<FileType> {
        match s {
            "parquet" => Some(FileType::Parquet),
            "puffin" => Some(FileType::Puffin(0)),
            _ => {
                // If the name ends with `.puffin`, try to parse the version prefix.
                if let Some(version_str) = s.strip_suffix(".puffin") {
                    let version = version_str.parse::<u64>().ok()?;
                    Some(FileType::Puffin(version))
                } else {
                    None
                }
            }
        }
    }

    /// Returns the metric label for this file type.
    fn metric_label(&self) -> &'static str {
        match self {
            FileType::Parquet => FILE_TYPE,
            FileType::Puffin(_) => INDEX_TYPE,
        }
    }
}

/// An entity that describes the file in the file cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    pub(crate) file_size: u32,
}

/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}.{file_type}`.
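///
/// For example (taken from the tests below), `RegionId::new(1234, 5)` with a
/// parquet file becomes `5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet`.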
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
    join_path(cache_file_dir, &key.to_string())
}

/// Parses the index key from the file name.
fn parse_index_key(name: &str) -> Option<IndexKey> {
    let mut split = name.splitn(3, '.');
    let region_id = split.next().and_then(|s| {
        let id = s.parse::<u64>().ok()?;
        Some(RegionId::from_u64(id))
    })?;
    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
    let file_type = split.next().and_then(FileType::parse)?;

    Some(IndexKey::new(region_id, file_id, file_type))
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read file content.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Get weighted size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Ensure all pending tasks of the moka cache are done before assertion.
        cache.inner.parquet_index.run_pending_tasks().await;

        // The file no longer exists either.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Remove the file but keep the index.
        local_store.delete(&file_path).await.unwrap();

        // Reader is none.
        assert!(cache.reader(key).await.is_none());
        // Key is removed.
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write N files.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            // Add to the cache.
            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // Recover the cache.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        // No entry before recovery.
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        cache.recover(true, None).await;

        // Check size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file.
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Add to the cache.
        file_cache.put(key, IndexValue { file_size: 5 }).await;
        // Ranges to read.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
}