From c34d142e7db5f0c6eb50e39333102762d76e1e4c Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 21 Jan 2026 11:25:44 +0800 Subject: [PATCH] fix: clear unused range builders eagerly (#7569) * feat: clear the range builder after one part Signed-off-by: evenyag * feat: collect peak memory usage of build ranges Signed-off-by: evenyag * feat: collect peak range builder nums in metrics Signed-off-by: evenyag * refactor: num_range_builders_peak -> num_peak_range_builders Signed-off-by: evenyag * fix: track file range counts * Ensure the reader won't be released until all ranges scanned. * This fixes unordered scan which each partition range is a row group Signed-off-by: evenyag * style: fix clippy Signed-off-by: evenyag * chore: change to isize The metrics may init to 0. Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/cache.rs | 2 +- src/mito2/src/read/range.rs | 109 +++++++++++++++++++++--- src/mito2/src/read/scan_util.rs | 34 +++++++- src/mito2/src/read/seq_scan.rs | 34 ++++++-- src/mito2/src/read/series_scan.rs | 18 +++- src/mito2/src/read/unordered_scan.rs | 18 +++- src/mito2/src/sst/parquet/file_range.rs | 6 ++ src/mito2/src/sst/parquet/reader.rs | 6 ++ 8 files changed, 203 insertions(+), 24 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 7226d27b18..ced8f9e025 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -14,7 +14,7 @@ //! Cache for the engine. -mod cache_size; +pub(crate) mod cache_size; pub(crate) mod file_cache; pub(crate) mod index; diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index f8c8f6c4f0..b6b19e8f93 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -440,6 +440,17 @@ impl FileRangeBuilder { ); } } + + /// Returns the estimated memory size of this builder. + pub(crate) fn memory_size(&self) -> usize { + let context_size = self + .context + .as_ref() + .map(|ctx| ctx.memory_size()) + .unwrap_or(0); + let selection_size = self.selection.mem_usage(); + context_size + selection_size + } } /// Builder to create mem ranges. @@ -472,21 +483,68 @@ impl MemRangeBuilder { } } +/// Computes the number of ranges that reference each file. +/// +/// # Arguments +/// * `num_memtables` - Number of memtables +/// * `num_files` - Number of files +/// * `ranges` - All range metadata from StreamContext +/// * `part_ranges` - Iterator of partition ranges to scan +pub(crate) fn file_range_counts<'a>( + num_memtables: usize, + num_files: usize, + ranges: &[RangeMeta], + part_ranges: impl Iterator, +) -> Vec { + let mut counts = vec![0usize; num_files]; + for part_range in part_ranges { + let range_meta = &ranges[part_range.identifier]; + for row_group_index in &range_meta.row_group_indices { + if row_group_index.index >= num_memtables { + let file_index = row_group_index.index - num_memtables; + if file_index < num_files { + counts[file_index] += 1; + } + } + } + } + counts +} + +/// Entry for a file builder with its remaining range count. +struct FileBuilderEntry { + /// The builder for the file. None if not yet built or already cleared. + builder: Option>, + /// Number of remaining ranges to scan for this file. + remaining_ranges: usize, +} + /// List to manages the builders to create file ranges. /// Each scan partition should have its own list. Mutex inside this list is used to allow moving /// the list to different streams in the same partition. pub(crate) struct RangeBuilderList { num_memtables: usize, - file_builders: Mutex>>>, + file_entries: Mutex>, } impl RangeBuilderList { - /// Creates a new [ReaderBuilderList] with the given number of memtables and files. - pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self { - let file_builders = (0..num_files).map(|_| None).collect(); + /// Creates a new [RangeBuilderList] with pre-computed file range counts. + /// + /// # Arguments + /// * `num_memtables` - Number of memtables + /// * `file_range_counts` - Pre-computed counts of ranges per file + pub(crate) fn new(num_memtables: usize, file_range_counts: Vec) -> Self { + let file_entries = file_range_counts + .into_iter() + .map(|count| FileBuilderEntry { + builder: None, + remaining_ranges: count, + }) + .collect(); + Self { num_memtables, - file_builders: Mutex::new(file_builders), + file_entries: Mutex::new(file_entries), } } @@ -506,20 +564,49 @@ impl RangeBuilderList { let file = &input.files[file_index]; let builder = input.prune_file(file, reader_metrics).await?; builder.build_ranges(index.row_group_index, &mut ranges); + + // Record memory size and count of newly built builder. + reader_metrics.metadata_mem_size += builder.memory_size() as isize; + reader_metrics.num_range_builders += 1; + self.set_file_builder(file_index, Arc::new(builder)); } } + + // Decrement remaining count and auto-clear if all ranges are scanned + self.decrement_and_maybe_clear(file_index, reader_metrics); + Ok(ranges) } - fn get_file_builder(&self, index: usize) -> Option> { - let file_builders = self.file_builders.lock().unwrap(); - file_builders[index].clone() + fn get_file_builder(&self, file_index: usize) -> Option> { + let entries = self.file_entries.lock().unwrap(); + entries + .get(file_index) + .and_then(|entry| entry.builder.clone()) } - fn set_file_builder(&self, index: usize, builder: Arc) { - let mut file_builders = self.file_builders.lock().unwrap(); - file_builders[index] = Some(builder); + fn set_file_builder(&self, file_index: usize, builder: Arc) { + let mut entries = self.file_entries.lock().unwrap(); + if let Some(entry) = entries.get_mut(file_index) { + entry.builder = Some(builder); + } + } + + /// Decrements the remaining range count for a file and clears the builder if done. + fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) { + let mut entries = self.file_entries.lock().unwrap(); + if let Some(entry) = entries.get_mut(file_index) + && entry.remaining_ranges > 0 + { + entry.remaining_ranges -= 1; + if entry.remaining_ranges == 0 + && let Some(builder) = entry.builder.take() + { + reader_metrics.metadata_mem_size -= builder.memory_size() as isize; + reader_metrics.num_range_builders -= 1; + } + } } } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 3c1a78c7dd..06f5c1392c 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -225,6 +225,15 @@ pub(crate) struct ScanMetricsSet { metadata_cache_metrics: Option, /// Per-file scan metrics, only populated when explain_verbose is true. per_file_metrics: Option>, + + /// Current memory usage for file range builders. + build_ranges_mem_size: isize, + /// Peak memory usage for file range builders. + build_ranges_peak_mem_size: isize, + /// Current number of file range builders. + num_range_builders: isize, + /// Peak number of file range builders. + num_peak_range_builders: isize, } /// Wrapper for file metrics that compares by total cost in reverse order. @@ -313,6 +322,10 @@ impl fmt::Debug for ScanMetricsSet { fetch_metrics, metadata_cache_metrics, per_file_metrics, + build_ranges_mem_size: _, + build_ranges_peak_mem_size, + num_range_builders: _, + num_peak_range_builders, } = self; // Write core metrics @@ -534,7 +547,12 @@ impl fmt::Debug for ScanMetricsSet { write!(f, "}}")?; } - write!(f, ", \"stream_eof\":{stream_eof}}}") + write!( + f, + ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \ + \"num_peak_range_builders\":{num_peak_range_builders}, \ + \"stream_eof\":{stream_eof}}}" + ) } } impl ScanMetricsSet { @@ -599,6 +617,8 @@ impl ScanMetricsSet { scan_cost, metadata_cache_metrics, fetch_metrics, + metadata_mem_size, + num_range_builders, } = other; self.build_parts_cost += *build_cost; @@ -653,6 +673,18 @@ impl ScanMetricsSet { self.metadata_cache_metrics .get_or_insert_with(MetadataCacheMetrics::default) .merge_from(metadata_cache_metrics); + + // Track memory usage and update peak. + self.build_ranges_mem_size += *metadata_mem_size; + if self.build_ranges_mem_size > self.build_ranges_peak_mem_size { + self.build_ranges_peak_mem_size = self.build_ranges_mem_size; + } + + // Track number of builders and update peak. + self.num_range_builders += *num_range_builders; + if self.num_range_builders > self.num_peak_range_builders { + self.num_peak_range_builders = self.num_range_builders; + } } /// Merges per-file metrics. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 150d9bc87b..6f4bf2ce0f 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -41,7 +41,7 @@ use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeReader; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; -use crate::read::range::{RangeBuilderList, RangeMeta}; +use crate::read::range::{RangeBuilderList, RangeMeta, file_range_counts}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges, @@ -170,9 +170,15 @@ impl SeqScan { part_metrics: &PartitionMetrics, ) -> Result { let mut sources = Vec::new(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + partition_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); for part_range in partition_ranges { build_sources( @@ -204,9 +210,15 @@ impl SeqScan { part_metrics: &PartitionMetrics, ) -> Result { let mut sources = Vec::new(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + partition_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); for part_range in partition_ranges { build_flat_sources( @@ -414,9 +426,15 @@ impl SeqScan { // build part cost. let mut fetch_start = Instant::now(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + partition_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu { reason: "Unexpected format", @@ -529,9 +547,15 @@ impl SeqScan { // build part cost. let mut fetch_start = Instant::now(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + partition_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); // Scans each part. for part_range in partition_ranges { diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index faffd2805c..c14873ba20 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -43,7 +43,7 @@ use crate::error::{ Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu, }; -use crate::read::range::RangeBuilderList; +use crate::read::range::{RangeBuilderList, file_range_counts}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics}; use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources}; @@ -450,9 +450,15 @@ impl SeriesDistributor { // build part cost. let mut fetch_start = Instant::now(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( self.stream_ctx.input.num_memtables(), self.stream_ctx.input.num_files(), + &self.stream_ctx.ranges, + self.partitions.iter().flatten(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + self.stream_ctx.input.num_memtables(), + counts, )); // Scans all parts. let mut sources = Vec::with_capacity(self.partitions.len()); @@ -548,9 +554,15 @@ impl SeriesDistributor { // build part cost. let mut fetch_start = Instant::now(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( self.stream_ctx.input.num_memtables(), self.stream_ctx.input.num_files(), + &self.stream_ctx.ranges, + self.partitions.iter().flatten(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + self.stream_ctx.input.num_memtables(), + counts, )); // Scans all parts. let mut sources = Vec::with_capacity(self.partitions.len()); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index ee074ef9d7..27c133fb9d 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -34,7 +34,7 @@ use store_api::region_engine::{ }; use crate::error::{PartitionOutOfRangeSnafu, Result}; -use crate::read::range::RangeBuilderList; +use crate::read::range::{RangeBuilderList, file_range_counts}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges, @@ -305,9 +305,15 @@ impl UnorderedScan { let stream = try_stream! { part_metrics.on_first_poll(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + part_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); // Scans each part. for part_range in part_ranges { @@ -395,9 +401,15 @@ impl UnorderedScan { let stream = try_stream! { part_metrics.on_first_poll(); - let range_builder_list = Arc::new(RangeBuilderList::new( + let counts = file_range_counts( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), + &stream_ctx.ranges, + part_ranges.iter(), + ); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + counts, )); // Scans each part. for part_range in part_ranges { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 051c4b6986..bf75b15f12 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -363,6 +363,12 @@ impl FileRangeContext { let metadata = self.reader_builder.parquet_metadata(); row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path()) } + + /// Returns the estimated memory size of this context. + /// Mainly accounts for the parquet metadata size. + pub(crate) fn memory_size(&self) -> usize { + crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata()) + } } /// Mode to pre-filter columns in a range. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 9be2907099..b726d3294f 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1502,6 +1502,10 @@ pub struct ReaderMetrics { pub(crate) metadata_cache_metrics: MetadataCacheMetrics, /// Optional metrics for page/row group fetch operations. pub(crate) fetch_metrics: Option>, + /// Memory size of metadata loaded for building file ranges. + pub(crate) metadata_mem_size: isize, + /// Number of file range builders created. + pub(crate) num_range_builders: isize, } impl ReaderMetrics { @@ -1522,6 +1526,8 @@ impl ReaderMetrics { self.fetch_metrics = Some(other_fetch.clone()); } } + self.metadata_mem_size += other.metadata_mem_size; + self.num_range_builders += other.num_range_builders; } /// Reports total rows.