From e16f8230b03b53366766e0c1e50b4dc858143be2 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 18 Mar 2026 02:56:29 +0800 Subject: [PATCH] take optional pre-exist metadata Signed-off-by: Ruihang Xia --- src/mito2/src/access_layer.rs | 2 + src/mito2/src/cache.rs | 112 +++++++++++++++++++++------------ src/mito2/src/region/opener.rs | 2 +- 3 files changed, 76 insertions(+), 40 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 92c8a3bc36..0a012c7221 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -339,6 +339,7 @@ impl AccessLayer { metrics: &mut Metrics, ) -> Result { let region_id = request.metadata.region_id; + let region_metadata = request.metadata.clone(); let cache_manager = request.cache_manager.clone(); let sst_info = if let Some(write_cache) = cache_manager.write_cache() { @@ -412,6 +413,7 @@ impl AccessLayer { cache_manager.put_parquet_meta_data( RegionFileId::new(region_id, sst.file_id), parquet_metadata.clone(), + Some(region_metadata.clone()), ) } } diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 9e1887e126..30cebf5007 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -83,38 +83,44 @@ pub(crate) struct CachedSstMeta { impl CachedSstMeta { pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result { - let (region_metadata, region_metadata_weight) = { - let file_metadata = parquet_metadata.file_metadata(); - let key_values = file_metadata - .key_value_metadata() - .context(InvalidParquetSnafu { - file: file_path, - reason: "missing key value meta", - })?; - let meta_value = key_values - .iter() - .find(|kv| kv.key == PARQUET_METADATA_KEY) - .with_context(|| InvalidParquetSnafu { - file: file_path, - reason: format!("key {} not found", PARQUET_METADATA_KEY), - })?; - let json = meta_value - .value - .as_ref() - .with_context(|| InvalidParquetSnafu { - file: file_path, - reason: format!("No value for key {}", 
PARQUET_METADATA_KEY), - })?; - let region_metadata = Arc::new( + Self::try_new_with_region_metadata(file_path, parquet_metadata, None) + } + + pub(crate) fn try_new_with_region_metadata( + file_path: &str, + parquet_metadata: ParquetMetaData, + region_metadata: Option, + ) -> Result { + let file_metadata = parquet_metadata.file_metadata(); + let key_values = file_metadata + .key_value_metadata() + .context(InvalidParquetSnafu { + file: file_path, + reason: "missing key value meta", + })?; + let meta_value = key_values + .iter() + .find(|kv| kv.key == PARQUET_METADATA_KEY) + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("key {} not found", PARQUET_METADATA_KEY), + })?; + let json = meta_value + .value + .as_ref() + .with_context(|| InvalidParquetSnafu { + file: file_path, + reason: format!("No value for key {}", PARQUET_METADATA_KEY), + })?; + let region_metadata = match region_metadata { + Some(region_metadata) => region_metadata, + None => Arc::new( store_api::metadata::RegionMetadata::from_json(json) .context(InvalidMetadataSnafu)?, - ); - // Keep the previous JSON-byte floor and charge the decoded structures as well. - ( - region_metadata.clone(), - region_metadata.estimated_size().max(json.len()), - ) + ), }; + // Keep the previous JSON-byte floor and charge the decoded structures as well. + let region_metadata_weight = region_metadata.estimated_size().max(json.len()); let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata)); Ok(Self { @@ -233,13 +239,15 @@ impl CacheStrategy { } /// Calls [CacheManager::put_parquet_meta_data()]. 
- pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc) { + pub fn put_parquet_meta_data( + &self, + file_id: RegionFileId, + metadata: Arc, + region_metadata: Option, + ) { match self { - CacheStrategy::EnableAll(cache_manager) => { - cache_manager.put_parquet_meta_data(file_id, metadata); - } - CacheStrategy::Compaction(cache_manager) => { - cache_manager.put_parquet_meta_data(file_id, metadata); + CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => { + cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata); } CacheStrategy::Disabled => {} } @@ -526,14 +534,23 @@ impl CacheManager { } /// Puts [ParquetMetaData] into the cache. - pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc) { + pub fn put_parquet_meta_data( + &self, + file_id: RegionFileId, + metadata: Arc, + region_metadata: Option, + ) { if self.sst_meta_cache.is_some() { let file_path = format!( "region_id={}, file_id={}", file_id.region_id(), file_id.file_id() ); - match CachedSstMeta::try_new(&file_path, Arc::unwrap_or_clone(metadata)) { + match CachedSstMeta::try_new_with_region_metadata( + &file_path, + Arc::unwrap_or_clone(metadata), + region_metadata, + ) { Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)), Err(err) => warn!( err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}", @@ -1085,7 +1102,7 @@ mod tests { let file_id = RegionFileId::new(region_id, FileId::random()); let metadata = parquet_meta(); let mut metrics = MetadataCacheMetrics::default(); - cache.put_parquet_meta_data(file_id, metadata); + cache.put_parquet_meta_data(file_id, metadata, None); assert!( cache .get_parquet_meta_data(file_id, &mut metrics, Default::default()) @@ -1123,7 +1140,7 @@ mod tests { .is_none() ); let (metadata, region_metadata) = sst_parquet_meta(); - cache.put_parquet_meta_data(file_id, metadata); + cache.put_parquet_meta_data(file_id, metadata, None); let 
cached = cache .get_sst_meta_data(file_id, &mut metrics, Default::default()) .await @@ -1149,6 +1166,23 @@ mod tests { ); } + #[tokio::test] + async fn test_parquet_meta_cache_with_provided_region_metadata() { + let cache = CacheManager::builder().sst_meta_cache_size(2000).build(); + let mut metrics = MetadataCacheMetrics::default(); + let region_id = RegionId::new(1, 1); + let file_id = RegionFileId::new(region_id, FileId::random()); + let (metadata, region_metadata) = sst_parquet_meta(); + + cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone())); + + let cached = cache + .get_sst_meta_data(file_id, &mut metrics, Default::default()) + .await + .unwrap(); + assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata())); + } + #[test] fn test_meta_cache_weight_accounts_for_decoded_region_metadata() { let region_metadata = Arc::new(wide_region_metadata(128)); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index bb2bc23337..d089493f81 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -1043,7 +1043,7 @@ async fn preload_parquet_meta_cache_for_files( let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); match loader.load(&mut cache_metrics).await { Ok(metadata) => { - cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata)); + cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None); loaded += 1; } Err(err) => {