diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs index 1b14c9d981..55384c28ae 100644 --- a/src/mito2/src/cache/index/result_cache.rs +++ b/src/mito2/src/cache/index/result_cache.rs @@ -116,6 +116,8 @@ pub enum PredicateKey { Bloom(BloomFilterKey), /// Inverted index predicate. Inverted(InvertedIndexKey), + /// Min-max pruning predicate. + MinMax(MinMaxKey), } impl PredicateKey { @@ -134,12 +136,18 @@ impl PredicateKey { Self::Inverted(InvertedIndexKey::new(predicates)) } + /// Creates a new min-max pruning key. + pub fn new_minmax(exprs: Arc>) -> Self { + Self::MinMax(MinMaxKey::new(exprs)) + } + /// Returns the memory usage of this key. pub fn mem_usage(&self) -> usize { match self { Self::Fulltext(key) => key.mem_usage, Self::Bloom(key) => key.mem_usage, Self::Inverted(key) => key.mem_usage, + Self::MinMax(key) => key.mem_usage, } } } @@ -239,6 +247,20 @@ impl InvertedIndexKey { } } +/// Key for min-max pruning. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct MinMaxKey { + exprs: Arc>, + mem_usage: usize, +} + +impl MinMaxKey { + pub fn new(exprs: Arc>) -> Self { + let mem_usage = exprs.iter().map(|s| s.len()).sum::(); + Self { exprs, mem_usage } + } +} + #[cfg(test)] #[allow(clippy::single_range_in_vec_init)] mod tests { diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 602f5508ba..5e95c2e221 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -321,10 +321,10 @@ impl MitoConfig { ); // Use 2x of global write buffer size as global write buffer reject size. let global_write_buffer_reject_size = global_write_buffer_size * 2; - // shouldn't be greater than 128MB in default mode. + // shouldn't be greater than 256MB in default mode. let sst_meta_cache_size = cmp::min( sys_memory / SST_META_CACHE_SIZE_FACTOR, - ReadableSize::mb(128), + ReadableSize::mb(256), ); // shouldn't be greater than 512MB in default mode. let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512)); diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index e621c77e36..dd85616241 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; use common_error::ext::BoxedError; -use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; +use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{DfRecordBatch, RecordBatch}; -use datatypes::compute; use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use snafu::ResultExt; @@ -47,7 +47,7 @@ pub(crate) struct ConvertBatchStream { projection_mapper: Arc, cache_strategy: CacheStrategy, partition_metrics: PartitionMetrics, - buffer: Vec, + pending: VecDeque, } impl ConvertBatchStream { @@ -62,7 +62,7 @@ impl ConvertBatchStream { projection_mapper, cache_strategy, partition_metrics, - buffer: Vec::new(), + pending: VecDeque::new(), } } @@ -79,40 +79,36 @@ impl ConvertBatchStream { } } ScanBatch::Series(series) => { - self.buffer.clear(); + debug_assert!( + self.pending.is_empty(), + "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist" + ); match series { SeriesBatch::PrimaryKey(primary_key_batch) => { - self.buffer.reserve(primary_key_batch.batches.len()); // Safety: Only primary key format returns this batch. let mapper = self.projection_mapper.as_primary_key().unwrap(); for batch in primary_key_batch.batches { - let record_batch = mapper.convert(&batch, &self.cache_strategy)?; - self.buffer.push(record_batch.into_df_record_batch()); + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } } SeriesBatch::Flat(flat_batch) => { - self.buffer.reserve(flat_batch.batches.len()); // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); for batch in flat_batch.batches { - let record_batch = mapper.convert(&batch)?; - self.buffer.push(record_batch.into_df_record_batch()); + self.pending.push_back(mapper.convert(&batch)?); } } } let output_schema = self.projection_mapper.output_schema(); - let record_batch = - compute::concat_batches(output_schema.arrow_schema(), &self.buffer) - .context(ArrowComputeSnafu)?; - - Ok(RecordBatch::from_df_record_batch( - output_schema, - record_batch, - )) + Ok(self + .pending + .pop_front() + .unwrap_or_else(|| RecordBatch::new_empty(output_schema))) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. @@ -128,6 +124,10 @@ impl Stream for ConvertBatchStream { type Item = common_recordbatch::error::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Some(batch) = self.pending.pop_front() { + return Poll::Ready(Some(Ok(batch))); + } + let batch = futures::ready!(self.inner.poll_next_unpin(cx)); let Some(batch) = batch else { return Poll::Ready(None); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ce2d94e829..c49a2ab026 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -69,6 +69,8 @@ use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::{self, region_dir_from_table_dir}; +use crate::sst::parquet::metadata::MetadataLoader; +use crate::sst::parquet::reader::MetadataCacheMetrics; use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; @@ -585,6 +587,7 @@ impl RegionOpener { let region = Arc::new(region); maybe_load_cache(®ion, config, &self.cache_manager); + maybe_preload_parquet_meta_cache(®ion, config, &self.cache_manager); Ok(Some(region)) } @@ -943,6 +946,89 @@ fn maybe_load_cache( write_cache.load_region_cache(task); } +/// Preloads Parquet metadata into the in-memory SST meta cache on region open. +/// +/// This improves the latency of the first query after server start by avoiding large Parquet +/// metadata reads on demand. +fn maybe_preload_parquet_meta_cache( + region: &MitoRegionRef, + config: &MitoConfig, + cache_manager: &Option, +) { + let Some(cache_manager) = cache_manager else { + return; + }; + + // Skip if SST meta cache is disabled. + if config.sst_meta_cache_size.as_bytes() == 0 { + return; + } + + let region = region.clone(); + let cache_manager = cache_manager.clone(); + + tokio::spawn(async move { + let region_id = region.region_id; + let table_dir = region.access_layer.table_dir(); + let path_type = region.access_layer.path_type(); + let object_store = region.access_layer.object_store(); + + // Collect SST files. Do not hold the version longer than needed. + let mut files = Vec::new(); + { + let version = region.version_control.current().version; + for level in version.ssts.levels() { + for file_handle in level.files.values() { + files.push(file_handle.clone()); + } + } + } + + // Load older files first so the most recent files remain hot in the LRU cache. + files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1)); + + let mut loaded = 0usize; + for file_handle in files { + let file_id = file_handle.file_id(); + if cache_manager + .get_parquet_meta_data_from_mem_cache(file_id) + .is_some() + { + continue; + } + + let file_size = file_handle.meta_ref().file_size; + if file_size == 0 { + continue; + } + + let file_path = file_handle.file_path(table_dir, path_type); + let mut cache_metrics = MetadataCacheMetrics::default(); + let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); + match loader.load(&mut cache_metrics).await { + Ok(metadata) => { + cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata)); + loaded += 1; + } + Err(err) => { + // Preloading is best-effort. Failure shouldn't affect region open. + warn!( + err; "Failed to preload parquet metadata, region: {}, file: {}", + region_id, file_path + ); + } + } + } + + if loaded > 0 { + info!( + "Preloaded parquet metadata for region {}, loaded_files: {}", + region_id, loaded + ); + } + }); +} + fn can_load_cache(state: RegionRoleState) -> bool { match state { RegionRoleState::Leader(RegionLeaderState::Writable) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 989aeb812b..11eaf4d4a8 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -39,9 +39,9 @@ pub mod writer; pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; /// Default batch size to read parquet files. -pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; +pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 4096; /// Default row group size for parquet files. -pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE; +pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024; /// Parquet write options. #[derive(Debug, Clone)] diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 460b18f3a3..4cd73d6feb 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -679,15 +679,14 @@ impl ParquetReaderBuilder { metrics.rg_total += num_row_groups; metrics.rows_total += num_rows as usize; - let mut output = RowGroupSelection::new(row_group_size, num_rows as _); - // Compute skip_fields once for all pruning operations let skip_fields = self.compute_skip_fields(parquet_meta); - self.prune_row_groups_by_minmax( + let mut output = self.row_groups_by_minmax( read_format, parquet_meta, - &mut output, + row_group_size, + num_rows as usize, metrics, skip_fields, ); @@ -1136,20 +1135,43 @@ impl ParquetReaderBuilder { } } - /// Prunes row groups by min-max index. - fn prune_row_groups_by_minmax( + /// Computes row groups selection after min-max pruning. + fn row_groups_by_minmax( &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - output: &mut RowGroupSelection, + row_group_size: usize, + total_row_count: usize, metrics: &mut ReaderFilterMetrics, skip_fields: bool, - ) -> bool { + ) -> RowGroupSelection { let Some(predicate) = &self.predicate else { - return false; + return RowGroupSelection::new(row_group_size, total_row_count); }; - let row_groups_before = output.row_group_count(); + let file_id = self.file_handle.file_id().file_id(); + let cached_minmax_key = if predicate.dyn_filters().is_empty() { + // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly + // building row-group pruning stats for identical predicates across queries. + let mut exprs = predicate + .exprs() + .iter() + .map(|expr| format!("{expr:?}")) + .collect::>(); + exprs.sort(); + Some(PredicateKey::new_minmax(Arc::new(exprs))) + } else { + None + }; + + if let Some(index_result_cache) = self.cache_strategy.index_result_cache() + && let Some(predicate_key) = cached_minmax_key.as_ref() + && let Some(result) = index_result_cache.get(predicate_key, file_id) + { + let num_row_groups = parquet_meta.num_row_groups(); + metrics.rg_minmax_filtered += num_row_groups.saturating_sub(result.row_group_count()); + return (*result).clone(); + } let region_meta = read_format.metadata(); let row_groups = parquet_meta.row_groups(); @@ -1168,20 +1190,26 @@ impl ParquetReaderBuilder { // Here we use the schema of the SST to build the physical expression. If the column // in the SST doesn't have the same column id as the column in the expected metadata, // we will get a None statistics for that column. - predicate - .prune_with_stats(&stats, prune_schema) - .iter() - .zip(0..parquet_meta.num_row_groups()) - .for_each(|(mask, row_group)| { - if !*mask { - output.remove_row_group(row_group); - } - }); + let mask = predicate.prune_with_stats(&stats, prune_schema); + let output = RowGroupSelection::from_full_row_group_ids( + mask.iter() + .enumerate() + .filter_map(|(row_group, keep)| keep.then_some(row_group)), + row_group_size, + total_row_count, + ); - let row_groups_after = output.row_group_count(); - metrics.rg_minmax_filtered += row_groups_before - row_groups_after; + metrics.rg_minmax_filtered += parquet_meta + .num_row_groups() + .saturating_sub(output.row_group_count()); - true + if let Some(index_result_cache) = self.cache_strategy.index_result_cache() + && let Some(predicate_key) = cached_minmax_key + { + index_result_cache.put(predicate_key, file_id, Arc::new(output.clone())); + } + + output } fn apply_index_result_and_update_cache( diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 10acf76f18..8527a60338 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -77,6 +77,64 @@ impl RowGroupSelection { } } + /// Creates a new `RowGroupSelection` that selects all rows in the specified row groups. + /// + /// This is useful for fast construction after coarse pruning (e.g. min-max pruning), + /// avoiding building and then removing a full selection of all row groups. + pub fn from_full_row_group_ids( + row_group_ids: I, + row_group_size: usize, + total_row_count: usize, + ) -> Self + where + I: IntoIterator, + { + if row_group_size == 0 || total_row_count == 0 { + return Self::default(); + } + + let row_group_count = total_row_count.div_ceil(row_group_size); + if row_group_count == 0 { + return Self::default(); + } + + let last_row_group_size = total_row_count - (row_group_count - 1) * row_group_size; + + let mut selection_in_rg = BTreeMap::new(); + let mut row_count = 0usize; + let mut selector_len = 0usize; + + for rg_id in row_group_ids { + if rg_id >= row_group_count { + continue; + } + + let rg_row_count = if rg_id == row_group_count - 1 { + last_row_group_size + } else { + row_group_size + }; + + let selection = RowSelection::from(vec![RowSelector::select(rg_row_count)]); + selection_in_rg.insert( + rg_id, + RowSelectionWithCount { + selection, + row_count: rg_row_count, + selector_len: 1, + }, + ); + row_count += rg_row_count; + selector_len += 1; + } + + Self { + selection_in_rg, + row_count, + selector_len, + } + } + /// Returns the row selection for a given row group. /// /// `None` indicates not selected.