mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
refactor: move predicate and sequence to RangesOptions
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -25,7 +25,6 @@ use mito2::read;
|
||||
use mito2::read::Source;
|
||||
use mito2::read::dedup::DedupReader;
|
||||
use mito2::read::merge::MergeReaderBuilder;
|
||||
use mito2::read::scan_region::PredicateGroup;
|
||||
use mito2::region::options::MergeMode;
|
||||
use mito2::test_util::column_metadata_to_column_schema;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
@@ -127,12 +126,7 @@ fn create_memtable_with_rows(num_batches: usize) -> SimpleBulkMemtable {
|
||||
|
||||
async fn flush(mem: &SimpleBulkMemtable) {
|
||||
let MemtableRanges { ranges, .. } = mem
|
||||
.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::for_flush(),
|
||||
)
|
||||
.ranges(None, RangesOptions::for_flush())
|
||||
.unwrap();
|
||||
|
||||
let mut source = if ranges.len() == 1 {
|
||||
|
||||
@@ -51,7 +51,6 @@ use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
|
||||
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
|
||||
use crate::read::flat_merge::FlatMergeIterator;
|
||||
use crate::read::merge::MergeReaderBuilder;
|
||||
use crate::read::scan_region::PredicateGroup;
|
||||
use crate::read::{FlatSource, Source};
|
||||
use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
|
||||
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
|
||||
@@ -461,12 +460,7 @@ impl RegionFlushTask {
|
||||
flush_metrics.compact_memtable += compact_cost;
|
||||
|
||||
// Sets `for_flush` flag to true.
|
||||
let mem_ranges = mem.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::for_flush(),
|
||||
)?;
|
||||
let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
|
||||
let num_mem_ranges = mem_ranges.ranges.len();
|
||||
let num_mem_rows = mem_ranges.stats.num_rows();
|
||||
let memtable_id = mem.id();
|
||||
|
||||
@@ -75,12 +75,16 @@ pub enum MemtableConfig {
|
||||
}
|
||||
|
||||
/// Options for querying ranges from a memtable.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Clone)]
|
||||
pub struct RangesOptions {
|
||||
/// Whether the ranges are being queried for flush.
|
||||
pub for_flush: bool,
|
||||
/// Mode to pre-filter columns in ranges.
|
||||
pub pre_filter_mode: PreFilterMode,
|
||||
/// Predicate to filter the data.
|
||||
pub predicate: PredicateGroup,
|
||||
/// Sequence range to filter the data.
|
||||
pub sequence: Option<SequenceRange>,
|
||||
}
|
||||
|
||||
impl Default for RangesOptions {
|
||||
@@ -88,6 +92,8 @@ impl Default for RangesOptions {
|
||||
Self {
|
||||
for_flush: false,
|
||||
pre_filter_mode: PreFilterMode::All,
|
||||
predicate: PredicateGroup::default(),
|
||||
sequence: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -98,6 +104,8 @@ impl RangesOptions {
|
||||
Self {
|
||||
for_flush: true,
|
||||
pre_filter_mode: PreFilterMode::All,
|
||||
predicate: PredicateGroup::default(),
|
||||
sequence: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +115,20 @@ impl RangesOptions {
|
||||
self.pre_filter_mode = pre_filter_mode;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the predicate.
|
||||
#[must_use]
|
||||
pub fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
|
||||
self.predicate = predicate;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the sequence range.
|
||||
#[must_use]
|
||||
pub fn with_sequence(mut self, sequence: Option<SequenceRange>) -> Self {
|
||||
self.sequence = sequence;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
@@ -231,8 +253,6 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceRange>,
|
||||
options: RangesOptions,
|
||||
) -> Result<MemtableRanges>;
|
||||
|
||||
|
||||
@@ -42,8 +42,7 @@ use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
|
||||
IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
|
||||
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
|
||||
RangesOptions,
|
||||
MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, RangesOptions,
|
||||
};
|
||||
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
|
||||
use crate::read::flat_merge::FlatMergeIterator;
|
||||
@@ -331,10 +330,10 @@ impl Memtable for BulkMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceRange>,
|
||||
options: RangesOptions,
|
||||
) -> Result<MemtableRanges> {
|
||||
let predicate = options.predicate;
|
||||
let sequence = options.sequence;
|
||||
let mut ranges = BTreeMap::new();
|
||||
let mut range_id = 0;
|
||||
|
||||
@@ -1191,7 +1190,10 @@ mod tests {
|
||||
|
||||
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
|
||||
let ranges = memtable
|
||||
.ranges(None, predicate_group, None, RangesOptions::default())
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions::default().with_predicate(predicate_group),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(3, ranges.ranges.len());
|
||||
@@ -1236,9 +1238,7 @@ mod tests {
|
||||
let ranges = memtable
|
||||
.ranges(
|
||||
Some(&projection),
|
||||
predicate_group,
|
||||
None,
|
||||
RangesOptions::default(),
|
||||
RangesOptions::default().with_predicate(predicate_group),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -1356,7 +1356,10 @@ mod tests {
|
||||
|
||||
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
|
||||
let ranges = memtable
|
||||
.ranges(None, predicate_group, None, RangesOptions::default())
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions::default().with_predicate(predicate_group),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(3, ranges.ranges.len());
|
||||
@@ -1392,9 +1395,9 @@ mod tests {
|
||||
let ranges = memtable
|
||||
.ranges(
|
||||
None,
|
||||
predicate_group,
|
||||
sequence_filter,
|
||||
RangesOptions::default(),
|
||||
RangesOptions::default()
|
||||
.with_predicate(predicate_group)
|
||||
.with_sequence(sequence_filter),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -1428,7 +1431,10 @@ mod tests {
|
||||
|
||||
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
|
||||
let ranges = memtable
|
||||
.ranges(None, predicate_group, None, RangesOptions::default())
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions::default().with_predicate(predicate_group),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Should have ranges for both bulk parts and encoded parts
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
|
||||
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
|
||||
MemtableStats, PredicateGroup, RangesOptions,
|
||||
MemtableStats, RangesOptions,
|
||||
};
|
||||
use crate::region::options::MergeMode;
|
||||
|
||||
@@ -190,10 +190,10 @@ impl Memtable for PartitionTreeMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceRange>,
|
||||
_options: RangesOptions,
|
||||
options: RangesOptions,
|
||||
) -> Result<MemtableRanges> {
|
||||
let predicate = options.predicate;
|
||||
let sequence = options.sequence;
|
||||
let projection = projection.map(|ids| ids.to_vec());
|
||||
let builder = Box::new(PartitionTreeIterBuilder {
|
||||
tree: self.tree.clone(),
|
||||
|
||||
@@ -40,7 +40,6 @@ use crate::memtable::{
|
||||
use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
|
||||
use crate::read::Batch;
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
use crate::read::scan_region::PredicateGroup;
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::{error, metrics};
|
||||
|
||||
@@ -238,10 +237,10 @@ impl Memtable for SimpleBulkMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceRange>,
|
||||
_options: RangesOptions,
|
||||
options: RangesOptions,
|
||||
) -> error::Result<MemtableRanges> {
|
||||
let predicate = options.predicate;
|
||||
let sequence = options.sequence;
|
||||
let start_time = Instant::now();
|
||||
let projection = Arc::new(self.build_projection(projection));
|
||||
let values = self.series.read().unwrap().read_to_values();
|
||||
@@ -618,12 +617,7 @@ mod tests {
|
||||
memtable.write_one(kv).unwrap();
|
||||
|
||||
let ranges = memtable
|
||||
.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::default(),
|
||||
)
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap();
|
||||
let mut source = vec![];
|
||||
for r in ranges.ranges.values() {
|
||||
@@ -657,12 +651,7 @@ mod tests {
|
||||
memtable.freeze().unwrap();
|
||||
|
||||
let ranges = memtable
|
||||
.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::default(),
|
||||
)
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap();
|
||||
let mut source = vec![];
|
||||
for r in ranges.ranges.values() {
|
||||
@@ -705,12 +694,7 @@ mod tests {
|
||||
memtable.freeze().unwrap();
|
||||
|
||||
let ranges = memtable
|
||||
.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::default(),
|
||||
)
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(ranges.ranges.len(), 1);
|
||||
let range = ranges.ranges.into_values().next().unwrap();
|
||||
@@ -926,12 +910,7 @@ mod tests {
|
||||
})
|
||||
.unwrap();
|
||||
let MemtableRanges { ranges, .. } = memtable
|
||||
.ranges(
|
||||
None,
|
||||
PredicateGroup::default(),
|
||||
None,
|
||||
RangesOptions::default(),
|
||||
)
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap();
|
||||
let mut source = if ranges.len() == 1 {
|
||||
let only_range = ranges.into_values().next().unwrap();
|
||||
|
||||
@@ -53,7 +53,7 @@ use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, MemScanMetrics, Memtable,
|
||||
MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef,
|
||||
MemtableStats, PredicateGroup, RangesOptions,
|
||||
MemtableStats, RangesOptions,
|
||||
};
|
||||
use crate::metrics::{
|
||||
MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
|
||||
@@ -303,10 +303,10 @@ impl Memtable for TimeSeriesMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceRange>,
|
||||
_options: RangesOptions,
|
||||
options: RangesOptions,
|
||||
) -> Result<MemtableRanges> {
|
||||
let predicate = options.predicate;
|
||||
let sequence = options.sequence;
|
||||
let projection = if let Some(projection) = projection {
|
||||
projection.iter().copied().collect()
|
||||
} else {
|
||||
|
||||
@@ -444,12 +444,13 @@ impl ScanRegion {
|
||||
}
|
||||
let ranges_in_memtable = m.ranges(
|
||||
Some(mapper.column_ids()),
|
||||
predicate.clone(),
|
||||
SequenceRange::new(
|
||||
self.request.memtable_min_sequence,
|
||||
self.request.memtable_max_sequence,
|
||||
),
|
||||
RangesOptions::default().with_pre_filter_mode(filter_mode),
|
||||
RangesOptions::default()
|
||||
.with_predicate(predicate.clone())
|
||||
.with_sequence(SequenceRange::new(
|
||||
self.request.memtable_min_sequence,
|
||||
self.request.memtable_max_sequence,
|
||||
))
|
||||
.with_pre_filter_mode(filter_mode),
|
||||
)?;
|
||||
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
|
||||
// todo: we should add stats to MemtableRange
|
||||
|
||||
@@ -97,8 +97,6 @@ impl Memtable for EmptyMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: PredicateGroup,
|
||||
_sequence: Option<SequenceRange>,
|
||||
_options: RangesOptions,
|
||||
) -> Result<MemtableRanges> {
|
||||
Ok(MemtableRanges::default())
|
||||
|
||||
Reference in New Issue
Block a user