use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info, warn};
use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::util::join_path;
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};

use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
use crate::error::{self, OpenDalSnafu, Result};
use crate::metrics::{
    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
    WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;

/// Subdirectory in the local store that holds the cached files.
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage of the total capacity assigned to the puffin (index) cache.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity of each cache: 512 MiB.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Capacity of the channel that queues background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;

/// A queued request to download one remote file into the local cache.
struct DownloadTask {
    index_key: IndexKey,
    remote_path: String,
    remote_store: ObjectStore,
    file_size: u64,
}

/// State shared between the [FileCache] handle and its background download worker.
#[derive(Debug)]
struct FileCacheInner {
    /// Local store that holds the cached files.
    local_store: ObjectStore,
    /// In-memory index of cached parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// In-memory index of cached puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
}

impl FileCacheInner {
    /// Returns the in-memory index for the given file type.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the local path of the cached file for the given key.
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Inserts a key-value pair into the index and bumps the cache size metric.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;
        // Flush pending moka tasks so evictions caused by this insertion run promptly.
        index.run_pending_tasks().await;
    }

    /// Recovers the index by listing the cache directory in the local store.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        // Tracks total size and key count, plus per-type sizes for the metrics.
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            // Skip files whose names do not match the cache key format.
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // Update the size metric per file type.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Run pending tasks so entries that exceed the capacity are evicted
        // before we report the recovered weights.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads a file from the remote store into the cache without cleaning
    /// up the partially written file on failure; callers handle the cleanup.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written = futures::io::copy(reader, &mut writer)
            .await
            .context(error::DownloadSnafu {
                region_id,
                file_id,
                file_type,
            })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file and removes the partially written file if the download fails.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            // Remove the incomplete file so it does not leak into the cache directory.
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Returns true if the key is in the in-memory index.
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}

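/// Caches SST (parquet) and index (puffin) files on the local store, keyed by
/// [IndexKey], with a separate in-memory index per file type.
///
/// A minimal usage sketch (illustrative; `local_store`, `region_id`, and
/// `file_id` are assumed bindings, not part of this module):
///
/// ```ignore
/// let cache = FileCache::new(local_store, ReadableSize::mb(512), None, None, true);
/// cache.recover(true, None).await;
/// let key = IndexKey::new(region_id, file_id, FileType::Parquet);
/// if let Some(reader) = cache.reader(key).await {
///     // Read the cached file through `reader`.
/// }
/// ```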
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    inner: Arc<FileCacheInner>,
    /// Capacity assigned to the puffin cache in bytes.
    puffin_capacity: u64,
    /// Sender to queue background download tasks; `None` if the worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}

pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
    /// Creates a new file cache backed by `local_store`, splitting `capacity`
    /// between the parquet and puffin caches.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        // Fall back to the default percent if the given one is outside (0, 100).
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Ensure both caches get at least the minimum capacity.
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
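        // Worked example (illustrative): with a 5 GiB total and the default 20%
        // index share, the puffin cache gets 1 GiB and the parquet cache 4 GiB;
        // both exceed MIN_CACHE_CAPACITY (512 MiB), so no clamping occurs. With
        // a 1 GiB total, the puffin share computes to ~205 MiB and is raised to
        // 512 MiB, so the combined capacities can exceed the configured total.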

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        // Optionally spawn the background worker that serves queued download tasks.
        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns a background worker that processes queued download tasks one at a time.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Skip the download if the file is already cached.
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Errors are already logged inside `download`, so ignore the result here.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        // Background downloads use a single concurrent reader.
                        1,
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds a moka cache for one file type, with an LRU policy weighted by
    /// file size and an eviction listener that deletes evicted files from the
    /// local store.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // Weight each entry by its file size so `max_capacity` bounds bytes.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The key is still cached; only update the metric and keep the file.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

422
423 pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
427 self.inner.put(key, value).await
428 }
429
430 pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
431 self.inner.memory_index(key.file_type).get(&key).await
432 }
433
434 #[allow(unused)]
436 pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
437 let index = self.inner.memory_index(key.file_type);
440 if index.get(&key).await.is_none() {
441 CACHE_MISS
442 .with_label_values(&[key.file_type.metric_label()])
443 .inc();
444 return None;
445 }
446
447 let file_path = self.inner.cache_file_path(key);
448 match self.get_reader(&file_path).await {
449 Ok(Some(reader)) => {
450 CACHE_HIT
451 .with_label_values(&[key.file_type.metric_label()])
452 .inc();
453 return Some(reader);
454 }
455 Err(e) => {
456 if e.kind() != ErrorKind::NotFound {
457 warn!(e; "Failed to get file for key {:?}", key);
458 }
459 }
460 Ok(None) => {}
461 }
462
463 index.remove(&key).await;
465 CACHE_MISS
466 .with_label_values(&[key.file_type.metric_label()])
467 .inc();
468 None
469 }
470
    /// Reads the given byte ranges from a cached file, or returns `None` on a cache miss.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // Fetch the requested ranges from the locally cached file.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // The file is unreadable; drop the stale index entry.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache and deletes it from the local store.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // Delete the file directly, even if the index did not contain the key.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from the local store. Waits for the recovery to
    /// finish when `sync` is true. If `task_receiver` is given, a background
    /// task is spawned afterwards to process region load cache tasks.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Start processing region load cache tasks only after the recovery completes.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the local path of the cached file for the given key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store that holds the cached files.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Loads the parquet metadata of a cached file, or returns `None` on a cache miss.
    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
        // Check the in-memory index first to get the file size for the loader.
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);

            match metadata_loader.load().await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // The cached file is unreadable; drop the stale index entry.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the capacity assigned to the puffin cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size of the puffin cache in bytes.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote store into the cache,
    /// waiting until the download completes.
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        // Foreground downloads read with a concurrency of 8.
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8)
            .await
    }

    /// Queues a background download of the file if the background worker is
    /// enabled. The task is dropped (with a debug log) when the queue is full.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // The background worker is disabled; nothing to do.
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // `try_send` never blocks; a full queue only skips this optimization.
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}


/// Key of the file cache index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    pub region_id: RegionId,
    pub file_id: FileId,
    pub file_type: FileType,
}

impl IndexKey {
    /// Creates a new index key.
    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
        IndexKey {
            region_id,
            file_id,
            file_type,
        }
    }
}

impl fmt::Display for IndexKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Renders as "{region_id}.{file_id}.{file_type}", which is also the
        // cache file name.
        write!(
            f,
            "{}.{}.{}",
            self.region_id.as_u64(),
            self.file_id,
            self.file_type
        )
    }
}

/// Type of the file cached by the file cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet (SST) file.
    Parquet,
    /// Puffin (index) file with a format version.
    Puffin(u64),
}

impl fmt::Display for FileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileType::Parquet => write!(f, "parquet"),
            FileType::Puffin(version) => write!(f, "{}.puffin", version),
        }
    }
}

impl FileType {
    /// Parses the file type from a string, e.g. "parquet" or "42.puffin".
    pub(crate) fn parse(s: &str) -> Option<FileType> {
        match s {
            "parquet" => Some(FileType::Parquet),
            // A bare "puffin" extension maps to version 0.
            "puffin" => Some(FileType::Puffin(0)),
            _ => {
                // Versioned puffin files use the form "{version}.puffin".
                if let Some(version_str) = s.strip_suffix(".puffin") {
                    let version = version_str.parse::<u64>().ok()?;
                    Some(FileType::Puffin(version))
                } else {
                    None
                }
            }
        }
    }

    /// Converts the file type to the label of the cache metrics.
    fn metric_label(&self) -> &'static str {
        match self {
            FileType::Parquet => FILE_TYPE,
            FileType::Puffin(_) => INDEX_TYPE,
        }
    }
}

/// An entry of the file cache index.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes.
    pub(crate) file_size: u32,
}

/// Returns the path of the file in the cache directory.
///
/// The file name format is `{region_id}.{file_id}.{file_type}`.
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
    join_path(cache_file_dir, &key.to_string())
}

/// Parses an [IndexKey] from a cache file name, returning `None` if the name
/// does not match the `{region_id}.{file_id}.{file_type}` format.
fn parse_index_key(name: &str) -> Option<IndexKey> {
    // Split into at most three parts so a versioned puffin name like
    // "{region_id}.{file_id}.42.puffin" keeps "42.puffin" as the third part.
    let mut split = name.splitn(3, '.');
    let region_id = split.next().and_then(|s| {
        let id = s.parse::<u64>().ok()?;
        Some(RegionId::from_u64(id))
    })?;
    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
    let file_type = split.next().and_then(FileType::parse)?;

    Some(IndexKey::new(region_id, file_id, file_type))
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // A missing key yields no reader.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add the file to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait until the entry expires (time_to_idle is 10 ms).
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // A missing key yields no reader.
        assert!(cache.reader(key).await.is_none());

        // Write a file and add it to the cache.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read the cached file back.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // The weighted size reflects the file size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Removing the key drops the entry and deletes the local file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        cache.inner.parquet_index.run_pending_tasks().await;

        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write a file and add it to the cache.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Remove the file behind the cache's back.
        local_store.delete(&file_path).await.unwrap();

        // The reader is gone and the stale index entry gets dropped.
        assert!(cache.reader(key).await.is_none());
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write files with varying sizes and add them to the cache.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // Create a fresh cache over the same directory and recover it.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        // No entry exists before the recovery.
        assert!(
            cache
                .reader(IndexKey::new(region_id, file_ids[0], file_type))
                .await
                .is_none()
        );
        cache.recover(true, None).await;

        // The recovered index weight matches the total size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file and add it to the cache.
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        file_cache.put(key, IndexValue { file_size: 5 }).await;
        // Ranges to read, including one that covers the whole file.
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        // `join_path` normalizes a trailing slash in the directory.
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        // A bare "puffin" extension parses as version 0.
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        // A versioned puffin extension keeps its version.
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
                .unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
                .is_none()
        );
    }
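
    // A round-trip check between `FileType`'s `Display` output and
    // `FileType::parse`; it only exercises the formats defined in this module.
    #[test]
    fn test_file_type_display_parse_roundtrip() {
        for file_type in [FileType::Parquet, FileType::Puffin(0), FileType::Puffin(42)] {
            let s = file_type.to_string();
            assert_eq!(Some(file_type), FileType::parse(&s));
        }
        // A bare "puffin" extension also parses (as version 0), but displays
        // back as "0.puffin".
        assert_eq!(Some(FileType::Puffin(0)), FileType::parse("puffin"));
    }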
}