diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 5f0184f528..3ad71d2a61 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -400,6 +400,19 @@ impl CacheManager { } } + /// Returns the total weighted size of the in-memory SST meta cache. + pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 { + self.sst_meta_cache + .as_ref() + .map(|cache| cache.weighted_size()) + .unwrap_or(0) + } + + /// Returns true if the in-memory SST meta cache is enabled. + pub(crate) fn sst_meta_cache_enabled(&self) -> bool { + self.sst_meta_cache.is_some() + } + /// Gets a vector with repeated value for specific `key`. pub fn get_repeated_vector( &self, diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs index 1b14c9d981..dfed662a14 100644 --- a/src/mito2/src/cache/index/result_cache.rs +++ b/src/mito2/src/cache/index/result_cache.rs @@ -116,6 +116,8 @@ pub enum PredicateKey { Bloom(BloomFilterKey), /// Inverted index predicate. Inverted(InvertedIndexKey), + /// Min-max pruning predicate. + MinMax(MinMaxKey), } impl PredicateKey { @@ -134,12 +136,18 @@ impl PredicateKey { Self::Inverted(InvertedIndexKey::new(predicates)) } + /// Creates a new min-max pruning key. + pub fn new_minmax(exprs: Arc>, schema_version: u64, skip_fields: bool) -> Self { + Self::MinMax(MinMaxKey::new(exprs, schema_version, skip_fields)) + } + /// Returns the memory usage of this key. pub fn mem_usage(&self) -> usize { match self { Self::Fulltext(key) => key.mem_usage, Self::Bloom(key) => key.mem_usage, Self::Inverted(key) => key.mem_usage, + Self::MinMax(key) => key.mem_usage, } } } @@ -239,6 +247,30 @@ impl InvertedIndexKey { } } +/// Key for min-max pruning. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct MinMaxKey { + exprs: Arc>, + schema_version: u64, + skip_fields: bool, + mem_usage: usize, +} + +impl MinMaxKey { + pub fn new(exprs: Arc>, schema_version: u64, skip_fields: bool) -> Self { + let mem_usage = size_of::() + + size_of::>() + + exprs.len() * size_of::() + + exprs.iter().map(|s| s.len()).sum::(); + Self { + exprs, + schema_version, + skip_fields, + mem_usage, + } + } +} + #[cfg(test)] #[allow(clippy::single_range_in_vec_init)] mod tests { @@ -282,6 +314,18 @@ mod tests { assert!(cache.get(&key, non_existent_file_id).is_none()); } + #[test] + fn test_minmax_key_should_distinguish_schema_version_and_skip_fields() { + let exprs = Arc::new(vec!["col > 1".to_string()]); + + let key1 = PredicateKey::new_minmax(exprs.clone(), 1, false); + let key2 = PredicateKey::new_minmax(exprs.clone(), 2, false); + assert_ne!(key1, key2); + + let key3 = PredicateKey::new_minmax(exprs, 1, true); + assert_ne!(key1, key3); + } + #[test] fn test_cache_capacity_limit() { // Create a cache with small capacity (100 bytes) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index bda5b377b3..d405696bc0 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -182,6 +182,10 @@ pub(crate) struct ScanMetricsSet { bloom_filter_cache_hit: usize, /// Number of index result cache misses for bloom filter index. bloom_filter_cache_miss: usize, + /// Number of index result cache hits for minmax pruning. + minmax_cache_hit: usize, + /// Number of index result cache misses for minmax pruning. + minmax_cache_miss: usize, /// Number of pruner builder cache hits. pruner_cache_hit: usize, /// Number of pruner builder cache misses. @@ -308,6 +312,8 @@ impl fmt::Debug for ScanMetricsSet { inverted_index_cache_miss, bloom_filter_cache_hit, bloom_filter_cache_miss, + minmax_cache_hit, + minmax_cache_miss, pruner_cache_hit, pruner_cache_miss, pruner_prune_cost, @@ -433,6 +439,12 @@ impl fmt::Debug for ScanMetricsSet { if *bloom_filter_cache_miss > 0 { write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?; } + if *minmax_cache_hit > 0 { + write!(f, ", \"minmax_cache_hit\":{minmax_cache_hit}")?; + } + if *minmax_cache_miss > 0 { + write!(f, ", \"minmax_cache_miss\":{minmax_cache_miss}")?; + } if *pruner_cache_hit > 0 { write!(f, ", \"pruner_cache_hit\":{pruner_cache_hit}")?; } @@ -639,6 +651,8 @@ impl ScanMetricsSet { inverted_index_cache_miss, bloom_filter_cache_hit, bloom_filter_cache_miss, + minmax_cache_hit, + minmax_cache_miss, pruner_cache_hit, pruner_cache_miss, pruner_prune_cost, @@ -680,6 +694,8 @@ impl ScanMetricsSet { self.inverted_index_cache_miss += *inverted_index_cache_miss; self.bloom_filter_cache_hit += *bloom_filter_cache_hit; self.bloom_filter_cache_miss += *bloom_filter_cache_miss; + self.minmax_cache_hit += *minmax_cache_hit; + self.minmax_cache_miss += *minmax_cache_miss; self.pruner_cache_hit += *pruner_cache_hit; self.pruner_cache_miss += *pruner_cache_miss; self.pruner_prune_cost += *pruner_prune_cost; diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index e621c77e36..dd85616241 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; use common_error::ext::BoxedError; -use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; +use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{DfRecordBatch, RecordBatch}; -use datatypes::compute; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use snafu::ResultExt; @@ -47,7 +47,7 @@ pub(crate) struct ConvertBatchStream { projection_mapper: Arc, cache_strategy: CacheStrategy, partition_metrics: PartitionMetrics, - buffer: Vec, + pending: VecDeque, } impl ConvertBatchStream { @@ -62,7 +62,7 @@ impl ConvertBatchStream { projection_mapper, cache_strategy, partition_metrics, - buffer: Vec::new(), + pending: VecDeque::new(), } } @@ -79,40 +79,36 @@ impl ConvertBatchStream { } } ScanBatch::Series(series) => { - self.buffer.clear(); + debug_assert!( + self.pending.is_empty(), + "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist" + ); match series { SeriesBatch::PrimaryKey(primary_key_batch) => { - self.buffer.reserve(primary_key_batch.batches.len()); // Safety: Only primary key format returns this batch. let mapper = self.projection_mapper.as_primary_key().unwrap(); for batch in primary_key_batch.batches { - let record_batch = mapper.convert(&batch, &self.cache_strategy)?; - self.buffer.push(record_batch.into_df_record_batch()); + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } } SeriesBatch::Flat(flat_batch) => { - self.buffer.reserve(flat_batch.batches.len()); // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); for batch in flat_batch.batches { - let record_batch = mapper.convert(&batch)?; - self.buffer.push(record_batch.into_df_record_batch()); + self.pending.push_back(mapper.convert(&batch)?); } } } let output_schema = self.projection_mapper.output_schema(); - let record_batch = - compute::concat_batches(output_schema.arrow_schema(), &self.buffer) - .context(ArrowComputeSnafu)?; - - Ok(RecordBatch::from_df_record_batch( - output_schema, - record_batch, - )) + Ok(self + .pending + .pop_front() + .unwrap_or_else(|| RecordBatch::new_empty(output_schema))) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. @@ -128,6 +124,10 @@ impl Stream for ConvertBatchStream { type Item = common_recordbatch::error::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(batch) = self.pending.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + let batch = futures::ready!(self.inner.poll_next_unpin(cx)); let Some(batch) = batch else { return Poll::Ready(None); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index d30c1f5e10..014c50820f 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -17,7 +17,7 @@ use std::any::TypeId; use std::collections::HashMap; use std::sync::atomic::{AtomicI64, AtomicU64}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, LazyLock, Mutex}; use std::time::Instant; use common_telemetry::{debug, error, info, warn}; @@ -38,6 +38,7 @@ use store_api::metadata::{ use store_api::region_engine::RegionRole; use store_api::region_request::PathType; use store_api::storage::{ColumnId, RegionId}; +use tokio::sync::Semaphore; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; @@ -63,16 +64,23 @@ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; -use crate::sst::file::{RegionFileId, RegionIndexId}; +use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::file_purger::{FilePurgerRef, create_file_purger}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::{self, region_dir_from_table_dir}; +use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::reader::MetadataCacheMetrics; use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; +const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8; + +static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock = + LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY)); + /// A fetcher to retrieve partition expr for a region. /// /// Compatibility: older regions didn't persist `partition_expr` in engine metadata, @@ -585,6 +593,7 @@ impl RegionOpener { let region = Arc::new(region); maybe_load_cache(®ion, config, &self.cache_manager); + maybe_preload_parquet_meta_cache(®ion, config, &self.cache_manager); Ok(Some(region)) } @@ -973,6 +982,150 @@ fn maybe_load_cache( write_cache.load_region_cache(task); } +/// Preloads Parquet metadata into the in-memory SST meta cache on region open. +/// +/// This improves the latency of the first query after server start by avoiding large Parquet +/// metadata reads on demand. +/// +/// The preload is best-effort: +/// - Always tries to warm from the local write cache (file cache) first. +/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata +/// directly from the local store. +/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...). +async fn preload_parquet_meta_cache_for_files( + region_id: RegionId, + cache_manager: CacheManagerRef, + sst_meta_cache_capacity: u64, + table_dir: String, + path_type: PathType, + object_store: object_store::ObjectStore, + mut files: Vec, +) -> usize { + if !cache_manager.sst_meta_cache_enabled() + || sst_meta_cache_capacity == 0 + || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity + { + return 0; + } + + let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs); + + // Sort by time range so we can prefer preloading newer files first. + files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1)); + + let mut loaded = 0usize; + for file_handle in files { + // Stop when the shared SST meta cache is full. + if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity { + break; + } + + let file_id = file_handle.file_id(); + let mut cache_metrics = MetadataCacheMetrics::default(); + if cache_manager + .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default()) + .await + .is_some() + { + // Metadata is either already in memory or loaded from file cache. + if cache_metrics.mem_cache_hit == 0 { + loaded += 1; + } + continue; + } + + if !allow_direct_load { + continue; + } + + let file_size = file_handle.meta_ref().file_size; + let file_path = file_handle.file_path(&table_dir, path_type); + let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); + match loader.load(&mut cache_metrics).await { + Ok(metadata) => { + cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata)); + loaded += 1; + } + Err(err) => { + // Preloading is best-effort. Failure shouldn't affect region open. + warn!( + err; "Failed to preload parquet metadata from local store, region: {}, file: {}", + region_id, file_path + ); + } + } + } + + loaded +} + +fn maybe_preload_parquet_meta_cache( + region: &MitoRegionRef, + config: &MitoConfig, + cache_manager: &Option, +) { + let Some(cache_manager) = cache_manager else { + return; + }; + if !cache_manager.sst_meta_cache_enabled() { + return; + } + + // Skip if SST meta cache is disabled. + if config.sst_meta_cache_size.as_bytes() == 0 { + return; + } + if !config.preload_index_cache { + return; + } + + let region = region.clone(); + let cache_manager = cache_manager.clone(); + let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes(); + + tokio::spawn(async move { + // Safety: semaphore must exist. + let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap(); + + let region_id = region.region_id; + let table_dir = region.access_layer.table_dir().to_string(); + let path_type = region.access_layer.path_type(); + let object_store = region.access_layer.object_store().clone(); + + // Collect SST files. Do not hold the version longer than needed. + let mut files = Vec::new(); + { + let version = region.version_control.current().version; + for level in version.ssts.levels() { + for file_handle in level.files.values() { + files.push(file_handle.clone()); + } + } + } + let preloading_start = Instant::now(); + let loaded = preload_parquet_meta_cache_for_files( + region_id, + cache_manager, + sst_meta_cache_capacity, + table_dir, + path_type, + object_store, + files, + ) + .await; + let preloading_cost = preloading_start.elapsed(); + + if loaded > 0 { + info!( + "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}", + region_id, + loaded, + preloading_cost.as_millis() + ); + } + }); +} + fn can_load_cache(state: RegionRoleState) -> bool { match state { RegionRoleState::Leader(RegionLeaderState::Writable) @@ -987,3 +1140,259 @@ fn can_load_cache(state: RegionRoleState) -> bool { | RegionRoleState::Leader(RegionLeaderState::Truncating) => false, } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_base::readable_size::ReadableSize; + use common_test_util::temp_dir::create_temp_dir; + use common_time::Timestamp; + use datatypes::arrow::array::{ArrayRef, Int64Array}; + use datatypes::arrow::record_batch::RecordBatch; + use object_store::ObjectStore; + use object_store::services::{Fs, Memory}; + use parquet::arrow::ArrowWriter; + use store_api::region_request::PathType; + use store_api::storage::{FileId, RegionId}; + + use super::preload_parquet_meta_cache_for_files; + use crate::cache::CacheManager; + use crate::cache::file_cache::{FileType, IndexKey}; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::sst::file_purger::NoopFilePurger; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_preload_parquet_meta_cache_uses_file_cache() { + let env = TestEnv::new().await; + + let local_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let write_cache = env + .create_write_cache(local_store, ReadableSize::mb(1024)) + .await; + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .write_cache(Some(write_cache.clone())) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + + let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let remote_path = file_handle.file_path(table_dir, path_type); + + let source_store = ObjectStore::new(Memory::default()).unwrap().finish(); + source_store + .write(&remote_path, parquet_bytes) + .await + .unwrap(); + + // Put the parquet file into the write cache, so file cache contains metadata. + let index_key = IndexKey::new(region_id, file_id, FileType::Parquet); + write_cache + .file_cache() + .download(index_key, &remote_path, &source_store, file_size) + .await + .unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + let loaded = preload_parquet_meta_cache_for_files( + region_id, + cache_manager.clone(), + 1024 * 1024, + table_dir.to_string(), + path_type, + source_store.clone(), + vec![file_handle], + ) + .await; + + // Should warm the in-memory cache from the local file cache. + assert_eq!(loaded, 1); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_some() + ); + } + + #[tokio::test] + async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() { + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + // Without a local file cache entry, preloading should skip the file. + let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size: 0, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let remote_path = file_handle.file_path(table_dir, path_type); + + // Even if the remote object store has the file, we should not preload from it. + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + object_store + .write(&remote_path, b"noop".as_slice()) + .await + .unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + let loaded = preload_parquet_meta_cache_for_files( + region_id, + cache_manager.clone(), + 1024 * 1024, + table_dir.to_string(), + path_type, + object_store, + vec![file_handle], + ) + .await; + + assert_eq!(loaded, 0); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + } + + #[tokio::test] + async fn test_preload_parquet_meta_cache_loads_from_local_fs() { + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat + // the local filesystem to retrieve it. + let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size: 0, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let file_path = file_handle.file_path(table_dir, path_type); + + let root = create_temp_dir("parquet-meta-preload"); + let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap())) + .unwrap() + .finish(); + object_store.write(&file_path, parquet_bytes).await.unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + let loaded = preload_parquet_meta_cache_for_files( + region_id, + cache_manager.clone(), + 1024 * 1024, + table_dir.to_string(), + path_type, + object_store, + vec![file_handle], + ) + .await; + + assert_eq!(loaded, 1); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_some() + ); + } +} diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 460b18f3a3..cd2aef07dc 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -679,15 +679,14 @@ impl ParquetReaderBuilder { metrics.rg_total += num_row_groups; metrics.rows_total += num_rows as usize; - let mut output = RowGroupSelection::new(row_group_size, num_rows as _); - // Compute skip_fields once for all pruning operations let skip_fields = self.compute_skip_fields(parquet_meta); - self.prune_row_groups_by_minmax( + let mut output = self.row_groups_by_minmax( read_format, parquet_meta, - &mut output, + row_group_size, + num_rows as usize, metrics, skip_fields, ); @@ -1136,20 +1135,59 @@ impl ParquetReaderBuilder { } } - /// Prunes row groups by min-max index. - fn prune_row_groups_by_minmax( + /// Computes row groups selection after min-max pruning. + fn row_groups_by_minmax( &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - output: &mut RowGroupSelection, + row_group_size: usize, + total_row_count: usize, metrics: &mut ReaderFilterMetrics, skip_fields: bool, - ) -> bool { + ) -> RowGroupSelection { let Some(predicate) = &self.predicate else { - return false; + return RowGroupSelection::new(row_group_size, total_row_count); }; - let row_groups_before = output.row_group_count(); + let file_id = self.file_handle.file_id().file_id(); + let index_result_cache = self.cache_strategy.index_result_cache(); + let cached_minmax_key = + if index_result_cache.is_some() && predicate.dyn_filters().is_empty() { + // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly + // building row-group pruning stats for identical predicates across queries. + let mut exprs = predicate + .exprs() + .iter() + .map(|expr| format!("{expr:?}")) + .collect::>(); + exprs.sort(); + let schema_version = self + .expected_metadata + .as_ref() + .map(|meta| meta.schema_version) + .unwrap_or_else(|| read_format.metadata().schema_version); + Some(PredicateKey::new_minmax( + Arc::new(exprs), + schema_version, + skip_fields, + )) + } else { + None + }; + + if let Some(index_result_cache) = index_result_cache + && let Some(predicate_key) = cached_minmax_key.as_ref() + { + if let Some(result) = index_result_cache.get(predicate_key, file_id) { + metrics.minmax_cache_hit += 1; + let num_row_groups = parquet_meta.num_row_groups(); + metrics.rg_minmax_filtered += + num_row_groups.saturating_sub(result.row_group_count()); + return (*result).clone(); + } + + metrics.minmax_cache_miss += 1; + } let region_meta = read_format.metadata(); let row_groups = parquet_meta.row_groups(); @@ -1168,20 +1206,26 @@ impl ParquetReaderBuilder { // Here we use the schema of the SST to build the physical expression. If the column // in the SST doesn't have the same column id as the column in the expected metadata, // we will get a None statistics for that column. - predicate - .prune_with_stats(&stats, prune_schema) - .iter() - .zip(0..parquet_meta.num_row_groups()) - .for_each(|(mask, row_group)| { - if !*mask { - output.remove_row_group(row_group); - } - }); + let mask = predicate.prune_with_stats(&stats, prune_schema); + let output = RowGroupSelection::from_full_row_group_ids( + mask.iter() + .enumerate() + .filter_map(|(row_group, keep)| keep.then_some(row_group)), + row_group_size, + total_row_count, + ); - let row_groups_after = output.row_group_count(); - metrics.rg_minmax_filtered += row_groups_before - row_groups_after; + metrics.rg_minmax_filtered += parquet_meta + .num_row_groups() + .saturating_sub(output.row_group_count()); - true + if let Some(index_result_cache) = index_result_cache + && let Some(predicate_key) = cached_minmax_key + { + index_result_cache.put(predicate_key, file_id, Arc::new(output.clone())); + } + + output } fn apply_index_result_and_update_cache( @@ -1295,6 +1339,10 @@ pub(crate) struct ReaderFilterMetrics { pub(crate) bloom_filter_cache_hit: usize, /// Number of index result cache misses for bloom filter index. pub(crate) bloom_filter_cache_miss: usize, + /// Number of index result cache hits for minmax pruning. + pub(crate) minmax_cache_hit: usize, + /// Number of index result cache misses for minmax pruning. + pub(crate) minmax_cache_miss: usize, /// Optional metrics for inverted index applier. pub(crate) inverted_index_apply_metrics: Option, @@ -1335,6 +1383,8 @@ impl ReaderFilterMetrics { self.inverted_index_cache_miss += other.inverted_index_cache_miss; self.bloom_filter_cache_hit += other.bloom_filter_cache_hit; self.bloom_filter_cache_miss += other.bloom_filter_cache_miss; + self.minmax_cache_hit += other.minmax_cache_hit; + self.minmax_cache_miss += other.minmax_cache_miss; self.pruner_cache_hit += other.pruner_cache_hit; self.pruner_cache_miss += other.pruner_cache_miss; @@ -1423,7 +1473,7 @@ impl ReaderFilterMetrics { } #[cfg(all(test, feature = "vector_index"))] -mod tests { +mod vector_index_tests { use super::*; #[test] @@ -2120,3 +2170,112 @@ impl FlatRowGroupReader { } } } + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::fmt::{Debug, Formatter}; + use std::sync::{Arc, LazyLock}; + + use datafusion::arrow::datatypes::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::expr::ScalarFunction; + use datafusion_expr::{ + ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + }; + use datatypes::arrow::array::{ArrayRef, Int64Array}; + use datatypes::arrow::record_batch::RecordBatch; + use object_store::services::Memory; + use parquet::arrow::ArrowWriter; + use store_api::region_request::PathType; + use table::predicate::Predicate; + + use super::*; + use crate::sst::parquet::metadata::MetadataLoader; + use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata}; + + #[tokio::test(flavor = "current_thread")] + async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() { + #[derive(Eq, PartialEq, Hash)] + struct PanicDebugUdf; + + impl Debug for PanicDebugUdf { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + panic!("minmax predicate key should not format exprs when cache is disabled"); + } + } + + impl ScalarUDFImpl for PanicDebugUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "panic_debug_udf" + } + + fn signature(&self) -> &Signature { + static SIGNATURE: LazyLock = + LazyLock::new(|| Signature::variadic_any(Volatility::Immutable)); + &SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1)))) + } + } + + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_handle = sst_file_handle(0, 1); + let table_dir = "test_table".to_string(); + let path_type = PathType::Bare; + let file_path = file_handle.file_path(&table_dir, path_type); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + object_store.write(&file_path, parquet_bytes).await.unwrap(); + + let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = + ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap(); + + let mut cache_metrics = MetadataCacheMetrics::default(); + let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); + let parquet_meta = loader.load(&mut cache_metrics).await.unwrap(); + + let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf)); + let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf( + udf, + vec![], + ))]); + let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store) + .predicate(Some(predicate)) + .cache(CacheStrategy::Disabled); + + let row_group_size = parquet_meta.row_group(0).num_rows() as usize; + let total_row_count = parquet_meta.file_metadata().num_rows() as usize; + let mut metrics = ReaderFilterMetrics::default(); + let selection = builder.row_groups_by_minmax( + &read_format, + &parquet_meta, + row_group_size, + total_row_count, + &mut metrics, + false, + ); + + assert!(!selection.is_empty()); + } +} diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 10acf76f18..595f1d352a 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -77,6 +77,68 @@ impl RowGroupSelection { } } + /// Creates a new `RowGroupSelection` that selects all rows in the specified row groups. + /// + /// This is useful for fast construction after coarse pruning (e.g. min-max pruning), + /// avoiding building and then removing a full selection of all row groups. + pub fn from_full_row_group_ids( + row_group_ids: I, + row_group_size: usize, + total_row_count: usize, + ) -> Self + where + I: IntoIterator, + { + if row_group_size == 0 || total_row_count == 0 { + return Self::default(); + } + + let row_group_count = total_row_count.div_ceil(row_group_size); + if row_group_count == 0 { + return Self::default(); + } + + let last_row_group_size = total_row_count - (row_group_count - 1) * row_group_size; + + let mut selection_in_rg = BTreeMap::new(); + let mut row_count = 0usize; + let mut selector_len = 0usize; + + for rg_id in row_group_ids { + if rg_id >= row_group_count { + continue; + } + + let rg_row_count = if rg_id == row_group_count - 1 { + last_row_group_size + } else { + row_group_size + }; + + let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]); + if selection_in_rg + .insert( + rg_id, + RowSelectionWithCount { + selection, + row_count: rg_row_count, + selector_len: 1, + }, + ) + .is_none() + { + row_count += rg_row_count; + selector_len += 1; + } + } + + Self { + selection_in_rg, + row_count, + selector_len, + } + } + /// Returns the row selection for a given row group. /// /// `None` indicates not selected. @@ -748,6 +810,16 @@ mod tests { assert_eq!(row_selection.row_count(), 1); } + #[test] + fn test_from_full_row_group_ids_dedup_duplicates() { + let selection = RowGroupSelection::from_full_row_group_ids([0, 0, 2, 2], 10, 25); + assert_eq!(selection.row_group_count(), 2); + assert_eq!(selection.row_count(), 15); + + assert_eq!(selection.get(0).unwrap().row_count(), 10); + assert_eq!(selection.get(2).unwrap().row_count(), 5); + } + #[test] fn test_from_row_ids() { let row_group_size = 100;