From 7a8bf3de4ba9792675243afd7242f9db90ecae23 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 30 Oct 2025 15:58:44 +0800 Subject: [PATCH] refactor: move predicate and sequence to RangesOptions Signed-off-by: evenyag --- src/mito2/benches/simple_bulk_memtable.rs | 8 +---- src/mito2/src/flush.rs | 8 +---- src/mito2/src/memtable.rs | 26 ++++++++++++-- src/mito2/src/memtable/bulk.rs | 32 ++++++++++------- src/mito2/src/memtable/partition_tree.rs | 8 ++--- .../src/memtable/simple_bulk_memtable.rs | 35 ++++--------------- src/mito2/src/memtable/time_series.rs | 8 ++--- src/mito2/src/read/scan_region.rs | 13 +++---- src/mito2/src/test_util/memtable_util.rs | 2 -- 9 files changed, 66 insertions(+), 74 deletions(-) diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index 3819757699..50968eae2f 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -25,7 +25,6 @@ use mito2::read; use mito2::read::Source; use mito2::read::dedup::DedupReader; use mito2::read::merge::MergeReaderBuilder; -use mito2::read::scan_region::PredicateGroup; use mito2::region::options::MergeMode; use mito2::test_util::column_metadata_to_column_schema; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; @@ -127,12 +126,7 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable { async fn flush(mem: &SimpleBulkMemtable) { let MemtableRanges { ranges, .. } = mem - .ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::for_flush(), - ) + .ranges(None, RangesOptions::for_flush()) .unwrap(); let mut source = if ranges.len() == 1 { diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 603ba06161..59f4d1cda5 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -51,7 +51,6 @@ use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; use crate::read::merge::MergeReaderBuilder; -use crate::read::scan_region::PredicateGroup; use crate::read::{FlatSource, Source}; use crate::region::options::{IndexOptions, MergeMode, RegionOptions}; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; @@ -461,12 +460,7 @@ impl RegionFlushTask { flush_metrics.compact_memtable += compact_cost; // Sets `for_flush` flag to true. - let mem_ranges = mem.ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::for_flush(), - )?; + let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?; let num_mem_ranges = mem_ranges.ranges.len(); let num_mem_rows = mem_ranges.stats.num_rows(); let memtable_id = mem.id(); diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index a5dda5bfe0..2d430b2356 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -75,12 +75,16 @@ pub enum MemtableConfig { } /// Options for querying ranges from a memtable. -#[derive(Debug, Clone, Copy)] +#[derive(Clone)] pub struct RangesOptions { /// Whether the ranges are being queried for flush. pub for_flush: bool, /// Mode to pre-filter columns in ranges. pub pre_filter_mode: PreFilterMode, + /// Predicate to filter the data. + pub predicate: PredicateGroup, + /// Sequence range to filter the data. + pub sequence: Option, } impl Default for RangesOptions { @@ -88,6 +92,8 @@ impl Default for RangesOptions { Self { for_flush: false, pre_filter_mode: PreFilterMode::All, + predicate: PredicateGroup::default(), + sequence: None, } } } @@ -98,6 +104,8 @@ impl RangesOptions { Self { for_flush: true, pre_filter_mode: PreFilterMode::All, + predicate: PredicateGroup::default(), + sequence: None, } } @@ -107,6 +115,20 @@ impl RangesOptions { self.pre_filter_mode = pre_filter_mode; self } + + /// Sets the predicate. + #[must_use] + pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self { + self.predicate = predicate; + self + } + + /// Sets the sequence range. + #[must_use] + pub fn with_sequence(mut self, sequence: Option) -> Self { + self.sequence = sequence; + self + } } #[derive(Debug, Default, Clone)] @@ -231,8 +253,6 @@ pub trait Memtable: Send + Sync + fmt::Debug { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: PredicateGroup, - sequence: Option, options: RangesOptions, ) -> Result; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index a5bd480790..2ffdad3478 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -42,8 +42,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, - MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, - RangesOptions, + MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions, }; use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; @@ -331,10 +330,10 @@ impl Memtable for BulkMemtable { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: PredicateGroup, - sequence: Option, options: RangesOptions, ) -> Result { + let predicate = options.predicate; + let sequence = options.sequence; let mut ranges = BTreeMap::new(); let mut range_id = 0; @@ -1191,7 +1190,10 @@ mod tests { let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); let ranges = memtable - .ranges(None, predicate_group, None, RangesOptions::default()) + .ranges( + None, + RangesOptions::default().with_predicate(predicate_group), + ) .unwrap(); assert_eq!(3, ranges.ranges.len()); @@ -1236,9 +1238,7 @@ mod tests { let ranges = memtable .ranges( Some(&projection), - predicate_group, - None, - RangesOptions::default(), + RangesOptions::default().with_predicate(predicate_group), ) .unwrap(); @@ -1356,7 +1356,10 @@ mod tests { let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); let ranges = memtable - .ranges(None, predicate_group, None, RangesOptions::default()) + .ranges( + None, + RangesOptions::default().with_predicate(predicate_group), + ) .unwrap(); assert_eq!(3, ranges.ranges.len()); @@ -1392,9 +1395,9 @@ mod tests { let ranges = memtable .ranges( None, - predicate_group, - sequence_filter, - RangesOptions::default(), + RangesOptions::default() + .with_predicate(predicate_group) + .with_sequence(sequence_filter), ) .unwrap(); @@ -1428,7 +1431,10 @@ mod tests { let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); let ranges = memtable - .ranges(None, predicate_group, None, RangesOptions::default()) + .ranges( + None, + RangesOptions::default().with_predicate(predicate_group), + ) .unwrap(); // Should have ranges for both bulk parts and encoded parts diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index b25ca68b4a..8ddd687053 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -44,7 +44,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, PredicateGroup, RangesOptions, + MemtableStats, RangesOptions, }; use crate::region::options::MergeMode; @@ -190,10 +190,10 @@ impl Memtable for PartitionTreeMemtable { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: PredicateGroup, - sequence: Option, - _options: RangesOptions, + options: RangesOptions, ) -> Result { + let predicate = options.predicate; + let sequence = options.sequence; let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index d4feeaff03..fe2cace388 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -40,7 +40,6 @@ use crate::memtable::{ use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT; use crate::read::Batch; use crate::read::dedup::LastNonNullIter; -use crate::read::scan_region::PredicateGroup; use crate::region::options::MergeMode; use crate::{error, metrics}; @@ -238,10 +237,10 @@ impl Memtable for SimpleBulkMemtable { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: PredicateGroup, - sequence: Option, - _options: RangesOptions, + options: RangesOptions, ) -> error::Result { + let predicate = options.predicate; + let sequence = options.sequence; let start_time = Instant::now(); let projection = Arc::new(self.build_projection(projection)); let values = self.series.read().unwrap().read_to_values(); @@ -618,12 +617,7 @@ mod tests { memtable.write_one(kv).unwrap(); let ranges = memtable - .ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::default(), - ) + .ranges(None, RangesOptions::default()) .unwrap(); let mut source = vec![]; for r in ranges.ranges.values() { @@ -657,12 +651,7 @@ mod tests { memtable.freeze().unwrap(); let ranges = memtable - .ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::default(), - ) + .ranges(None, RangesOptions::default()) .unwrap(); let mut source = vec![]; for r in ranges.ranges.values() { @@ -705,12 +694,7 @@ mod tests { memtable.freeze().unwrap(); let ranges = memtable - .ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::default(), - ) + .ranges(None, RangesOptions::default()) .unwrap(); assert_eq!(ranges.ranges.len(), 1); let range = ranges.ranges.into_values().next().unwrap(); @@ -926,12 +910,7 @@ mod tests { }) .unwrap(); let MemtableRanges { ranges, .. } = memtable - .ranges( - None, - PredicateGroup::default(), - None, - RangesOptions::default(), - ) + .ranges(None, RangesOptions::default()) .unwrap(); let mut source = if ranges.len() == 1 { let only_range = ranges.into_values().next().unwrap(); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 59e0c6cb49..1de5c64614 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -53,7 +53,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, PredicateGroup, RangesOptions, + MemtableStats, RangesOptions, }; use crate::metrics::{ MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL, @@ -303,10 +303,10 @@ impl Memtable for TimeSeriesMemtable { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: PredicateGroup, - sequence: Option, - _options: RangesOptions, + options: RangesOptions, ) -> Result { + let predicate = options.predicate; + let sequence = options.sequence; let projection = if let Some(projection) = projection { projection.iter().copied().collect() } else { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index aa18fa47d3..aa158389e0 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -444,12 +444,13 @@ impl ScanRegion { } let ranges_in_memtable = m.ranges( Some(mapper.column_ids()), - predicate.clone(), - SequenceRange::new( - self.request.memtable_min_sequence, - self.request.memtable_max_sequence, - ), - RangesOptions::default().with_pre_filter_mode(filter_mode), + RangesOptions::default() + .with_predicate(predicate.clone()) + .with_sequence(SequenceRange::new( + self.request.memtable_min_sequence, + self.request.memtable_max_sequence, + )) + .with_pre_filter_mode(filter_mode), )?; mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| { // todo: we should add stats to MemtableRange diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 511fc6bf9e..a3811f013f 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -97,8 +97,6 @@ impl Memtable for EmptyMemtable { fn ranges( &self, _projection: Option<&[ColumnId]>, - _predicate: PredicateGroup, - _sequence: Option, _options: RangesOptions, ) -> Result { Ok(MemtableRanges::default())