1use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40 CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41 WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Directory inside the local store under which all cached files are placed.
///
/// Cache file paths are built by joining this directory with the string form
/// of an `IndexKey`; recovery lists this directory to rebuild the index.
const FILE_DIR: &str = "cache/object/write/";

/// Percentage of the total capacity assigned to the puffin (index) cache when
/// the caller gives no percentage or one outside `1..=99`.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity (512 MiB) each cache partition should get when the total
/// capacity is split. The split caps this value at half of the total capacity.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Capacity of the bounded channel that queues background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
61
/// A download request handed to the background download worker.
struct DownloadTask {
    /// Key under which the downloaded file is registered in the cache.
    index_key: IndexKey,
    /// Path of the file in the remote store.
    remote_path: String,
    /// Store the file is downloaded from.
    remote_store: ObjectStore,
    /// Number of bytes to read from the remote file (the read range is `0..file_size`).
    file_size: u64,
}
69
#[derive(Debug)]
/// State shared between the `FileCache` handle and its background tasks.
struct FileCacheInner {
    /// Local store that holds the cached files.
    local_store: ObjectStore,
    /// In-memory index of the cached parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// In-memory index of the cached puffin (index) files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
81impl FileCacheInner {
82 fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
84 match file_type {
85 FileType::Parquet => &self.parquet_index,
86 FileType::Puffin { .. } => &self.puffin_index,
87 }
88 }
89
90 fn cache_file_path(&self, key: IndexKey) -> String {
92 cache_file_path(FILE_DIR, key)
93 }
94
95 async fn put(&self, key: IndexKey, value: IndexValue) {
99 CACHE_BYTES
100 .with_label_values(&[key.file_type.metric_label()])
101 .add(value.file_size.into());
102 let index = self.memory_index(key.file_type);
103 index.insert(key, value).await;
104
105 index.run_pending_tasks().await;
107 }
108
109 async fn recover(&self) -> Result<()> {
111 let now = Instant::now();
112 let mut lister = self
113 .local_store
114 .lister_with(FILE_DIR)
115 .await
116 .context(OpenDalSnafu)?;
117 let (mut total_size, mut total_keys) = (0i64, 0);
120 let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
121 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
122 let meta = entry.metadata();
123 if !meta.is_file() {
124 continue;
125 }
126 let Some(key) = parse_index_key(entry.name()) else {
127 continue;
128 };
129
130 let meta = self
131 .local_store
132 .stat(entry.path())
133 .await
134 .context(OpenDalSnafu)?;
135 let file_size = meta.content_length() as u32;
136 let index = self.memory_index(key.file_type);
137 index.insert(key, IndexValue { file_size }).await;
138 let size = i64::from(file_size);
139 total_size += size;
140 total_keys += 1;
141
142 match key.file_type {
144 FileType::Parquet => parquet_size += size,
145 FileType::Puffin { .. } => puffin_size += size,
146 }
147 }
148 CACHE_BYTES
150 .with_label_values(&[FILE_TYPE])
151 .add(parquet_size);
152 CACHE_BYTES
153 .with_label_values(&[INDEX_TYPE])
154 .add(puffin_size);
155
156 self.parquet_index.run_pending_tasks().await;
159 self.puffin_index.run_pending_tasks().await;
160
161 let parquet_weight = self.parquet_index.weighted_size();
162 let parquet_count = self.parquet_index.entry_count();
163 let puffin_weight = self.puffin_index.weighted_size();
164 let puffin_count = self.puffin_index.entry_count();
165 info!(
166 "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
167 total_keys,
168 total_size,
169 parquet_count,
170 parquet_weight,
171 puffin_count,
172 puffin_weight,
173 now.elapsed()
174 );
175 Ok(())
176 }
177
178 async fn download_without_cleaning(
180 &self,
181 index_key: IndexKey,
182 remote_path: &str,
183 remote_store: &ObjectStore,
184 file_size: u64,
185 concurrency: usize,
186 ) -> Result<()> {
187 const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
188
189 let file_type = index_key.file_type;
190 let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
191 .with_label_values(&[match file_type {
192 FileType::Parquet => "download_parquet",
193 FileType::Puffin { .. } => "download_puffin",
194 }])
195 .start_timer();
196
197 let reader = remote_store
198 .reader_with(remote_path)
199 .concurrent(concurrency)
200 .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
201 .await
202 .context(error::OpenDalSnafu)?
203 .into_futures_async_read(0..file_size)
204 .await
205 .context(error::OpenDalSnafu)?;
206
207 let cache_path = self.cache_file_path(index_key);
208 let mut writer = self
209 .local_store
210 .writer(&cache_path)
211 .await
212 .context(error::OpenDalSnafu)?
213 .into_futures_async_write();
214
215 let region_id = index_key.region_id;
216 let file_id = index_key.file_id;
217 let bytes_written =
218 futures::io::copy(reader, &mut writer)
219 .await
220 .context(error::DownloadSnafu {
221 region_id,
222 file_id,
223 file_type,
224 })?;
225 writer.close().await.context(error::DownloadSnafu {
226 region_id,
227 file_id,
228 file_type,
229 })?;
230
231 WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
232
233 let elapsed = timer.stop_and_record();
234 debug!(
235 "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
236 remote_path, cache_path, bytes_written, region_id, elapsed,
237 );
238
239 let index_value = IndexValue {
240 file_size: bytes_written as _,
241 };
242 self.put(index_key, index_value).await;
243 Ok(())
244 }
245
246 async fn download(
248 &self,
249 index_key: IndexKey,
250 remote_path: &str,
251 remote_store: &ObjectStore,
252 file_size: u64,
253 concurrency: usize,
254 ) -> Result<()> {
255 if let Err(e) = self
256 .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
257 .await
258 {
259 error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);
260
261 let filename = index_key.to_string();
262 TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
263
264 return Err(e);
265 }
266
267 Ok(())
268 }
269
270 fn contains_key(&self, key: &IndexKey) -> bool {
272 self.memory_index(key.file_type).contains_key(key)
273 }
274}
275
/// A cache of files kept in a local store and tracked by in-memory indexes.
///
/// Cloning is cheap: clones share the same inner state and download channel.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Shared state: the local store and the per-file-type indexes.
    inner: Arc<FileCacheInner>,
    /// Capacity in bytes assigned to the puffin (index) cache.
    puffin_capacity: u64,
    /// Sender for background download tasks; `None` when the background
    /// worker was not enabled at construction time.
    download_task_tx: Option<Sender<DownloadTask>>,
}
287
/// Reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
290impl FileCache {
291 fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
294 let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
295 let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
296 let puffin_capacity =
297 desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
298 let parquet_capacity = total_capacity - puffin_capacity;
299 (parquet_capacity, puffin_capacity)
300 }
301
302 pub(crate) fn new(
304 local_store: ObjectStore,
305 capacity: ReadableSize,
306 ttl: Option<Duration>,
307 index_cache_percent: Option<u8>,
308 enable_background_worker: bool,
309 ) -> FileCache {
310 let index_percent = index_cache_percent
312 .filter(|&percent| percent > 0 && percent < 100)
313 .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
314 let total_capacity = capacity.as_bytes();
315
316 let (parquet_capacity, puffin_capacity) =
317 Self::split_cache_capacities(total_capacity, index_percent);
318
319 info!(
320 "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
321 index_percent,
322 ReadableSize(total_capacity),
323 ReadableSize(parquet_capacity),
324 ReadableSize(puffin_capacity)
325 );
326
327 let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
328 let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
329
330 let inner = Arc::new(FileCacheInner {
332 local_store,
333 parquet_index,
334 puffin_index,
335 });
336
337 let download_task_tx = if enable_background_worker {
339 let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
340 Self::spawn_download_worker(inner.clone(), rx);
341 Some(tx)
342 } else {
343 None
344 };
345
346 FileCache {
347 inner,
348 puffin_capacity,
349 download_task_tx,
350 }
351 }
352
353 fn spawn_download_worker(
355 inner: Arc<FileCacheInner>,
356 mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
357 ) {
358 tokio::spawn(async move {
359 info!("Background download worker started");
360 while let Some(task) = download_task_rx.recv().await {
361 if inner.contains_key(&task.index_key) {
363 debug!(
364 "Skipping background download for region {}, file {} - already in cache",
365 task.index_key.region_id, task.index_key.file_id
366 );
367 continue;
368 }
369
370 let _ = inner
372 .download(
373 task.index_key,
374 &task.remote_path,
375 &task.remote_store,
376 task.file_size,
377 1, )
379 .await;
380 }
381 info!("Background download worker stopped");
382 });
383 }
384
385 fn build_cache(
387 local_store: ObjectStore,
388 capacity: u64,
389 ttl: Option<Duration>,
390 label: &'static str,
391 ) -> Cache<IndexKey, IndexValue> {
392 let cache_store = local_store;
393 let mut builder = Cache::builder()
394 .eviction_policy(EvictionPolicy::lru())
395 .weigher(|_key, value: &IndexValue| -> u32 {
396 value.file_size
398 })
399 .max_capacity(capacity)
400 .async_eviction_listener(move |key, value, cause| {
401 let store = cache_store.clone();
402 let file_path = cache_file_path(FILE_DIR, *key);
404 async move {
405 if let RemovalCause::Replaced = cause {
406 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
409 return;
410 }
411
412 match store.delete(&file_path).await {
413 Ok(()) => {
414 CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
415 }
416 Err(e) => {
417 warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
418 }
419 }
420 }
421 .boxed()
422 });
423 if let Some(ttl) = ttl {
424 builder = builder.time_to_idle(ttl);
425 }
426 builder.build()
427 }
428
429 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
433 self.inner.put(key, value).await
434 }
435
436 pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
437 self.inner.memory_index(key.file_type).get(&key).await
438 }
439
440 #[allow(unused)]
442 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
443 let index = self.inner.memory_index(key.file_type);
446 if index.get(&key).await.is_none() {
447 CACHE_MISS
448 .with_label_values(&[key.file_type.metric_label()])
449 .inc();
450 return None;
451 }
452
453 let file_path = self.inner.cache_file_path(key);
454 match self.get_reader(&file_path).await {
455 Ok(Some(reader)) => {
456 CACHE_HIT
457 .with_label_values(&[key.file_type.metric_label()])
458 .inc();
459 return Some(reader);
460 }
461 Err(e) => {
462 if e.kind() != ErrorKind::NotFound {
463 warn!(e; "Failed to get file for key {:?}", key);
464 }
465 }
466 Ok(None) => {}
467 }
468
469 index.remove(&key).await;
471 CACHE_MISS
472 .with_label_values(&[key.file_type.metric_label()])
473 .inc();
474 None
475 }
476
477 pub(crate) async fn read_ranges(
479 &self,
480 key: IndexKey,
481 ranges: &[Range<u64>],
482 ) -> Option<Vec<Bytes>> {
483 let index = self.inner.memory_index(key.file_type);
484 if index.get(&key).await.is_none() {
485 CACHE_MISS
486 .with_label_values(&[key.file_type.metric_label()])
487 .inc();
488 return None;
489 }
490
491 let file_path = self.inner.cache_file_path(key);
492 let bytes_result =
495 fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
496 match bytes_result {
497 Ok(bytes) => {
498 CACHE_HIT
499 .with_label_values(&[key.file_type.metric_label()])
500 .inc();
501 Some(bytes)
502 }
503 Err(e) => {
504 if e.kind() != ErrorKind::NotFound {
505 warn!(e; "Failed to get file for key {:?}", key);
506 }
507
508 index.remove(&key).await;
510 CACHE_MISS
511 .with_label_values(&[key.file_type.metric_label()])
512 .inc();
513 None
514 }
515 }
516 }
517
518 pub(crate) async fn remove(&self, key: IndexKey) {
522 let file_path = self.inner.cache_file_path(key);
523 self.inner.memory_index(key.file_type).remove(&key).await;
524 if let Err(e) = self.inner.local_store.delete(&file_path).await {
526 warn!(e; "Failed to delete a cached file {}", file_path);
527 }
528 }
529
530 pub(crate) async fn recover(
535 &self,
536 sync: bool,
537 task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
538 ) {
539 let moved_self = self.clone();
540 let handle = tokio::spawn(async move {
541 if let Err(err) = moved_self.inner.recover().await {
542 error!(err; "Failed to recover file cache.")
543 }
544
545 if let Some(mut receiver) = task_receiver {
548 info!("Spawning background task for processing region load cache tasks");
549 tokio::spawn(async move {
550 while let Some(task) = receiver.recv().await {
551 task.fill_cache(&moved_self).await;
552 }
553 info!("Background task for processing region load cache tasks stopped");
554 });
555 }
556 });
557
558 if sync {
559 let _ = handle.await;
560 }
561 }
562
563 pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
565 self.inner.cache_file_path(key)
566 }
567
568 pub(crate) fn local_store(&self) -> ObjectStore {
570 self.inner.local_store.clone()
571 }
572
573 pub(crate) async fn get_parquet_meta_data(
576 &self,
577 key: IndexKey,
578 cache_metrics: &mut MetadataCacheMetrics,
579 page_index_policy: PageIndexPolicy,
580 ) -> Option<ParquetMetaData> {
581 if let Some(index_value) = self.inner.parquet_index.get(&key).await {
583 let local_store = self.local_store();
585 let file_path = self.inner.cache_file_path(key);
586 let file_size = index_value.file_size as u64;
587 let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
588 metadata_loader.with_page_index_policy(page_index_policy);
589
590 match metadata_loader.load(cache_metrics).await {
591 Ok(metadata) => {
592 CACHE_HIT
593 .with_label_values(&[key.file_type.metric_label()])
594 .inc();
595 Some(metadata)
596 }
597 Err(e) => {
598 if !e.is_object_not_found() {
599 warn!(
600 e; "Failed to get parquet metadata for key {:?}",
601 key
602 );
603 }
604 self.inner.parquet_index.remove(&key).await;
606 CACHE_MISS
607 .with_label_values(&[key.file_type.metric_label()])
608 .inc();
609 None
610 }
611 }
612 } else {
613 CACHE_MISS
614 .with_label_values(&[key.file_type.metric_label()])
615 .inc();
616 None
617 }
618 }
619
620 pub(crate) async fn get_sst_meta_data(
623 &self,
624 key: IndexKey,
625 cache_metrics: &mut MetadataCacheMetrics,
626 page_index_policy: PageIndexPolicy,
627 ) -> Option<Arc<CachedSstMeta>> {
628 let file_path = self.inner.cache_file_path(key);
629 self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
630 .await
631 .and_then(|metadata| {
632 match CachedSstMeta::try_new_with_page_index_policy(
633 &file_path,
634 metadata,
635 None,
636 page_index_policy,
637 ) {
638 Ok(metadata) => Some(Arc::new(metadata)),
639 Err(err) => {
640 CACHE_MISS
641 .with_label_values(&[key.file_type.metric_label()])
642 .inc();
643 warn!(
644 err; "Failed to decode cached parquet metadata for key {:?}",
645 key
646 );
647 None
648 }
649 }
650 })
651 }
652
653 async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
654 if self.inner.local_store.exists(file_path).await? {
655 Ok(Some(self.inner.local_store.reader(file_path).await?))
656 } else {
657 Ok(None)
658 }
659 }
660
661 pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
663 self.inner.contains_key(key)
664 }
665
666 pub(crate) fn puffin_cache_capacity(&self) -> u64 {
668 self.puffin_capacity
669 }
670
671 pub(crate) fn puffin_cache_size(&self) -> u64 {
673 self.inner.puffin_index.weighted_size()
674 }
675
676 pub(crate) async fn download(
679 &self,
680 index_key: IndexKey,
681 remote_path: &str,
682 remote_store: &ObjectStore,
683 file_size: u64,
684 ) -> Result<()> {
685 self.inner
686 .download(index_key, remote_path, remote_store, file_size, 8) .await
688 }
689
690 pub(crate) fn maybe_download_background(
696 &self,
697 index_key: IndexKey,
698 remote_path: String,
699 remote_store: ObjectStore,
700 file_size: u64,
701 ) {
702 let Some(tx) = &self.download_task_tx else {
704 return;
705 };
706
707 let task = DownloadTask {
708 index_key,
709 remote_path,
710 remote_store,
711 file_size,
712 };
713
714 if let Err(e) = tx.try_send(task) {
716 debug!(
717 "Failed to queue background download task for region {}, file {}: {:?}",
718 index_key.region_id, index_key.file_id, e
719 );
720 }
721 }
722}
723
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Key of a file in the file cache.
///
/// Its `Display` form, `<region id as u64>.<file id>.<file type>`, is used as
/// the cache file name.
pub struct IndexKey {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Type of the file.
    pub file_type: FileType,
}
731
732impl IndexKey {
733 pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
735 IndexKey {
736 region_id,
737 file_id,
738 file_type,
739 }
740 }
741}
742
743impl fmt::Display for IndexKey {
744 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
745 write!(
746 f,
747 "{}.{}.{}",
748 self.region_id.as_u64(),
749 self.file_id,
750 self.file_type
751 )
752 }
753}
754
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Type of a file kept in the file cache.
pub enum FileType {
    /// A parquet file.
    Parquet,
    /// A puffin file; the number is written in front of `.puffin` in the
    /// textual form of the type.
    Puffin(u64),
}

impl fmt::Display for FileType {
    /// Writes `parquet` for parquet files and `<number>.puffin` for puffin files.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::Parquet => f.write_str("parquet"),
            Self::Puffin(version) => write!(f, "{version}.puffin"),
        }
    }
}
772
773impl FileType {
774 pub(crate) fn parse(s: &str) -> Option<FileType> {
776 match s {
777 "parquet" => Some(FileType::Parquet),
778 "puffin" => Some(FileType::Puffin(0)),
779 _ => {
780 if let Some(version_str) = s.strip_suffix(".puffin") {
782 let version = version_str.parse::<u64>().ok()?;
783 Some(FileType::Puffin(version))
784 } else {
785 None
786 }
787 }
788 }
789 }
790
791 fn metric_label(&self) -> &'static str {
793 match self {
794 FileType::Parquet => FILE_TYPE,
795 FileType::Puffin(_) => INDEX_TYPE,
796 }
797 }
798}
799
#[derive(Debug, Clone)]
/// Value stored in the cache index for each cached file.
pub(crate) struct IndexValue {
    /// Size of the file in bytes; also used as the entry's weight in the cache.
    pub(crate) file_size: u32,
}
808
809fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
813 join_path(cache_file_dir, &key.to_string())
814}
815
816fn parse_index_key(name: &str) -> Option<IndexKey> {
818 let mut split = name.splitn(3, '.');
819 let region_id = split.next().and_then(|s| {
820 let id = s.parse::<u64>().ok()?;
821 Some(RegionId::from_u64(id))
822 })?;
823 let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
824 let file_type = split.next().and_then(FileType::parse)?;
825
826 Some(IndexKey::new(region_id, file_id, file_type))
827}
828
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates an object store backed by the local file system rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    /// An entry that has been idle longer than the TTL is no longer readable.
    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        // Write the file to the local store.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Register the file in the cache index.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait past the TTL and let the cache process the expiration.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    /// Covers put, read and remove of a single cached file.
    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.reader(key).await.is_none());

        // Write the file to the local store.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Register the file in the cache index.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read the file content back through the cache.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // The weighted size equals the registered file size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Remove the file from the cache.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Let the cache process the removal.
        cache.inner.parquet_index.run_pending_tasks().await;

        // Both the file and the index entry are gone.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    /// A file deleted behind the cache's back is reported as a miss and its
    /// index entry is dropped.
    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write the file to the local store.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Register the file in the cache index.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Delete the file directly from the store, bypassing the cache.
        local_store.delete(&file_path).await.unwrap();

        // The reader reports a miss ...
        assert!(cache.reader(key).await.is_none());
        // ... and the stale key is removed from the index.
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    /// A fresh cache over the same store recovers every previously cached file.
    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write ten files whose content is their own index.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            // Register the file in the cache index.
            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // Create a new, empty cache over the same store.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        // Before recovery the new cache knows none of the files.
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        cache.recover(true, None).await;

        // After recovery the weighted size covers every file.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    /// Several byte ranges of one cached file can be read in a single call.
    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        let data = b"hello greptime database";
        // Write the file to the local store.
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Register the file with its real size.
        file_cache
            .put(
                key,
                IndexValue {
                    file_size: data.len() as u32,
                },
            )
            .await;
        // Ranges to read, including one that spans the whole file.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    /// With a small total capacity each partition gets half of the budget.
    #[test]
    fn test_file_cache_capacity_respects_total_budget() {
        let total_capacity = ReadableSize::mb(256).as_bytes();
        let (parquet_capacity, puffin_capacity) =
            FileCache::split_cache_capacities(total_capacity, 20);

        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
        assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
        assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
    }

    /// With a large total capacity the requested percentage is kept as is.
    #[test]
    fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
        let total_capacity = ReadableSize::gb(5).as_bytes();
        let (parquet_capacity, puffin_capacity) =
            FileCache::split_cache_capacities(total_capacity, 20);

        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
        assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
        assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
    }

    /// The cache file path is the same with or without a trailing slash on
    /// the directory.
    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    /// Valid file names parse into keys; malformed names are rejected.
    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        // A valid region and file id followed by an empty file type.
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
}