fix: clear unused range builders eagerly (#7569)

* feat: clear the range builder after one part

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect peak memory usage of build ranges

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect peak range builder nums in metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: num_range_builders_peak -> num_peak_range_builders

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: track file range counts

* Ensure the reader won't be released until all ranges scanned.
* This fixes unordered scan which each partition range is a row group

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: change to isize

The metrics may init to 0.

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-01-21 11:25:44 +08:00
committed by GitHub
parent 67e51b4573
commit c34d142e7d
8 changed files with 203 additions and 24 deletions

View File

@@ -14,7 +14,7 @@
//! Cache for the engine.
mod cache_size;
pub(crate) mod cache_size;
pub(crate) mod file_cache;
pub(crate) mod index;

View File

@@ -440,6 +440,17 @@ impl FileRangeBuilder {
);
}
}
/// Returns the estimated memory size of this builder.
pub(crate) fn memory_size(&self) -> usize {
let context_size = self
.context
.as_ref()
.map(|ctx| ctx.memory_size())
.unwrap_or(0);
let selection_size = self.selection.mem_usage();
context_size + selection_size
}
}
/// Builder to create mem ranges.
@@ -472,21 +483,68 @@ impl MemRangeBuilder {
}
}
/// Computes the number of ranges that reference each file.
///
/// # Arguments
/// * `num_memtables` - Number of memtables
/// * `num_files` - Number of files
/// * `ranges` - All range metadata from StreamContext
/// * `part_ranges` - Iterator of partition ranges to scan
pub(crate) fn file_range_counts<'a>(
num_memtables: usize,
num_files: usize,
ranges: &[RangeMeta],
part_ranges: impl Iterator<Item = &'a PartitionRange>,
) -> Vec<usize> {
let mut counts = vec![0usize; num_files];
for part_range in part_ranges {
let range_meta = &ranges[part_range.identifier];
for row_group_index in &range_meta.row_group_indices {
if row_group_index.index >= num_memtables {
let file_index = row_group_index.index - num_memtables;
if file_index < num_files {
counts[file_index] += 1;
}
}
}
}
counts
}
/// Entry for a file builder with its remaining range count.
struct FileBuilderEntry {
/// The builder for the file. None if not yet built or already cleared.
builder: Option<Arc<FileRangeBuilder>>,
/// Number of remaining ranges to scan for this file.
remaining_ranges: usize,
}
/// List to manages the builders to create file ranges.
/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
num_memtables: usize,
file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
file_entries: Mutex<Vec<FileBuilderEntry>>,
}
impl RangeBuilderList {
/// Creates a new [ReaderBuilderList] with the given number of memtables and files.
pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
let file_builders = (0..num_files).map(|_| None).collect();
/// Creates a new [RangeBuilderList] with pre-computed file range counts.
///
/// # Arguments
/// * `num_memtables` - Number of memtables
/// * `file_range_counts` - Pre-computed counts of ranges per file
pub(crate) fn new(num_memtables: usize, file_range_counts: Vec<usize>) -> Self {
let file_entries = file_range_counts
.into_iter()
.map(|count| FileBuilderEntry {
builder: None,
remaining_ranges: count,
})
.collect();
Self {
num_memtables,
file_builders: Mutex::new(file_builders),
file_entries: Mutex::new(file_entries),
}
}
@@ -506,20 +564,49 @@ impl RangeBuilderList {
let file = &input.files[file_index];
let builder = input.prune_file(file, reader_metrics).await?;
builder.build_ranges(index.row_group_index, &mut ranges);
// Record memory size and count of newly built builder.
reader_metrics.metadata_mem_size += builder.memory_size() as isize;
reader_metrics.num_range_builders += 1;
self.set_file_builder(file_index, Arc::new(builder));
}
}
// Decrement remaining count and auto-clear if all ranges are scanned
self.decrement_and_maybe_clear(file_index, reader_metrics);
Ok(ranges)
}
fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
let file_builders = self.file_builders.lock().unwrap();
file_builders[index].clone()
fn get_file_builder(&self, file_index: usize) -> Option<Arc<FileRangeBuilder>> {
let entries = self.file_entries.lock().unwrap();
entries
.get(file_index)
.and_then(|entry| entry.builder.clone())
}
fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
let mut file_builders = self.file_builders.lock().unwrap();
file_builders[index] = Some(builder);
fn set_file_builder(&self, file_index: usize, builder: Arc<FileRangeBuilder>) {
let mut entries = self.file_entries.lock().unwrap();
if let Some(entry) = entries.get_mut(file_index) {
entry.builder = Some(builder);
}
}
/// Decrements the remaining range count for a file and clears the builder if done.
fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
let mut entries = self.file_entries.lock().unwrap();
if let Some(entry) = entries.get_mut(file_index)
&& entry.remaining_ranges > 0
{
entry.remaining_ranges -= 1;
if entry.remaining_ranges == 0
&& let Some(builder) = entry.builder.take()
{
reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
reader_metrics.num_range_builders -= 1;
}
}
}
}

View File

@@ -225,6 +225,15 @@ pub(crate) struct ScanMetricsSet {
metadata_cache_metrics: Option<MetadataCacheMetrics>,
/// Per-file scan metrics, only populated when explain_verbose is true.
per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
/// Current memory usage for file range builders.
build_ranges_mem_size: isize,
/// Peak memory usage for file range builders.
build_ranges_peak_mem_size: isize,
/// Current number of file range builders.
num_range_builders: isize,
/// Peak number of file range builders.
num_peak_range_builders: isize,
}
/// Wrapper for file metrics that compares by total cost in reverse order.
@@ -313,6 +322,10 @@ impl fmt::Debug for ScanMetricsSet {
fetch_metrics,
metadata_cache_metrics,
per_file_metrics,
build_ranges_mem_size: _,
build_ranges_peak_mem_size,
num_range_builders: _,
num_peak_range_builders,
} = self;
// Write core metrics
@@ -534,7 +547,12 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, "}}")?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
write!(
f,
", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \
\"num_peak_range_builders\":{num_peak_range_builders}, \
\"stream_eof\":{stream_eof}}}"
)
}
}
impl ScanMetricsSet {
@@ -599,6 +617,8 @@ impl ScanMetricsSet {
scan_cost,
metadata_cache_metrics,
fetch_metrics,
metadata_mem_size,
num_range_builders,
} = other;
self.build_parts_cost += *build_cost;
@@ -653,6 +673,18 @@ impl ScanMetricsSet {
self.metadata_cache_metrics
.get_or_insert_with(MetadataCacheMetrics::default)
.merge_from(metadata_cache_metrics);
// Track memory usage and update peak.
self.build_ranges_mem_size += *metadata_mem_size;
if self.build_ranges_mem_size > self.build_ranges_peak_mem_size {
self.build_ranges_peak_mem_size = self.build_ranges_mem_size;
}
// Track number of builders and update peak.
self.num_range_builders += *num_range_builders;
if self.num_range_builders > self.num_peak_range_builders {
self.num_peak_range_builders = self.num_range_builders;
}
}
/// Merges per-file metrics.

View File

@@ -41,7 +41,7 @@ use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::range::{RangeBuilderList, RangeMeta, file_range_counts};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
@@ -170,9 +170,15 @@ impl SeqScan {
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
partition_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
for part_range in partition_ranges {
build_sources(
@@ -204,9 +210,15 @@ impl SeqScan {
part_metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
partition_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
for part_range in partition_ranges {
build_flat_sources(
@@ -414,9 +426,15 @@ impl SeqScan {
// build part cost.
let mut fetch_start = Instant::now();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
partition_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
reason: "Unexpected format",
@@ -529,9 +547,15 @@ impl SeqScan {
// build part cost.
let mut fetch_start = Instant::now();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
partition_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
// Scans each part.
for part_range in partition_ranges {

View File

@@ -43,7 +43,7 @@ use crate::error::{
Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::range::{RangeBuilderList, file_range_counts};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
@@ -450,9 +450,15 @@ impl SeriesDistributor {
// build part cost.
let mut fetch_start = Instant::now();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
self.stream_ctx.input.num_memtables(),
self.stream_ctx.input.num_files(),
&self.stream_ctx.ranges,
self.partitions.iter().flatten(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
self.stream_ctx.input.num_memtables(),
counts,
));
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
@@ -548,9 +554,15 @@ impl SeriesDistributor {
// build part cost.
let mut fetch_start = Instant::now();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
self.stream_ctx.input.num_memtables(),
self.stream_ctx.input.num_files(),
&self.stream_ctx.ranges,
self.partitions.iter().flatten(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
self.stream_ctx.input.num_memtables(),
counts,
));
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());

View File

@@ -34,7 +34,7 @@ use store_api::region_engine::{
};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::range::{RangeBuilderList, file_range_counts};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
@@ -305,9 +305,15 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
part_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
// Scans each part.
for part_range in part_ranges {
@@ -395,9 +401,15 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
let counts = file_range_counts(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
&stream_ctx.ranges,
part_ranges.iter(),
);
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
counts,
));
// Scans each part.
for part_range in part_ranges {

View File

@@ -363,6 +363,12 @@ impl FileRangeContext {
let metadata = self.reader_builder.parquet_metadata();
row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
}
/// Returns the estimated memory size of this context.
/// Mainly accounts for the parquet metadata size.
pub(crate) fn memory_size(&self) -> usize {
crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
}
}
/// Mode to pre-filter columns in a range.

View File

@@ -1502,6 +1502,10 @@ pub struct ReaderMetrics {
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
/// Optional metrics for page/row group fetch operations.
pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
/// Memory size of metadata loaded for building file ranges.
pub(crate) metadata_mem_size: isize,
/// Number of file range builders created.
pub(crate) num_range_builders: isize,
}
impl ReaderMetrics {
@@ -1522,6 +1526,8 @@ impl ReaderMetrics {
self.fetch_metrics = Some(other_fetch.clone());
}
}
self.metadata_mem_size += other.metadata_mem_size;
self.num_range_builders += other.num_range_builders;
}
/// Reports total rows.