diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index ced8f9e025..d3a9d4d134 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -398,6 +398,19 @@ impl CacheManager { } } + /// Returns the total weighted size of the in-memory SST meta cache. + pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 { + self.sst_meta_cache + .as_ref() + .map(|cache| cache.weighted_size()) + .unwrap_or(0) + } + + /// Returns true if the in-memory SST meta cache is enabled. + pub(crate) fn sst_meta_cache_enabled(&self) -> bool { + self.sst_meta_cache.is_some() + } + /// Gets a vector with repeated value for specific `key`. pub fn get_repeated_vector( &self, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 302e0d3902..1cbf6e5d2a 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -17,7 +17,7 @@ use std::any::TypeId; use std::collections::HashMap; use std::sync::atomic::{AtomicI64, AtomicU64}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, LazyLock, Mutex}; use std::time::Instant; use common_telemetry::{debug, error, info, warn}; @@ -38,6 +38,7 @@ use store_api::metadata::{ use store_api::region_engine::RegionRole; use store_api::region_request::PathType; use store_api::storage::{ColumnId, RegionId}; +use tokio::sync::Semaphore; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; @@ -75,6 +76,11 @@ use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; +const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8; + +static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> = + LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY)); + /// A fetcher to retrieve partition expr for a region. 
/// /// Compatibility: older regions didn't persist `partition_expr` in engine metadata, @@ -950,19 +956,40 @@ fn maybe_load_cache( /// /// This improves the latency of the first query after server start by avoiding large Parquet /// metadata reads on demand. +/// +/// The preload is best-effort: +/// - Always tries to warm from the local write cache (file cache) first. +/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata +/// directly from the local store. +/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...). async fn preload_parquet_meta_cache_for_files( region_id: RegionId, + cache_manager: CacheManagerRef, + sst_meta_cache_capacity: u64, table_dir: String, path_type: PathType, object_store: object_store::ObjectStore, - cache_manager: CacheManagerRef, mut files: Vec<FileHandle>, ) -> usize { - // Load older files first so the most recent files remain hot in the LRU cache. - files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1)); + if !cache_manager.sst_meta_cache_enabled() + || sst_meta_cache_capacity == 0 + || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity + { + return 0; + } + + let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs); + + // Sort by time range so we can prefer preloading newer files first. + files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1)); let mut loaded = 0usize; for file_handle in files { + // Stop when the shared SST meta cache is full. 
+ if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity { + break; + } + let file_id = file_handle.file_id(); let mut cache_metrics = MetadataCacheMetrics::default(); if cache_manager @@ -977,6 +1004,10 @@ async fn preload_parquet_meta_cache_for_files( continue; } + if !allow_direct_load { + continue; + } + let file_size = file_handle.meta_ref().file_size; let file_path = file_handle.file_path(&table_dir, path_type); let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); @@ -988,7 +1019,7 @@ async fn preload_parquet_meta_cache_for_files( Err(err) => { // Preloading is best-effort. Failure shouldn't affect region open. warn!( - err; "Failed to preload parquet metadata, region: {}, file: {}", + err; "Failed to preload parquet metadata from local store, region: {}, file: {}", region_id, file_path ); } @@ -1006,16 +1037,26 @@ fn maybe_preload_parquet_meta_cache( let Some(cache_manager) = cache_manager else { return; }; + if !cache_manager.sst_meta_cache_enabled() { + return; + } // Skip if SST meta cache is disabled. if config.sst_meta_cache_size.as_bytes() == 0 { return; } + if !config.preload_index_cache { + return; + } let region = region.clone(); let cache_manager = cache_manager.clone(); + let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes(); tokio::spawn(async move { + // Safety: semaphore must exist. 
+ let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap(); + let region_id = region.region_id; let table_dir = region.access_layer.table_dir().to_string(); let path_type = region.access_layer.path_type(); @@ -1031,20 +1072,25 @@ fn maybe_preload_parquet_meta_cache( } } } + let preloading_start = Instant::now(); let loaded = preload_parquet_meta_cache_for_files( region_id, + cache_manager, + sst_meta_cache_capacity, table_dir, path_type, object_store, - cache_manager, files, ) .await; + let preloading_cost = preloading_start.elapsed(); if loaded > 0 { info!( - "Preloaded parquet metadata for region {}, loaded_files: {}", - region_id, loaded + "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}", + region_id, + loaded, + preloading_cost.as_millis() ); } }); @@ -1070,11 +1116,12 @@ mod tests { use std::sync::Arc; use common_base::readable_size::ReadableSize; + use common_test_util::temp_dir::create_temp_dir; use common_time::Timestamp; use datatypes::arrow::array::{ArrayRef, Int64Array}; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; - use object_store::services::Memory; + use object_store::services::{Fs, Memory}; use parquet::arrow::ArrowWriter; use store_api::region_request::PathType; use store_api::storage::{FileId, RegionId}; @@ -1156,19 +1203,18 @@ mod tests { .is_none() ); - // Provide an empty remote object store (so object-store load should fail if attempted). - let missing_store = ObjectStore::new(Memory::default()).unwrap().finish(); let loaded = preload_parquet_meta_cache_for_files( region_id, + cache_manager.clone(), + 1024 * 1024, table_dir.to_string(), path_type, - missing_store, - cache_manager.clone(), + source_store.clone(), vec![file_handle], ) .await; - // Should warm the in-memory cache from file cache even if remote store misses. + // Should warm the in-memory cache from the local file cache. 
assert_eq!(loaded, 1); assert!( cache_manager @@ -1178,8 +1224,7 @@ mod tests { } #[tokio::test] - async fn test_preload_parquet_meta_cache_with_unknown_file_size() { - let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() { let cache_manager = Arc::new( CacheManager::builder() .sst_meta_cache_size(1024 * 1024) @@ -1189,15 +1234,7 @@ mod tests { let region_id = RegionId::new(1, 1); let file_id = FileId::random(); - let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; - let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); - let mut parquet_bytes = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat - // the object store to retrieve it. + // Without a local file cache entry, preloading should skip the file. let file_meta = FileMeta { region_id, file_id, @@ -1220,8 +1257,11 @@ mod tests { let table_dir = "test_table"; let path_type = PathType::Bare; let remote_path = file_handle.file_path(table_dir, path_type); + + // Even if the remote object store has the file, we should not preload from it. 
+ let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); object_store - .write(&remote_path, parquet_bytes) + .write(&remote_path, b"noop".as_slice()) .await .unwrap(); @@ -1234,10 +1274,86 @@ mod tests { let loaded = preload_parquet_meta_cache_for_files( region_id, + cache_manager.clone(), + 1024 * 1024, table_dir.to_string(), path_type, object_store, + vec![file_handle], + ) + .await; + + assert_eq!(loaded, 0); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + } + + #[tokio::test] + async fn test_preload_parquet_meta_cache_loads_from_local_fs() { + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat + // the local filesystem to retrieve it. 
+ let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size: 0, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let file_path = file_handle.file_path(table_dir, path_type); + + let root = create_temp_dir("parquet-meta-preload"); + let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap())) + .unwrap() + .finish(); + object_store.write(&file_path, parquet_bytes).await.unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + let loaded = preload_parquet_meta_cache_for_files( + region_id, cache_manager.clone(), + 1024 * 1024, + table_dir.to_string(), + path_type, + object_store, vec![file_handle], ) .await;