feat: pass PreFilterMode to memtable

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-28 21:33:22 +08:00
parent d1372092ae
commit 98752f4b47
10 changed files with 131 additions and 42 deletions

View File

@@ -20,7 +20,7 @@ use criterion::{Criterion, black_box, criterion_group, criterion_main};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
use mito2::memtable::{KeyValues, Memtable, MemtableRanges};
use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions};
use mito2::read;
use mito2::read::Source;
use mito2::read::dedup::DedupReader;
@@ -127,7 +127,12 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable {
async fn flush(mem: &SimpleBulkMemtable) {
let MemtableRanges { ranges, .. } = mem
.ranges(None, PredicateGroup::default(), None, true)
.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::for_flush(),
)
.unwrap();
let mut source = if ranges.len() == 1 {

View File

@@ -40,7 +40,9 @@ use crate::error::{
RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
use crate::memtable::{
BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
@@ -459,7 +461,12 @@ impl RegionFlushTask {
flush_metrics.compact_memtable += compact_cost;
// Sets `for_flush` flag to true.
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
let mem_ranges = mem.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::for_flush(),
)?;
let num_mem_ranges = mem_ranges.ranges.len();
let num_mem_rows = mem_ranges.stats.num_rows();
let memtable_id = mem.id();

View File

@@ -44,6 +44,7 @@ use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::file_range::PreFilterMode;
mod builder;
pub mod bulk;
@@ -73,6 +74,41 @@ pub enum MemtableConfig {
TimeSeries,
}
/// Options that control how ranges are queried from a memtable.
///
/// A value of this type is passed as the last argument of `Memtable::ranges`.
#[derive(Debug, Clone, Copy)]
pub struct RangesOptions {
    /// `true` when the ranges are requested for flush.
    pub for_flush: bool,
    /// How columns are pre-filtered in the returned ranges.
    pub pre_filter_mode: PreFilterMode,
}
impl Default for RangesOptions {
    /// Returns options for a non-flush query that pre-filters all columns.
    fn default() -> Self {
        let pre_filter_mode = PreFilterMode::All;
        Self {
            for_flush: false,
            pre_filter_mode,
        }
    }
}
impl RangesOptions {
    /// Builds the [RangesOptions] used when ranges are queried for flush.
    ///
    /// The `for_flush` flag is set and the pre-filter mode is [PreFilterMode::All].
    pub fn for_flush() -> Self {
        let pre_filter_mode = PreFilterMode::All;
        Self {
            for_flush: true,
            pre_filter_mode,
        }
    }

    /// Returns these options with the pre-filter mode replaced.
    ///
    /// The `for_flush` flag is carried over unchanged.
    #[must_use]
    pub fn with_pre_filter_mode(self, pre_filter_mode: PreFilterMode) -> Self {
        Self {
            pre_filter_mode,
            ..self
        }
    }
}
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
@@ -191,14 +227,13 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the ranges in the memtable.
///
/// The `for_flush` flag is true if the flush job calls this method for flush.
/// The returned map contains the range id and the range after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
for_flush: bool,
options: RangesOptions,
) -> Result<MemtableRanges>;
/// Returns true if the memtable is empty.

View File

@@ -43,6 +43,7 @@ use crate::memtable::{
AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
RangesOptions,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
@@ -332,17 +333,18 @@ impl Memtable for BulkMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
for_flush: bool,
options: RangesOptions,
) -> Result<MemtableRanges> {
let mut ranges = BTreeMap::new();
let mut range_id = 0;
// TODO(yingwen): Filter ranges by sequence.
let context = Arc::new(BulkIterContext::new(
let context = Arc::new(BulkIterContext::new_with_pre_filter_mode(
self.metadata.clone(),
projection,
predicate.predicate().cloned(),
for_flush,
options.for_flush,
options.pre_filter_mode,
)?);
// Adds ranges for regular parts and encoded parts
@@ -1188,7 +1190,9 @@ mod tests {
assert_eq!(3000, max_ts.value());
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
let ranges = memtable
.ranges(None, predicate_group, None, RangesOptions::default())
.unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1230,7 +1234,12 @@ mod tests {
let projection = vec![4u32];
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable
.ranges(Some(&projection), predicate_group, None, false)
.ranges(
Some(&projection),
predicate_group,
None,
RangesOptions::default(),
)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1346,7 +1355,9 @@ mod tests {
}
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
let ranges = memtable
.ranges(None, predicate_group, None, RangesOptions::default())
.unwrap();
assert_eq!(3, ranges.ranges.len());
assert_eq!(5, ranges.stats.num_rows);
@@ -1379,7 +1390,12 @@ mod tests {
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
let ranges = memtable
.ranges(None, predicate_group, sequence_filter, false)
.ranges(
None,
predicate_group,
sequence_filter,
RangesOptions::default(),
)
.unwrap();
assert_eq!(1, ranges.ranges.len());
@@ -1411,7 +1427,9 @@ mod tests {
memtable.compact(false).unwrap();
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let ranges = memtable.ranges(None, predicate_group, None, false).unwrap();
let ranges = memtable
.ranges(None, predicate_group, None, RangesOptions::default())
.unwrap();
// Should have ranges for both bulk parts and encoded parts
assert_eq!(3, ranges.ranges.len());

View File

@@ -44,7 +44,7 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,
MemtableStats, PredicateGroup, RangesOptions,
};
use crate::region::options::MergeMode;
@@ -192,7 +192,7 @@ impl Memtable for PartitionTreeMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
_options: RangesOptions,
) -> Result<MemtableRanges> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {

View File

@@ -35,7 +35,7 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::time_series::Series;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
};
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
use crate::read::Batch;
@@ -240,7 +240,7 @@ impl Memtable for SimpleBulkMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
_options: RangesOptions,
) -> error::Result<MemtableRanges> {
let start_time = Instant::now();
let projection = Arc::new(self.build_projection(projection));
@@ -618,7 +618,12 @@ mod tests {
memtable.write_one(kv).unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::default(),
)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
@@ -652,7 +657,12 @@ mod tests {
memtable.freeze().unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::default(),
)
.unwrap();
let mut source = vec![];
for r in ranges.ranges.values() {
@@ -695,7 +705,12 @@ mod tests {
memtable.freeze().unwrap();
let ranges = memtable
.ranges(None, PredicateGroup::default(), None, false)
.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::default(),
)
.unwrap();
assert_eq!(ranges.ranges.len(), 1);
let range = ranges.ranges.into_values().next().unwrap();
@@ -911,7 +926,12 @@ mod tests {
})
.unwrap();
let MemtableRanges { ranges, .. } = memtable
.ranges(None, PredicateGroup::default(), None, false)
.ranges(
None,
PredicateGroup::default(),
None,
RangesOptions::default(),
)
.unwrap();
let mut source = if ranges.len() == 1 {
let only_range = ranges.into_values().next().unwrap();

View File

@@ -53,7 +53,7 @@ use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
MemtableStats, PredicateGroup,
MemtableStats, PredicateGroup, RangesOptions,
};
use crate::metrics::{
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
@@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable {
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceRange>,
_for_flush: bool,
_options: RangesOptions,
) -> Result<MemtableRanges> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()

View File

@@ -48,7 +48,7 @@ use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::MemtableRange;
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
@@ -427,6 +427,10 @@ impl ScanRegion {
let memtables = self.version.memtables.list_memtables();
// Skip empty memtables and memtables out of time range.
let mut mem_range_builders = Vec::new();
let filter_mode = pre_filter_mode(
self.version.options.append_mode,
self.version.options.merge_mode(),
);
for m in memtables {
// check if memtable is empty by reading stats.
@@ -445,7 +449,7 @@ impl ScanRegion {
self.request.memtable_min_sequence,
self.request.memtable_max_sequence,
),
false,
RangesOptions::default().with_pre_filter_mode(filter_mode),
)?;
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
// todo: we should add stats to MemtableRange
@@ -950,17 +954,6 @@ impl ScanInput {
}
}
fn pre_filter_mode(&self) -> PreFilterMode {
if self.append_mode {
return PreFilterMode::All;
}
match self.merge_mode {
MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
MergeMode::LastNonNull => PreFilterMode::SkipFields,
}
}
/// Prunes a file to scan and returns the builder to build readers.
pub async fn prune_file(
&self,
@@ -968,7 +961,7 @@ impl ScanInput {
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = self.pre_filter_mode();
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let res = self
.access_layer
.read_sst(file.clone())
@@ -1173,6 +1166,17 @@ impl ScanInput {
}
}
/// Picks the [PreFilterMode] for a scan from the region's append mode and merge mode.
///
/// - Append mode always yields [PreFilterMode::All], regardless of the merge mode.
/// - Otherwise [MergeMode::LastRow] yields [PreFilterMode::SkipFieldsOnDelete] and
///   [MergeMode::LastNonNull] yields [PreFilterMode::SkipFields].
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
    match (append_mode, merge_mode) {
        (true, _) => PreFilterMode::All,
        (false, MergeMode::LastRow) => PreFilterMode::SkipFieldsOnDelete,
        (false, MergeMode::LastNonNull) => PreFilterMode::SkipFields,
    }
}
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {

View File

@@ -295,8 +295,8 @@ impl FileRangeContext {
}
/// Mode to pre-filter columns in a range.
#[derive(Clone, Copy)]
pub(crate) enum PreFilterMode {
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
/// Filters all columns.
All,
/// If the range doesn't contain delete op or doesn't have statistics, filters all columns.

View File

@@ -38,7 +38,7 @@ use crate::memtable::bulk::part::BulkPart;
use crate::memtable::partition_tree::data::{DataBatch, DataBuffer, timestamp_array_to_i64_slice};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats,
MemtableRef, MemtableStats, RangesOptions,
};
use crate::read::scan_region::PredicateGroup;
@@ -99,7 +99,7 @@ impl Memtable for EmptyMemtable {
_projection: Option<&[ColumnId]>,
_predicate: PredicateGroup,
_sequence: Option<SequenceRange>,
_for_flush: bool,
_options: RangesOptions,
) -> Result<MemtableRanges> {
Ok(MemtableRanges::default())
}