diff --git a/src/mito2/benches/simple_bulk_memtable.rs b/src/mito2/benches/simple_bulk_memtable.rs index 9aa55893a1..3819757699 100644 --- a/src/mito2/benches/simple_bulk_memtable.rs +++ b/src/mito2/benches/simple_bulk_memtable.rs @@ -20,7 +20,7 @@ use criterion::{Criterion, black_box, criterion_group, criterion_main}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable; -use mito2::memtable::{KeyValues, Memtable, MemtableRanges}; +use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions}; use mito2::read; use mito2::read::Source; use mito2::read::dedup::DedupReader; @@ -127,7 +127,12 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable { async fn flush(mem: &SimpleBulkMemtable) { let MemtableRanges { ranges, .. } = mem - .ranges(None, PredicateGroup::default(), None, true) + .ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::for_flush(), + ) .unwrap(); let mut source = if ranges.len() == 1 { diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 0b0b4b05db..603ba06161 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -40,7 +40,9 @@ use crate::error::{ RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges}; +use crate::memtable::{ + BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions, +}; use crate::metrics::{ FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL, INFLIGHT_FLUSH_COUNT, @@ -459,7 +461,12 @@ impl RegionFlushTask { flush_metrics.compact_memtable += compact_cost; // Sets `for_flush` flag to true. - let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?; + let mem_ranges = mem.ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::for_flush(), + )?; let num_mem_ranges = mem_ranges.ranges.len(); let num_mem_rows = mem_ranges.stats.num_rows(); let memtable_id = mem.id(); diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index b4461e8b06..a5dda5bfe0 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -44,6 +44,7 @@ use crate::region::options::{MemtableOptions, MergeMode, RegionOptions}; use crate::sst::FormatType; use crate::sst::file::FileTimeRange; use crate::sst::parquet::SstInfo; +use crate::sst::parquet::file_range::PreFilterMode; mod builder; pub mod bulk; @@ -73,6 +74,41 @@ pub enum MemtableConfig { TimeSeries, } +/// Options for querying ranges from a memtable. +#[derive(Debug, Clone, Copy)] +pub struct RangesOptions { + /// Whether the ranges are being queried for flush. + pub for_flush: bool, + /// Mode to pre-filter columns in ranges. + pub pre_filter_mode: PreFilterMode, +} + +impl Default for RangesOptions { + fn default() -> Self { + Self { + for_flush: false, + pre_filter_mode: PreFilterMode::All, + } + } +} + +impl RangesOptions { + /// Creates a new [RangesOptions] for flushing. + pub fn for_flush() -> Self { + Self { + for_flush: true, + pre_filter_mode: PreFilterMode::All, + } + } + + /// Sets the pre-filter mode. + #[must_use] + pub fn with_pre_filter_mode(mut self, pre_filter_mode: PreFilterMode) -> Self { + self.pre_filter_mode = pre_filter_mode; + self + } +} + #[derive(Debug, Default, Clone)] pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. @@ -191,14 +227,13 @@ pub trait Memtable: Send + Sync + fmt::Debug { /// Returns the ranges in the memtable. /// - /// The `for_flush` flag is true if the flush job calls this method for flush. /// The returned map contains the range id and the range after applying the predicate. fn ranges( &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - for_flush: bool, + options: RangesOptions, ) -> Result; /// Returns true if the memtable is empty. diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 750b34e1e0..a5bd480790 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -43,6 +43,7 @@ use crate::memtable::{ AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup, + RangesOptions, }; use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; @@ -332,17 +333,18 @@ impl Memtable for BulkMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - for_flush: bool, + options: RangesOptions, ) -> Result { let mut ranges = BTreeMap::new(); let mut range_id = 0; // TODO(yingwen): Filter ranges by sequence. - let context = Arc::new(BulkIterContext::new( + let context = Arc::new(BulkIterContext::new_with_pre_filter_mode( self.metadata.clone(), projection, predicate.predicate().cloned(), - for_flush, + options.for_flush, + options.pre_filter_mode, )?); // Adds ranges for regular parts and encoded parts @@ -1188,7 +1190,9 @@ mod tests { assert_eq!(3000, max_ts.value()); let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); - let ranges = memtable.ranges(None, predicate_group, None, false).unwrap(); + let ranges = memtable + .ranges(None, predicate_group, None, RangesOptions::default()) + .unwrap(); assert_eq!(3, ranges.ranges.len()); assert_eq!(5, ranges.stats.num_rows); @@ -1230,7 +1234,12 @@ mod tests { let projection = vec![4u32]; let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); let ranges = memtable - .ranges(Some(&projection), predicate_group, None, false) + .ranges( + Some(&projection), + predicate_group, + None, + RangesOptions::default(), + ) .unwrap(); assert_eq!(1, ranges.ranges.len()); @@ -1346,7 +1355,9 @@ mod tests { } let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); - let ranges = memtable.ranges(None, predicate_group, None, false).unwrap(); + let ranges = memtable + .ranges(None, predicate_group, None, RangesOptions::default()) + .unwrap(); assert_eq!(3, ranges.ranges.len()); assert_eq!(5, ranges.stats.num_rows); @@ -1379,7 +1390,12 @@ mod tests { let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400 let ranges = memtable - .ranges(None, predicate_group, sequence_filter, false) + .ranges( + None, + predicate_group, + sequence_filter, + RangesOptions::default(), + ) .unwrap(); assert_eq!(1, ranges.ranges.len()); @@ -1411,7 +1427,9 @@ mod tests { memtable.compact(false).unwrap(); let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); - let ranges = memtable.ranges(None, predicate_group, None, false).unwrap(); + let ranges = memtable + .ranges(None, predicate_group, None, RangesOptions::default()) + .unwrap(); // Should have ranges for both bulk parts and encoded parts assert_eq!(3, ranges.ranges.len()); diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index 31cadac4f1..b25ca68b4a 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -44,7 +44,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, PredicateGroup, + MemtableStats, PredicateGroup, RangesOptions, }; use crate::region::options::MergeMode; @@ -192,7 +192,7 @@ impl Memtable for PartitionTreeMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - _for_flush: bool, + _options: RangesOptions, ) -> Result { let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index cd7d9bdf5c..d4feeaff03 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -35,7 +35,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::time_series::Series; use crate::memtable::{ AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId, - MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, + MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions, }; use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT; use crate::read::Batch; @@ -240,7 +240,7 @@ impl Memtable for SimpleBulkMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - _for_flush: bool, + _options: RangesOptions, ) -> error::Result { let start_time = Instant::now(); let projection = Arc::new(self.build_projection(projection)); @@ -618,7 +618,12 @@ mod tests { memtable.write_one(kv).unwrap(); let ranges = memtable - .ranges(None, PredicateGroup::default(), None, false) + .ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::default(), + ) .unwrap(); let mut source = vec![]; for r in ranges.ranges.values() { @@ -652,7 +657,12 @@ mod tests { memtable.freeze().unwrap(); let ranges = memtable - .ranges(None, PredicateGroup::default(), None, false) + .ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::default(), + ) .unwrap(); let mut source = vec![]; for r in ranges.ranges.values() { @@ -695,7 +705,12 @@ mod tests { memtable.freeze().unwrap(); let ranges = memtable - .ranges(None, PredicateGroup::default(), None, false) + .ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::default(), + ) .unwrap(); assert_eq!(ranges.ranges.len(), 1); let range = ranges.ranges.into_values().next().unwrap(); @@ -911,7 +926,12 @@ mod tests { }) .unwrap(); let MemtableRanges { ranges, .. } = memtable - .ranges(None, PredicateGroup::default(), None, false) + .ranges( + None, + PredicateGroup::default(), + None, + RangesOptions::default(), + ) .unwrap(); let mut source = if ranges.len() == 1 { let only_range = ranges.into_values().next().unwrap(); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 60fe2f0bcd..59e0c6cb49 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -53,7 +53,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, - MemtableStats, PredicateGroup, + MemtableStats, PredicateGroup, RangesOptions, }; use crate::metrics::{ MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL, @@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable { projection: Option<&[ColumnId]>, predicate: PredicateGroup, sequence: Option, - _for_flush: bool, + _options: RangesOptions, ) -> Result { let projection = if let Some(projection) = projection { projection.iter().copied().collect() diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1e9b7f74c9..6e031aaf92 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -48,7 +48,7 @@ use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE use crate::error::{InvalidPartitionExprSnafu, Result}; #[cfg(feature = "enterprise")] use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider}; -use crate::memtable::MemtableRange; +use crate::memtable::{MemtableRange, RangesOptions}; use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch}; use crate::read::projection::ProjectionMapper; @@ -427,6 +427,10 @@ impl ScanRegion { let memtables = self.version.memtables.list_memtables(); // Skip empty memtables and memtables out of time range. let mut mem_range_builders = Vec::new(); + let filter_mode = pre_filter_mode( + self.version.options.append_mode, + self.version.options.merge_mode(), + ); for m in memtables { // check if memtable is empty by reading stats. @@ -445,7 +449,7 @@ impl ScanRegion { self.request.memtable_min_sequence, self.request.memtable_max_sequence, ), - false, + RangesOptions::default().with_pre_filter_mode(filter_mode), )?; mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| { // todo: we should add stats to MemtableRange @@ -950,17 +954,6 @@ impl ScanInput { } } - fn pre_filter_mode(&self) -> PreFilterMode { - if self.append_mode { - return PreFilterMode::All; - } - - match self.merge_mode { - MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete, - MergeMode::LastNonNull => PreFilterMode::SkipFields, - } - } - /// Prunes a file to scan and returns the builder to build readers. pub async fn prune_file( &self, @@ -968,7 +961,7 @@ impl ScanInput { reader_metrics: &mut ReaderMetrics, ) -> Result { let predicate = self.predicate_for_file(file); - let filter_mode = self.pre_filter_mode(); + let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode); let res = self .access_layer .read_sst(file.clone()) @@ -1173,6 +1166,17 @@ impl ScanInput { } } +fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { + if append_mode { + return PreFilterMode::All; + } + + match merge_mode { + MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete, + MergeMode::LastNonNull => PreFilterMode::SkipFields, + } +} + /// Context shared by different streams from a scanner. /// It contains the input and ranges to scan. pub struct StreamContext { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index d4e9324c87..689a8de599 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -295,8 +295,8 @@ impl FileRangeContext { } /// Mode to pre-filter columns in a range. -#[derive(Clone, Copy)] -pub(crate) enum PreFilterMode { +#[derive(Debug, Clone, Copy)] +pub enum PreFilterMode { /// Filters all columns. All, /// If the range doesn't contain delete op or doesn't have statistics, filters all columns. diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 2174ac7b9f..511fc6bf9e 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -38,7 +38,7 @@ use crate::memtable::bulk::part::BulkPart; use crate::memtable::partition_tree::data::{DataBatch, DataBuffer, timestamp_array_to_i64_slice}; use crate::memtable::{ BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, - MemtableRef, MemtableStats, + MemtableRef, MemtableStats, RangesOptions, }; use crate::read::scan_region::PredicateGroup; @@ -99,7 +99,7 @@ impl Memtable for EmptyMemtable { _projection: Option<&[ColumnId]>, _predicate: PredicateGroup, _sequence: Option, - _for_flush: bool, + _options: RangesOptions, ) -> Result { Ok(MemtableRanges::default()) }