diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs index 55384c28ae..d4a62f573d 100644 --- a/src/mito2/src/cache/index/result_cache.rs +++ b/src/mito2/src/cache/index/result_cache.rs @@ -137,8 +137,8 @@ impl PredicateKey { } /// Creates a new min-max pruning key. - pub fn new_minmax(exprs: Arc>) -> Self { - Self::MinMax(MinMaxKey::new(exprs)) + pub fn new_minmax(exprs: Arc>, schema_version: u64, skip_fields: bool) -> Self { + Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields)) } /// Returns the memory usage of this key. @@ -251,13 +251,21 @@ impl InvertedIndexKey { #[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] pub struct MinMaxKey { exprs: Arc>, + schema_version: u64, + skip_fields: bool, mem_usage: usize, } impl MinMaxKey { - pub fn new(exprs: Arc>) -> Self { - let mem_usage = exprs.iter().map(|s| s.len()).sum::(); - Self { exprs, mem_usage } + pub fn new(exprs: Arc>, schema_version: u64, skip_fields: bool) -> Self { + let mem_usage = + exprs.iter().map(|s| s.len()).sum::() + size_of::() + size_of::(); + Self { + exprs, + schema_version, + skip_fields, + mem_usage, + } } } @@ -304,6 +312,18 @@ mod tests { assert!(cache.get(&key, non_existent_file_id).is_none()); } + #[test] + fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() { + let exprs = Arc::new(vec!["col > 1".to_string()]); + + let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false); + let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false); + assert_ne!(key1, key2); + + let key3 = PredicateKey::new_minmax(exprs, 1, true); + assert_ne!(key1, key3); + } + #[test] fn test_cache_capacity_limit() { // Create a cache with small capacity (100 bytes) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index c49a2ab026..3a95b79c61 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; -use crate::sst::file::{RegionFileId, RegionIndexId}; +use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::file_purger::{FilePurgerRef, create_file_purger}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; @@ -950,6 +950,58 @@ fn maybe_load_cache( /// /// This improves the latency of the first query after server start by avoiding large Parquet /// metadata reads on demand. +async fn preload_parquet_meta_cache_for_files( + region_id: RegionId, + table_dir: String, + path_type: PathType, + object_store: object_store::ObjectStore, + cache_manager: CacheManagerRef, + mut files: Vec, +) -> usize { + // Load older files first so the most recent files remain hot in the LRU cache. + files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1)); + + let mut loaded = 0usize; + for file_handle in files { + let file_id = file_handle.file_id(); + let mut cache_metrics = MetadataCacheMetrics::default(); + if cache_manager + .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default()) + .await + .is_some() + { + // Metadata is either already in memory or loaded from file cache. + if cache_metrics.mem_cache_hit == 0 { + loaded += 1; + } + continue; + } + + let file_size = file_handle.meta_ref().file_size; + if file_size == 0 { + continue; + } + + let file_path = file_handle.file_path(&table_dir, path_type); + let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); + match loader.load(&mut cache_metrics).await { + Ok(metadata) => { + cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata)); + loaded += 1; + } + Err(err) => { + // Preloading is best-effort. Failure shouldn't affect region open. + warn!( + err; "Failed to preload parquet metadata, region: {}, file: {}", + region_id, file_path + ); + } + } + } + + loaded +} + fn maybe_preload_parquet_meta_cache( region: &MitoRegionRef, config: &MitoConfig, @@ -969,9 +1021,9 @@ fn maybe_preload_parquet_meta_cache( tokio::spawn(async move { let region_id = region.region_id; - let table_dir = region.access_layer.table_dir(); + let table_dir = region.access_layer.table_dir().to_string(); let path_type = region.access_layer.path_type(); - let object_store = region.access_layer.object_store(); + let object_store = region.access_layer.object_store().clone(); // Collect SST files. Do not hold the version longer than needed. let mut files = Vec::new(); @@ -983,42 +1035,15 @@ fn maybe_preload_parquet_meta_cache( } } } - - // Load older files first so the most recent files remain hot in the LRU cache. - files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1)); - - let mut loaded = 0usize; - for file_handle in files { - let file_id = file_handle.file_id(); - if cache_manager - .get_parquet_meta_data_from_mem_cache(file_id) - .is_some() - { - continue; - } - - let file_size = file_handle.meta_ref().file_size; - if file_size == 0 { - continue; - } - - let file_path = file_handle.file_path(table_dir, path_type); - let mut cache_metrics = MetadataCacheMetrics::default(); - let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); - match loader.load(&mut cache_metrics).await { - Ok(metadata) => { - cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata)); - loaded += 1; - } - Err(err) => { - // Preloading is best-effort. Failure shouldn't affect region open. - warn!( - err; "Failed to preload parquet metadata, region: {}, file: {}", - region_id, file_path - ); - } - } - } + let loaded = preload_parquet_meta_cache_for_files( + region_id, + table_dir, + path_type, + object_store, + cache_manager, + files, + ) + .await; if loaded > 0 { info!( @@ -1043,3 +1068,116 @@ fn can_load_cache(state: RegionRoleState) -> bool { | RegionRoleState::Leader(RegionLeaderState::Truncating) => false, } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_base::readable_size::ReadableSize; + use common_time::Timestamp; + use datatypes::arrow::array::{ArrayRef, Int64Array}; + use datatypes::arrow::record_batch::RecordBatch; + use object_store::ObjectStore; + use object_store::services::Memory; + use parquet::arrow::ArrowWriter; + use store_api::region_request::PathType; + use store_api::storage::{FileId, RegionId}; + + use super::preload_parquet_meta_cache_for_files; + use crate::cache::CacheManager; + use crate::cache::file_cache::{FileType, IndexKey}; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::sst::file_purger::NoopFilePurger; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_preload_parquet_meta_cache_uses_file_cache() { + let env = TestEnv::new().await; + + let local_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let write_cache = env + .create_write_cache(local_store, ReadableSize::mb(1024)) + .await; + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .write_cache(Some(write_cache.clone())) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + + let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let remote_path = file_handle.file_path(table_dir, path_type); + + let source_store = ObjectStore::new(Memory::default()).unwrap().finish(); + source_store + .write(&remote_path, parquet_bytes) + .await + .unwrap(); + + // Put the parquet file into the write cache, so file cache contains metadata. + let index_key = IndexKey::new(region_id, file_id, FileType::Parquet); + write_cache + .file_cache() + .download(index_key, &remote_path, &source_store, file_size) + .await + .unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + // Provide an empty remote object store (so object-store load should fail if attempted). + let missing_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let loaded = preload_parquet_meta_cache_for_files( + region_id, + table_dir.to_string(), + path_type, + missing_store, + cache_manager.clone(), + vec![file_handle], + ) + .await; + + // Should warm the in-memory cache from file cache even if remote store misses. + assert_eq!(loaded, 1); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_some() + ); + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4cd73d6feb..77bf128e32 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1159,7 +1159,16 @@ impl ParquetReaderBuilder { .map(|expr| format!("{expr:?}")) .collect::>(); exprs.sort(); - Some(PredicateKey::new_minmax(Arc::new(exprs))) + let schema_version = self + .expected_metadata + .as_ref() + .map(|meta| meta.schema_version) + .unwrap_or_else(|| read_format.metadata().schema_version); + Some(PredicateKey::new_minmax( + Arc::new(exprs), + schema_version, + skip_fields, + )) } else { None }; diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 8527a60338..595f1d352a 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -116,16 +116,20 @@ impl RowGroupSelection { }; let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]); - selection_in_rg.insert( - rg_id, - RowSelectionWithCount { - selection, - row_count: rg_row_count, - selector_len: 1, - }, - ); - row_count += rg_row_count; - selector_len += 1; + if selection_in_rg + .insert( + rg_id, + RowSelectionWithCount { + selection, + row_count: rg_row_count, + selector_len: 1, + }, + ) + .is_none() + { + row_count += rg_row_count; + selector_len += 1; + } } Self { @@ -806,6 +810,16 @@ mod tests { assert_eq!(row_selection.row_count(), 1); } + #[test] + fn test_from_full_row_group_ids_dedup_duplicates() { + let selection = RowGroupSelection::from_full_row_group_ids([0, 0, 2, 2], 10, 25); + assert_eq!(selection.row_group_count(), 2); + assert_eq!(selection.row_count(), 15); + + assert_eq!(selection.get(0).unwrap().row_count(), 10); + assert_eq!(selection.get(2).unwrap().row_count(), 5); + } + #[test] fn test_from_row_ids() { let row_group_size = 100;