feat: memtable seq range read (#6950)

* feat: seq range memtable read

Signed-off-by: discord9 <discord9@163.com>

* test: from&range

Signed-off-by: discord9 <discord9@163.com>

* wt

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* docs: better naming & emphasis

Signed-off-by: discord9 <discord9@163.com>

* refactor: use filter method

Signed-off-by: discord9 <discord9@163.com>

* tests: unwrap

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-10-16 13:30:56 +08:00
committed by GitHub
parent 552c502620
commit ac65ede033
16 changed files with 237 additions and 72 deletions

View File

@@ -28,7 +28,7 @@ use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
@@ -186,7 +186,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<table::predicate::Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator>;
/// Returns the ranges in the memtable.
@@ -197,7 +197,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
for_flush: bool,
) -> Result<MemtableRanges>;

View File

@@ -30,7 +30,7 @@ use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId, SequenceNumber};
use store_api::storage::{ColumnId, FileId, RegionId, SequenceRange};
use tokio::sync::Semaphore;
use crate::error::{Result, UnsupportedOperationSnafu};
@@ -323,7 +323,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<table::predicate::Predicate>,
_sequence: Option<SequenceNumber>,
_sequence: Option<SequenceRange>,
) -> Result<crate::memtable::BoxedBatchIterator> {
todo!()
}
@@ -332,7 +332,7 @@ impl Memtable for BulkMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
for_flush: bool,
) -> Result<MemtableRanges> {
let mut ranges = BTreeMap::new();
@@ -602,7 +602,7 @@ impl BulkMemtable {
struct BulkRangeIterBuilder {
part: BulkPart,
context: Arc<BulkIterContext>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
}
impl IterBuilder for BulkRangeIterBuilder {
@@ -641,7 +641,7 @@ struct EncodedBulkRangeIterBuilder {
file_id: FileId,
part: EncodedBulkPart,
context: Arc<BulkIterContext>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
}
impl IterBuilder for EncodedBulkRangeIterBuilder {
@@ -1381,7 +1381,7 @@ mod tests {
memtable.write_bulk(part).unwrap();
let predicate_group = PredicateGroup::new(&metadata, &[]).unwrap();
let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
let sequence_filter = Some(SequenceRange::LtEq { max: 400 }); // Filters out rows with sequence > 400
let ranges = memtable
.ranges(None, predicate_group, sequence_filter, false)
.unwrap();

View File

@@ -53,7 +53,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber};
use store_api::storage::{FileId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;
use crate::error::{
@@ -564,7 +564,7 @@ impl EncodedBulkPart {
pub(crate) fn read(
&self,
context: BulkIterContextRef,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Result<Option<BoxedRecordBatchIterator>> {
// use predicate to find row groups to read.
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

View File

@@ -17,14 +17,14 @@ use std::ops::BitAnd;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array};
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use store_api::storage::SequenceRange;
use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
@@ -39,7 +39,7 @@ pub struct EncodedBulkPartIter {
current_reader: Option<ParquetRecordBatchReader>,
builder: MemtableRowGroupReaderBuilder,
/// Sequence number filter.
sequence: Option<Scalar<UInt64Array>>,
sequence: Option<SequenceRange>,
}
impl EncodedBulkPartIter {
@@ -49,12 +49,10 @@ impl EncodedBulkPartIter {
mut row_groups_to_read: VecDeque<usize>,
parquet_meta: Arc<ParquetMetaData>,
data: Bytes,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> error::Result<Self> {
assert!(context.read_format().as_flat().is_some());
let sequence = sequence.map(UInt64Array::new_scalar);
let projection_mask = ProjectionMask::roots(
parquet_meta.file_metadata().schema_descr(),
context.read_format().projection_indices().iter().copied(),
@@ -121,7 +119,7 @@ pub struct BulkPartRecordBatchIter {
/// Iterator context for filtering
context: BulkIterContextRef,
/// Sequence number filter.
sequence: Option<Scalar<UInt64Array>>,
sequence: Option<SequenceRange>,
}
impl BulkPartRecordBatchIter {
@@ -129,12 +127,10 @@ impl BulkPartRecordBatchIter {
pub fn new(
record_batch: RecordBatch,
context: BulkIterContextRef,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Self {
assert!(context.read_format().as_flat().is_some());
let sequence = sequence.map(UInt64Array::new_scalar);
Self {
record_batch: Some(record_batch),
context,
@@ -185,7 +181,7 @@ impl Iterator for BulkPartRecordBatchIter {
/// Panics if the format is not flat.
fn apply_combined_filters(
context: &BulkIterContext,
sequence: &Option<Scalar<UInt64Array>>,
sequence: &Option<SequenceRange>,
record_batch: RecordBatch,
) -> error::Result<Option<RecordBatch>> {
// Converts the format to the flat format first.
@@ -234,9 +230,9 @@ fn apply_combined_filters(
if let Some(sequence) = sequence {
let sequence_column =
record_batch.column(sequence_column_index(record_batch.num_columns()));
let sequence_filter =
datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
.context(ComputeArrowSnafu)?;
let sequence_filter = sequence
.filter(&sequence_column)
.context(ComputeArrowSnafu)?;
// Combine with existing filter using AND operation
combined_filter = match combined_filter {
None => Some(sequence_filter),
@@ -385,7 +381,11 @@ mod tests {
assert_eq!(6, result[0].num_columns(),);
// Creates iter with sequence filter (only include sequences <= 2)
let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context, Some(2));
let iter = BulkPartRecordBatchIter::new(
record_batch.clone(),
context,
Some(SequenceRange::LtEq { max: 2 }),
);
let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
assert_eq!(1, result.len());
let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;

View File

@@ -32,7 +32,7 @@ use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;
use crate::error::{Result, UnsupportedOperationSnafu};
@@ -181,7 +181,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate, sequence, None)
}
@@ -190,7 +190,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let projection = projection.map(|ids| ids.to_vec());
@@ -314,7 +314,7 @@ impl PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
self.tree.read(projection, predicate, sequence, None)
}
@@ -361,7 +361,7 @@ struct PartitionTreeIterBuilder {
tree: Arc<PartitionTree>,
projection: Option<Vec<ColumnId>>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
}
impl IterBuilder for PartitionTreeIterBuilder {

View File

@@ -30,7 +30,7 @@ use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;
use crate::error::{
@@ -229,7 +229,7 @@ impl PartitionTree {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
) -> Result<BoxedBatchIterator> {
let start = Instant::now();
@@ -465,7 +465,7 @@ struct TreeIterMetrics {
struct TreeIter {
/// Optional Sequence number of the current reader which limit results batch to lower than this sequence number.
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
partitions: VecDeque<PartitionRef>,
current_reader: Option<PartitionReader>,
metrics: TreeIterMetrics,

View File

@@ -27,7 +27,7 @@ use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceRange};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::part::BulkPart;
@@ -218,7 +218,7 @@ impl Memtable for SimpleBulkMemtable {
&self,
projection: Option<&[ColumnId]>,
_predicate: Option<table::predicate::Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> error::Result<BoxedBatchIterator> {
let iter = self.create_iter(projection, sequence)?.build(None)?;
@@ -234,7 +234,7 @@ impl Memtable for SimpleBulkMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> error::Result<MemtableRanges> {
let start_time = Instant::now();
@@ -833,7 +833,9 @@ mod tests {
.unwrap();
// Filter with sequence 0 should only return first write
let mut iter = memtable.iter(None, None, Some(0)).unwrap();
let mut iter = memtable
.iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
.unwrap();
let batch = iter.next().unwrap().unwrap();
assert_eq!(1, batch.num_rows());
assert_eq!(1.0, batch.fields()[0].data.get(0).as_f64_lossy().unwrap());

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::time::Instant;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceRange};
use crate::error;
use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable};
@@ -33,7 +33,7 @@ impl SimpleBulkMemtable {
pub(crate) fn create_iter(
&self,
projection: Option<&[ColumnId]>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> error::Result<BatchIterBuilderDeprecated> {
let mut series = self.series.write().unwrap();
@@ -59,7 +59,7 @@ pub(crate) struct BatchIterBuilderDeprecated {
region_metadata: RegionMetadataRef,
values: Option<Values>,
projection: HashSet<ColumnId>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
dedup: bool,
merge_mode: MergeMode,
}

View File

@@ -39,7 +39,7 @@ use mito_codec::key_values::KeyValue;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;
use crate::error::{
@@ -272,7 +272,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
filters: Option<Predicate>,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
@@ -299,7 +299,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: PredicateGroup,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
_for_flush: bool,
) -> Result<MemtableRanges> {
let projection = if let Some(projection) = projection {
@@ -463,7 +463,7 @@ impl SeriesSet {
projection: HashSet<ColumnId>,
predicate: Option<Predicate>,
dedup: bool,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Result<Iter> {
let primary_key_schema = primary_key_schema(&self.region_metadata);
@@ -531,7 +531,7 @@ struct Iter {
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<DensePrimaryKeyCodec>,
dedup: bool,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
metrics: Metrics,
mem_scan_metrics: Option<MemScanMetrics>,
}
@@ -547,7 +547,7 @@ impl Iter {
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<DensePrimaryKeyCodec>,
dedup: bool,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Result<Self> {
let predicate = predicate
@@ -1239,7 +1239,7 @@ struct TimeSeriesIterBuilder {
projection: HashSet<ColumnId>,
predicate: Option<Predicate>,
dedup: bool,
sequence: Option<SequenceNumber>,
sequence: Option<SequenceRange>,
merge_mode: MergeMode,
}

View File

@@ -41,7 +41,7 @@ use async_trait::async_trait;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::row::{RowConverter, SortField};
@@ -60,7 +60,7 @@ use futures::stream::BoxStream;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
@@ -361,17 +361,29 @@ impl Batch {
}
/// Filters rows by the given `sequence` range. Only preserves rows whose sequence number falls within the range.
pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
let seq = match (sequence, self.last_sequence()) {
(None, _) | (_, None) => return Ok(()),
(Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
(Some(sequence), Some(_)) => sequence,
pub fn filter_by_sequence(&mut self, sequence: Option<SequenceRange>) -> Result<()> {
let seq_range = match sequence {
None => return Ok(()),
Some(seq_range) => {
let (Some(first), Some(last)) = (self.first_sequence(), self.last_sequence())
else {
return Ok(());
};
let is_subset = match seq_range {
SequenceRange::Gt { min } => min < first,
SequenceRange::LtEq { max } => max >= last,
SequenceRange::GtLtEq { min, max } => min < first && max >= last,
};
if is_subset {
return Ok(());
}
seq_range
}
};
let seqs = self.sequences.as_arrow();
let sequence = UInt64Array::new_scalar(seq);
let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
.context(ComputeArrowSnafu)?;
let predicate = seq_range.filter(seqs).context(ComputeArrowSnafu)?;
let predicate = BooleanVector::from(predicate);
self.filter(&predicate)?;
@@ -1292,7 +1304,9 @@ mod tests {
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
batch.filter_by_sequence(Some(13)).unwrap();
batch
.filter_by_sequence(Some(SequenceRange::LtEq { max: 13 }))
.unwrap();
let expect = new_batch(
&[1, 2, 3],
&[11, 12, 13],
@@ -1309,7 +1323,9 @@ mod tests {
&[21, 22, 23, 24],
);
batch.filter_by_sequence(Some(10)).unwrap();
batch
.filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
.unwrap();
assert!(batch.is_empty());
// None filter.
@@ -1325,13 +1341,95 @@ mod tests {
// Filter a empty batch
let mut batch = new_batch(&[], &[], &[], &[]);
batch.filter_by_sequence(Some(10)).unwrap();
batch
.filter_by_sequence(Some(SequenceRange::LtEq { max: 10 }))
.unwrap();
assert!(batch.is_empty());
// Filter a empty batch with None
let mut batch = new_batch(&[], &[], &[], &[]);
batch.filter_by_sequence(None).unwrap();
assert!(batch.is_empty());
// Test From variant - exclusive lower bound
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
batch
.filter_by_sequence(Some(SequenceRange::Gt { min: 12 }))
.unwrap();
let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
assert_eq!(expect, batch);
// Test From variant with no matches
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
batch
.filter_by_sequence(Some(SequenceRange::Gt { min: 20 }))
.unwrap();
assert!(batch.is_empty());
// Test Range variant - exclusive lower bound, inclusive upper bound
let mut batch = new_batch(
&[1, 2, 3, 4, 5],
&[11, 12, 13, 14, 15],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[21, 22, 23, 24, 25],
);
batch
.filter_by_sequence(Some(SequenceRange::GtLtEq { min: 12, max: 14 }))
.unwrap();
let expect = new_batch(&[3, 4], &[13, 14], &[OpType::Put, OpType::Put], &[23, 24]);
assert_eq!(expect, batch);
// Test Range variant with mixed operations
let mut batch = new_batch(
&[1, 2, 3, 4, 5],
&[11, 12, 13, 14, 15],
&[
OpType::Put,
OpType::Delete,
OpType::Put,
OpType::Delete,
OpType::Put,
],
&[21, 22, 23, 24, 25],
);
batch
.filter_by_sequence(Some(SequenceRange::GtLtEq { min: 11, max: 13 }))
.unwrap();
let expect = new_batch(
&[2, 3],
&[12, 13],
&[OpType::Delete, OpType::Put],
&[22, 23],
);
assert_eq!(expect, batch);
// Test Range variant with no matches
let mut batch = new_batch(
&[1, 2, 3, 4],
&[11, 12, 13, 14],
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
&[21, 22, 23, 24],
);
batch
.filter_by_sequence(Some(SequenceRange::GtLtEq { min: 20, max: 25 }))
.unwrap();
assert!(batch.is_empty());
}
#[test]

View File

@@ -35,7 +35,9 @@ use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::storage::{
RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
@@ -438,7 +440,10 @@ impl ScanRegion {
let ranges_in_memtable = m.ranges(
Some(mapper.column_ids()),
predicate.clone(),
self.request.sequence,
SequenceRange::new(
self.request.memtable_min_sequence,
self.request.memtable_max_sequence,
),
false,
)?;
mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {

View File

@@ -30,7 +30,7 @@ use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortFi
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use store_api::storage::{ColumnId, RegionId, SequenceNumber, SequenceRange};
use table::predicate::Predicate;
use crate::error::Result;
@@ -89,7 +89,7 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_filters: Option<Predicate>,
_sequence: Option<SequenceNumber>,
_sequence: Option<SequenceRange>,
) -> Result<BoxedBatchIterator> {
Ok(Box::new(std::iter::empty()))
}
@@ -98,7 +98,7 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: PredicateGroup,
_sequence: Option<SequenceNumber>,
_sequence: Option<SequenceRange>,
_for_flush: bool,
) -> Result<MemtableRanges> {
Ok(MemtableRanges::default())

View File

@@ -256,7 +256,7 @@ impl DummyTableProvider {
}
pub fn with_sequence(&self, sequence: u64) {
self.scan_request.lock().unwrap().sequence = Some(sequence);
self.scan_request.lock().unwrap().memtable_max_sequence = Some(sequence);
}
/// Gets the scan request of the provider.
@@ -287,7 +287,7 @@ impl DummyTableProviderFactory {
let scan_request = query_ctx
.as_ref()
.map(|ctx| ScanRequest {
sequence: ctx.get_snapshot(region_id.as_u64()),
memtable_max_sequence: ctx.get_snapshot(region_id.as_u64()),
sst_min_sequence: ctx.sst_min_sequence(region_id.as_u64()),
..Default::default()
})

View File

@@ -28,4 +28,4 @@ pub use datatypes::schema::{
pub use self::descriptors::*;
pub use self::file::{FileId, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::SequenceNumber;
pub use self::types::{SequenceNumber, SequenceRange};

View File

@@ -55,9 +55,12 @@ pub struct ScanRequest {
/// Optional hint to select rows from time-series.
pub series_row_selector: Option<TimeSeriesRowSelector>,
/// Optional constraint on the sequence number of the rows to read.
/// If set, only rows with a sequence number lesser or equal to this value
/// If set, only rows with a sequence number **lesser or equal** to this value
/// will be returned.
pub sequence: Option<SequenceNumber>,
pub memtable_max_sequence: Option<SequenceNumber>,
/// Optional constraint on the minimal sequence number in the memtable.
/// If set, only the memtables that contain sequences **greater than** this value will be scanned
pub memtable_min_sequence: Option<SequenceNumber>,
/// Optional constraint on the minimal sequence number in the SST files.
/// If set, only the SST files that contain sequences greater than this value will be scanned.
pub sst_min_sequence: Option<SequenceNumber>,
@@ -121,7 +124,7 @@ impl Display for ScanRequest {
series_row_selector
)?;
}
if let Some(sequence) = &self.sequence {
if let Some(sequence) = &self.memtable_max_sequence {
write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
}
if let Some(sst_min_sequence) = &self.sst_min_sequence {

View File

@@ -14,6 +14,63 @@
//! Common types.
use datatypes::arrow::array::{BooleanArray, Datum, UInt64Array};
/// Represents a sequence number of data in storage. The offset of logstore can be used
/// as a sequence number.
pub type SequenceNumber = u64;
/// A range of sequence numbers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SequenceRange {
    /// Keeps only sequences strictly greater than `min` (`seq > min`).
    Gt {
        /// Exclusive lower bound
        min: SequenceNumber,
    },
    /// Keeps only sequences less than or equal to `max` (`seq <= max`).
    LtEq {
        /// Inclusive upper bound
        max: SequenceNumber,
    },
    /// Keeps only sequences in the half-open interval `(min, max]`
    /// (`seq > min && seq <= max`).
    GtLtEq {
        /// Exclusive lower bound
        min: SequenceNumber,
        /// Inclusive upper bound
        max: SequenceNumber,
    },
}
impl SequenceRange {
    /// Builds a range from optional bounds.
    ///
    /// `min` is an exclusive lower bound and `max` an inclusive upper bound.
    /// Returns `None` when neither bound is supplied (i.e. no filtering).
    pub fn new(min: Option<SequenceNumber>, max: Option<SequenceNumber>) -> Option<Self> {
        match (min, max) {
            (None, None) => None,
            (Some(min), None) => Some(Self::Gt { min }),
            (None, Some(max)) => Some(Self::LtEq { max }),
            (Some(min), Some(max)) => Some(Self::GtLtEq { min, max }),
        }
    }

    /// Evaluates this range against a column of sequence numbers.
    ///
    /// Returns a boolean mask that is `true` for every row whose sequence
    /// lies inside the range, suitable for use with Arrow's `filter` kernels.
    ///
    /// # Errors
    /// Propagates any error raised by the underlying Arrow comparison or
    /// boolean kernels (e.g. a type mismatch in `seqs`).
    pub fn filter(
        &self,
        seqs: &dyn Datum,
    ) -> Result<BooleanArray, datatypes::arrow::error::ArrowError> {
        use datafusion_common::arrow::compute::kernels::{boolean, cmp};

        match *self {
            Self::Gt { min } => cmp::gt(seqs, &UInt64Array::new_scalar(min)),
            Self::LtEq { max } => cmp::lt_eq(seqs, &UInt64Array::new_scalar(max)),
            Self::GtLtEq { min, max } => {
                // Both bounds apply: AND the two per-row comparisons together.
                let above_min = cmp::gt(seqs, &UInt64Array::new_scalar(min))?;
                let within_max = cmp::lt_eq(seqs, &UInt64Array::new_scalar(max))?;
                boolean::and(&above_min, &within_max)
            }
        }
    }
}