From ac65ede033408c6c281b53031e9aa01da2ad35eb Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 16 Oct 2025 13:30:56 +0800 Subject: [PATCH] feat: memtable seq range read (#6950) * feat: seq range memtable read Signed-off-by: discord9 * test: from&range Signed-off-by: discord9 * wt Signed-off-by: discord9 * after rebase fix Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * docs: better naming&emphaise Signed-off-by: discord9 * refactor: use filter method Signed-off-by: discord9 * tests: unwrap Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/mito2/src/memtable.rs | 6 +- src/mito2/src/memtable/bulk.rs | 12 +- src/mito2/src/memtable/bulk/part.rs | 4 +- src/mito2/src/memtable/bulk/part_reader.rs | 30 ++--- src/mito2/src/memtable/partition_tree.rs | 10 +- src/mito2/src/memtable/partition_tree/tree.rs | 6 +- .../src/memtable/simple_bulk_memtable.rs | 10 +- .../simple_bulk_memtable/test_only.rs | 6 +- src/mito2/src/memtable/time_series.rs | 14 +- src/mito2/src/read.rs | 124 ++++++++++++++++-- src/mito2/src/read/scan_region.rs | 9 +- src/mito2/src/test_util/memtable_util.rs | 6 +- src/query/src/dummy_catalog.rs | 4 +- src/store-api/src/storage.rs | 2 +- src/store-api/src/storage/requests.rs | 9 +- src/store-api/src/storage/types.rs | 57 ++++++++ 16 files changed, 237 insertions(+), 72 deletions(-) diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 2744f24890..b4461e8b06 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -28,7 +28,7 @@ use mito_codec::key_values::KeyValue; pub use mito_codec::key_values::KeyValues; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceNumber, SequenceRange}; use crate::config::MitoConfig; use crate::error::{Result, UnsupportedOperationSnafu}; @@ -186,7 +186,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: Option, - sequence: Option, + sequence: Option, ) -> Result; /// Returns the ranges in the memtable. @@ -197,7 +197,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, - sequence: Option, + sequence: Option, for_flush: bool, ) -> Result; diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index c410a53943..d67e9f1424 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -30,7 +30,7 @@ use datatypes::arrow::datatypes::SchemaRef; use mito_codec::key_values::KeyValue; use rayon::prelude::*; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, FileId, RegionId, SequenceNumber}; +use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange}; use tokio::sync::Semaphore; use crate::error::{Result, UnsupportedOperationSnafu}; @@ -323,7 +323,7 @@ impl Memtable for BulkMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: Option, - _sequence: Option, + _sequence: Option, ) -> Result { todo!() } @@ -332,7 +332,7 @@ impl Memtable for BulkMemtable { &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, - sequence: Option, + sequence: Option, for_flush: bool, ) -> Result { let mut ranges = BTreeMap::new(); @@ -602,7 +602,7 @@ impl BulkMemtable { struct BulkRangeIterBuilder { part: BulkPart, context: Arc, - sequence: Option, + sequence: Option, } impl IterBuilder for BulkRangeIterBuilder { @@ -641,7 +641,7 @@ struct EncodedBulkRangeIterBuilder { file_id: FileId, part: EncodedBulkPart, context: Arc, - sequence: Option, + sequence: Option, } impl IterBuilder for EncodedBulkRangeIterBuilder { @@ -1381,7 +1381,7 @@ mod tests { memtable.write_bulk(part).unwrap(); let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap(); - let sequence_filter = Some(400u64); // Filters out rows with sequence > 400 + let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400 let ranges = memtable .ranges(None, predicate_group, sequence_filter, false) .unwrap(); diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index d8b9d4b176..e30c0a2156 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -53,7 +53,7 @@ use snafu::{OptionExt, ResultExt, Snafu}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; -use store_api::storage::{FileId, SequenceNumber}; +use store_api::storage::{FileId, SequenceNumber, SequenceRange}; use table::predicate::Predicate; use crate::error::{ @@ -564,7 +564,7 @@ impl EncodedBulkPart { pub(crate) fn read( &self, context: BulkIterContextRef, - sequence: Option, + sequence: Option, ) -> Result> { // use predicate to find row groups to read. let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata); diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 49a3e6b1b7..b14ff05dfe 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -17,14 +17,14 @@ use std::ops::BitAnd; use std::sync::Arc; use bytes::Bytes; -use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array}; +use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; use parquet::file::metadata::ParquetMetaData; use snafu::ResultExt; -use store_api::storage::SequenceNumber; +use store_api::storage::SequenceRange; use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu}; use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef}; @@ -39,7 +39,7 @@ pub struct EncodedBulkPartIter { current_reader: Option, builder: MemtableRowGroupReaderBuilder, /// Sequence number filter. - sequence: Option>, + sequence: Option, } impl EncodedBulkPartIter { @@ -49,12 +49,10 @@ impl EncodedBulkPartIter { mut row_groups_to_read: VecDeque, parquet_meta: Arc, data: Bytes, - sequence: Option, + sequence: Option, ) -> error::Result { assert!(context.read_format().as_flat().is_some()); - let sequence = sequence.map(UInt64Array::new_scalar); - let projection_mask = ProjectionMask::roots( parquet_meta.file_metadata().schema_descr(), context.read_format().projection_indices().iter().copied(), @@ -121,7 +119,7 @@ pub struct BulkPartRecordBatchIter { /// Iterator context for filtering context: BulkIterContextRef, /// Sequence number filter. - sequence: Option>, + sequence: Option, } impl BulkPartRecordBatchIter { @@ -129,12 +127,10 @@ impl BulkPartRecordBatchIter { pub fn new( record_batch: RecordBatch, context: BulkIterContextRef, - sequence: Option, + sequence: Option, ) -> Self { assert!(context.read_format().as_flat().is_some()); - let sequence = sequence.map(UInt64Array::new_scalar); - Self { record_batch: Some(record_batch), context, @@ -185,7 +181,7 @@ impl Iterator for BulkPartRecordBatchIter { /// Panics if the format is not flat. fn apply_combined_filters( context: &BulkIterContext, - sequence: &Option>, + sequence: &Option, record_batch: RecordBatch, ) -> error::Result> { // Converts the format to the flat format first. @@ -234,9 +230,9 @@ fn apply_combined_filters( if let Some(sequence) = sequence { let sequence_column = record_batch.column(sequence_column_index(record_batch.num_columns())); - let sequence_filter = - datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence) - .context(ComputeArrowSnafu)?; + let sequence_filter = sequence + .filter(&sequence_column) + .context(ComputeArrowSnafu)?; // Combine with existing filter using AND operation combined_filter = match combined_filter { None => Some(sequence_filter), @@ -385,7 +381,11 @@ mod tests { assert_eq!(6, result[0].num_columns(),); // Creates iter with sequence filter (only include sequences <= 2) - let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context, Some(2)); + let iter = BulkPartRecordBatchIter::new( + record_batch.clone(), + context, + Some(SequenceRange::LtEq { max: 2 }), + ); let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect(); assert_eq!(1, result.len()); let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef; diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index d20c51a137..90b51f7683 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -32,7 +32,7 @@ use mito_codec::key_values::KeyValue; use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec}; use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceRange}; use table::predicate::Predicate; use crate::error::{Result, UnsupportedOperationSnafu}; @@ -181,7 +181,7 @@ impl Memtable for PartitionTreeMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, - sequence: Option, + sequence: Option, ) -> Result { self.tree.read(projection, predicate, sequence, None) } @@ -190,7 +190,7 @@ impl Memtable for PartitionTreeMemtable { &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, - sequence: Option, + sequence: Option, _for_flush: bool, ) -> Result { let projection = projection.map(|ids| ids.to_vec()); @@ -314,7 +314,7 @@ impl PartitionTreeMemtable { &self, projection: Option<&[ColumnId]>, predicate: Option, - sequence: Option, + sequence: Option, ) -> Result { self.tree.read(projection, predicate, sequence, None) } @@ -361,7 +361,7 @@ struct PartitionTreeIterBuilder { tree: Arc, projection: Option>, predicate: Option, - sequence: Option, + sequence: Option, } impl IterBuilder for PartitionTreeIterBuilder { diff --git a/src/mito2/src/memtable/partition_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs index 0b903a2616..8b10b1c5fc 100644 --- a/src/mito2/src/memtable/partition_tree/tree.rs +++ b/src/mito2/src/memtable/partition_tree/tree.rs @@ -30,7 +30,7 @@ use mito_codec::row_converter::{PrimaryKeyCodec, SortField}; use snafu::{ResultExt, ensure}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceRange}; use table::predicate::Predicate; use crate::error::{ @@ -229,7 +229,7 @@ impl PartitionTree { &self, projection: Option<&[ColumnId]>, predicate: Option, - sequence: Option, + sequence: Option, mem_scan_metrics: Option, ) -> Result { let start = Instant::now(); @@ -465,7 +465,7 @@ struct TreeIterMetrics { struct TreeIter { /// Optional Sequence number of the current reader which limit results batch to lower than this sequence number. - sequence: Option, + sequence: Option, partitions: VecDeque, current_reader: Option, metrics: TreeIterMetrics, diff --git a/src/mito2/src/memtable/simple_bulk_memtable.rs b/src/mito2/src/memtable/simple_bulk_memtable.rs index 1270557ac6..7d58c08c60 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable.rs @@ -27,7 +27,7 @@ use mito_codec::key_values::KeyValue; use rayon::prelude::*; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceRange}; use crate::flush::WriteBufferManagerRef; use crate::memtable::bulk::part::BulkPart; @@ -218,7 +218,7 @@ impl Memtable for SimpleBulkMemtable { &self, projection: Option<&[ColumnId]>, _predicate: Option, - sequence: Option, + sequence: Option, ) -> error::Result { let iter = self.create_iter(projection, sequence)?.build(None)?; @@ -234,7 +234,7 @@ impl Memtable for SimpleBulkMemtable { &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, - sequence: Option, + sequence: Option, _for_flush: bool, ) -> error::Result { let start_time = Instant::now(); @@ -833,7 +833,9 @@ mod tests { .unwrap(); // Filter with sequence 0 should only return first write - let mut iter = memtable.iter(None, None, Some(0)).unwrap(); + let mut iter = memtable + .iter(None, None, Some(SequenceRange::LtEq { max: 0 })) + .unwrap(); let batch = iter.next().unwrap().unwrap(); assert_eq!(1, batch.num_rows()); assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap()); diff --git a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs index f71385d78c..2f2f9a278d 100644 --- a/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs +++ b/src/mito2/src/memtable/simple_bulk_memtable/test_only.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::time::Instant; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceRange}; use crate::error; use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable}; @@ -33,7 +33,7 @@ impl SimpleBulkMemtable { pub(crate) fn create_iter( &self, projection: Option<&[ColumnId]>, - sequence: Option, + sequence: Option, ) -> error::Result { let mut series = self.series.write().unwrap(); @@ -59,7 +59,7 @@ pub(crate) struct BatchIterBuilderDeprecated { region_metadata: RegionMetadataRef, values: Option, projection: HashSet, - sequence: Option, + sequence: Option, dedup: bool, merge_mode: MergeMode, } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index e1c292269f..28736b1874 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -39,7 +39,7 @@ use mito_codec::key_values::KeyValue; use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceRange}; use table::predicate::Predicate; use crate::error::{ @@ -272,7 +272,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, filters: Option, - sequence: Option, + sequence: Option, ) -> Result { let projection = if let Some(projection) = projection { projection.iter().copied().collect() @@ -299,7 +299,7 @@ impl Memtable for TimeSeriesMemtable { &self, projection: Option<&[ColumnId]>, predicate: PredicateGroup, - sequence: Option, + sequence: Option, _for_flush: bool, ) -> Result { let projection = if let Some(projection) = projection { @@ -463,7 +463,7 @@ impl SeriesSet { projection: HashSet, predicate: Option, dedup: bool, - sequence: Option, + sequence: Option, mem_scan_metrics: Option, ) -> Result { let primary_key_schema = primary_key_schema(&self.region_metadata); @@ -531,7 +531,7 @@ struct Iter { pk_datatypes: Vec, codec: Arc, dedup: bool, - sequence: Option, + sequence: Option, metrics: Metrics, mem_scan_metrics: Option, } @@ -547,7 +547,7 @@ impl Iter { pk_datatypes: Vec, codec: Arc, dedup: bool, - sequence: Option, + sequence: Option, mem_scan_metrics: Option, ) -> Result { let predicate = predicate @@ -1239,7 +1239,7 @@ struct TimeSeriesIterBuilder { projection: HashSet, predicate: Option, dedup: bool, - sequence: Option, + sequence: Option, merge_mode: MergeMode, } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index a10b2ba021..06d615452a 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -41,7 +41,7 @@ use async_trait::async_trait; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; use datatypes::arrow; -use datatypes::arrow::array::{Array, ArrayRef, UInt64Array}; +use datatypes::arrow::array::{Array, ArrayRef}; use datatypes::arrow::compute::SortOptions; use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow::row::{RowConverter, SortField}; @@ -60,7 +60,7 @@ use futures::stream::BoxStream; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadata; -use store_api::storage::{ColumnId, SequenceNumber}; +use store_api::storage::{ColumnId, SequenceNumber, SequenceRange}; use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, @@ -361,17 +361,29 @@ impl Batch { } /// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`. - pub fn filter_by_sequence(&mut self, sequence: Option) -> Result<()> { - let seq = match (sequence, self.last_sequence()) { - (None, _) | (_, None) => return Ok(()), - (Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()), - (Some(sequence), Some(_)) => sequence, + pub fn filter_by_sequence(&mut self, sequence: Option) -> Result<()> { + let seq_range = match sequence { + None => return Ok(()), + Some(seq_range) => { + let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence()) + else { + return Ok(()); + }; + let is_subset = match seq_range { + SequenceRange::Gt { min } => min < first, + SequenceRange::LtEq { max } => max >= last, + SequenceRange::GtLtEq { min, max } => min < first && max >= last, + }; + if is_subset { + return Ok(()); + } + seq_range + } }; let seqs = self.sequences.as_arrow(); - let sequence = UInt64Array::new_scalar(seq); - let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence) - .context(ComputeArrowSnafu)?; + let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?; + let predicate = BooleanVector::from(predicate); self.filter(&predicate)?; @@ -1292,7 +1304,9 @@ mod tests { &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], &[21, 22, 23, 24], ); - batch.filter_by_sequence(Some(13)).unwrap(); + batch + .filter_by_sequence(Some(SequenceRange::LtEq { max: 13 })) + .unwrap(); let expect = new_batch( &[1, 2, 3], &[11, 12, 13], @@ -1309,7 +1323,9 @@ mod tests { &[21, 22, 23, 24], ); - batch.filter_by_sequence(Some(10)).unwrap(); + batch + .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 })) + .unwrap(); assert!(batch.is_empty()); // None filter. @@ -1325,13 +1341,95 @@ mod tests { // Filter a empty batch let mut batch = new_batch(&[], &[], &[], &[]); - batch.filter_by_sequence(Some(10)).unwrap(); + batch + .filter_by_sequence(Some(SequenceRange::LtEq { max: 10 })) + .unwrap(); assert!(batch.is_empty()); // Filter a empty batch with None let mut batch = new_batch(&[], &[], &[], &[]); batch.filter_by_sequence(None).unwrap(); assert!(batch.is_empty()); + + // Test From variant - exclusive lower bound + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + batch + .filter_by_sequence(Some(SequenceRange::Gt { min: 12 })) + .unwrap(); + let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]); + assert_eq!(expect, batch); + + // Test From variant with no matches + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Delete, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + batch + .filter_by_sequence(Some(SequenceRange::Gt { min: 20 })) + .unwrap(); + assert!(batch.is_empty()); + + // Test Range variant - exclusive lower bound, inclusive upper bound + let mut batch = new_batch( + &[1, 2, 3, 4, 5], + &[11, 12, 13, 14, 15], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[21, 22, 23, 24, 25], + ); + batch + .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 })) + .unwrap(); + let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]); + assert_eq!(expect, batch); + + // Test Range variant with mixed operations + let mut batch = new_batch( + &[1, 2, 3, 4, 5], + &[11, 12, 13, 14, 15], + &[ + OpType::Put, + OpType::Delete, + OpType::Put, + OpType::Delete, + OpType::Put, + ], + &[21, 22, 23, 24, 25], + ); + batch + .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 })) + .unwrap(); + let expect = new_batch( + &[2, 3], + &[12, 13], + &[OpType::Delete, OpType::Put], + &[22, 23], + ); + assert_eq!(expect, batch); + + // Test Range variant with no matches + let mut batch = new_batch( + &[1, 2, 3, 4], + &[11, 12, 13, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 23, 24], + ); + batch + .filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 })) + .unwrap(); + assert!(batch.is_empty()); } #[test] diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index cd660dab9f..536c48e248 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -35,7 +35,9 @@ use smallvec::SmallVec; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{PartitionRange, RegionScannerRef}; -use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; +use store_api::storage::{ + RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, +}; use table::predicate::{Predicate, build_time_range_predicate}; use tokio::sync::{Semaphore, mpsc}; use tokio_stream::wrappers::ReceiverStream; @@ -438,7 +440,10 @@ impl ScanRegion { let ranges_in_memtable = m.ranges( Some(mapper.column_ids()), predicate.clone(), - self.request.sequence, + SequenceRange::new( + self.request.memtable_min_sequence, + self.request.memtable_max_sequence, + ), false, )?; mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| { diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 75efa0c6f5..2174ac7b9f 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -30,7 +30,7 @@ use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortFi use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; -use store_api::storage::{ColumnId, RegionId, SequenceNumber}; +use store_api::storage::{ColumnId, RegionId, SequenceNumber, SequenceRange}; use table::predicate::Predicate; use crate::error::Result; @@ -89,7 +89,7 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _filters: Option, - _sequence: Option, + _sequence: Option, ) -> Result { Ok(Box::new(std::iter::empty())) } @@ -98,7 +98,7 @@ impl Memtable for EmptyMemtable { &self, _projection: Option<&[ColumnId]>, _predicate: PredicateGroup, - _sequence: Option, + _sequence: Option, _for_flush: bool, ) -> Result { Ok(MemtableRanges::default()) diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index d81767a904..798ae52549 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -256,7 +256,7 @@ impl DummyTableProvider { } pub fn with_sequence(&self, sequence: u64) { - self.scan_request.lock().unwrap().sequence = Some(sequence); + self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence); } /// Gets the scan request of the provider. @@ -287,7 +287,7 @@ impl DummyTableProviderFactory { let scan_request = query_ctx .as_ref() .map(|ctx| ScanRequest { - sequence: ctx.get_snapshot(region_id.as_u64()), + memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()), sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()), ..Default::default() }) diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs index a8f872cd70..1df7a0aff6 100644 --- a/src/store-api/src/storage.rs +++ b/src/store-api/src/storage.rs @@ -28,4 +28,4 @@ pub use datatypes::schema::{ pub use self::descriptors::*; pub use self::file::{FileId, ParseIdError}; pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; -pub use self::types::SequenceNumber; +pub use self::types::{SequenceNumber, SequenceRange}; diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 513a98b148..5e9fae3215 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -55,9 +55,12 @@ pub struct ScanRequest { /// Optional hint to select rows from time-series. pub series_row_selector: Option, /// Optional constraint on the sequence number of the rows to read. - /// If set, only rows with a sequence number lesser or equal to this value + /// If set, only rows with a sequence number **lesser or equal** to this value /// will be returned. - pub sequence: Option, + pub memtable_max_sequence: Option, + /// Optional constraint on the minimal sequence number in the memtable. + /// If set, only the memtables that contain sequences **greater than** this value will be scanned + pub memtable_min_sequence: Option, /// Optional constraint on the minimal sequence number in the SST files. /// If set, only the SST files that contain sequences greater than this value will be scanned. pub sst_min_sequence: Option, @@ -121,7 +124,7 @@ impl Display for ScanRequest { series_row_selector )?; } - if let Some(sequence) = &self.sequence { + if let Some(sequence) = &self.memtable_max_sequence { write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?; } if let Some(sst_min_sequence) = &self.sst_min_sequence { diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs index ff1162d401..dbe5a377af 100644 --- a/src/store-api/src/storage/types.rs +++ b/src/store-api/src/storage/types.rs @@ -14,6 +14,63 @@ //! Common types. +use datatypes::arrow::array::{BooleanArray, Datum, UInt64Array}; + /// Represents a sequence number of data in storage. The offset of logstore can be used /// as a sequence number. pub type SequenceNumber = u64; + +/// A range of sequence numbers. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SequenceRange { + Gt { + /// Exclusive lower bound + min: SequenceNumber, + }, + LtEq { + /// Inclusive upper bound + max: SequenceNumber, + }, + GtLtEq { + /// Exclusive lower bound + min: SequenceNumber, + /// Inclusive upper bound + max: SequenceNumber, + }, +} + +impl SequenceRange { + pub fn new(min: Option, max: Option) -> Option { + match (min, max) { + (Some(min), Some(max)) => Some(SequenceRange::GtLtEq { min, max }), + (Some(min), None) => Some(SequenceRange::Gt { min }), + (None, Some(max)) => Some(SequenceRange::LtEq { max }), + (None, None) => None, + } + } + + pub fn filter( + &self, + seqs: &dyn Datum, + ) -> Result { + match self { + SequenceRange::Gt { min } => { + let min = UInt64Array::new_scalar(*min); + let pred = datafusion_common::arrow::compute::kernels::cmp::gt(seqs, &min)?; + Ok(pred) + } + SequenceRange::LtEq { max } => { + let max = UInt64Array::new_scalar(*max); + let pred = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &max)?; + Ok(pred) + } + SequenceRange::GtLtEq { min, max } => { + let min = UInt64Array::new_scalar(*min); + let max = UInt64Array::new_scalar(*max); + let pred_min = datafusion_common::arrow::compute::kernels::cmp::gt(seqs, &min)?; + let pred_max = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &max)?; + datafusion_common::arrow::compute::kernels::boolean::and(&pred_min, &pred_max) + } + } + } +}