mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat: filter batch by sequence in memtable (#5367)
* feat: add seq field * feat: filter by sequence * chore: per review * docs: explain why not prune * chore: correct doc * test: test filter by seq
This commit is contained in:
@@ -411,6 +411,7 @@ impl MetadataRegion {
|
||||
output_ordering: None,
|
||||
limit: None,
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
};
|
||||
let record_batch_stream = self
|
||||
.mito
|
||||
@@ -469,6 +470,7 @@ impl MetadataRegion {
|
||||
output_ordering: None,
|
||||
limit: None,
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -630,6 +632,7 @@ mod test {
|
||||
output_ordering: None,
|
||||
limit: None,
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
};
|
||||
let actual_scan_request = MetadataRegion::build_read_request(key);
|
||||
assert_eq!(actual_scan_request, expected_scan_request);
|
||||
|
||||
@@ -85,7 +85,7 @@ fn full_scan(c: &mut Criterion) {
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
@@ -98,7 +98,7 @@ fn full_scan(c: &mut Criterion) {
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
@@ -124,7 +124,7 @@ fn filter_1_host(c: &mut Criterion) {
|
||||
let predicate = generator.random_host_filter();
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, Some(predicate.clone())).unwrap();
|
||||
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
@@ -138,7 +138,7 @@ fn filter_1_host(c: &mut Criterion) {
|
||||
let predicate = generator.random_host_filter();
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, Some(predicate.clone())).unwrap();
|
||||
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
|
||||
@@ -79,6 +79,7 @@ async fn test_scan_projection() {
|
||||
output_ordering: None,
|
||||
limit: None,
|
||||
series_row_selector: None,
|
||||
sequence: None,
|
||||
};
|
||||
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
|
||||
@@ -348,7 +348,7 @@ impl RegionFlushTask {
|
||||
|
||||
let max_sequence = mem.stats().max_sequence();
|
||||
let file_id = FileId::random();
|
||||
let iter = mem.iter(None, None)?;
|
||||
let iter = mem.iter(None, None, None)?;
|
||||
let source = Source::Iter(iter);
|
||||
|
||||
// Flush to level 0.
|
||||
|
||||
@@ -147,6 +147,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator>;
|
||||
|
||||
/// Returns the ranges in the memtable.
|
||||
@@ -155,6 +156,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges;
|
||||
|
||||
/// Returns true if the memtable is empty.
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -63,6 +63,7 @@ impl Memtable for BulkMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
todo!()
|
||||
}
|
||||
@@ -71,6 +72,7 @@ impl Memtable for BulkMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
todo!()
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ use parquet::file::metadata::ParquetMetaData;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use snafu::ResultExt;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error;
|
||||
@@ -68,7 +68,11 @@ impl BulkPart {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
pub(crate) fn read(&self, context: BulkIterContextRef) -> Result<Option<BoxedBatchIterator>> {
|
||||
pub(crate) fn read(
|
||||
&self,
|
||||
context: BulkIterContextRef,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<Option<BoxedBatchIterator>> {
|
||||
// use predicate to find row groups to read.
|
||||
let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
|
||||
|
||||
@@ -82,6 +86,7 @@ impl BulkPart {
|
||||
row_groups_to_read,
|
||||
self.metadata.parquet_metadata.clone(),
|
||||
self.data.clone(),
|
||||
sequence,
|
||||
)?;
|
||||
Ok(Some(Box::new(iter) as BoxedBatchIterator))
|
||||
}
|
||||
@@ -786,11 +791,14 @@ mod tests {
|
||||
let projection = &[4u32];
|
||||
|
||||
let mut reader = part
|
||||
.read(Arc::new(BulkIterContext::new(
|
||||
part.metadata.region_metadata.clone(),
|
||||
&Some(projection.as_slice()),
|
||||
.read(
|
||||
Arc::new(BulkIterContext::new(
|
||||
part.metadata.region_metadata.clone(),
|
||||
&Some(projection.as_slice()),
|
||||
None,
|
||||
)),
|
||||
None,
|
||||
)))
|
||||
)
|
||||
.unwrap()
|
||||
.expect("expect at least one row group");
|
||||
|
||||
@@ -837,7 +845,7 @@ mod tests {
|
||||
predicate,
|
||||
));
|
||||
let mut reader = part
|
||||
.read(context)
|
||||
.read(context, None)
|
||||
.unwrap()
|
||||
.expect("expect at least one row group");
|
||||
let mut total_rows_read = 0;
|
||||
@@ -866,7 +874,7 @@ mod tests {
|
||||
datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
|
||||
)])),
|
||||
));
|
||||
assert!(part.read(context).unwrap().is_none());
|
||||
assert!(part.read(context, None).unwrap().is_none());
|
||||
|
||||
check_prune_row_group(&part, None, 310);
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use bytes::Bytes;
|
||||
use parquet::arrow::ProjectionMask;
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::error;
|
||||
use crate::memtable::bulk::context::BulkIterContextRef;
|
||||
@@ -31,6 +32,7 @@ pub struct BulkPartIter {
|
||||
row_groups_to_read: VecDeque<usize>,
|
||||
current_reader: Option<PruneReader>,
|
||||
builder: MemtableRowGroupReaderBuilder,
|
||||
sequence: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
impl BulkPartIter {
|
||||
@@ -40,6 +42,7 @@ impl BulkPartIter {
|
||||
mut row_groups_to_read: VecDeque<usize>,
|
||||
parquet_meta: Arc<ParquetMetaData>,
|
||||
data: Bytes,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> error::Result<Self> {
|
||||
let projection_mask = ProjectionMask::roots(
|
||||
parquet_meta.file_metadata().schema_descr(),
|
||||
@@ -62,6 +65,7 @@ impl BulkPartIter {
|
||||
row_groups_to_read,
|
||||
current_reader: init_reader,
|
||||
builder,
|
||||
sequence,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -71,14 +75,16 @@ impl BulkPartIter {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if let Some(batch) = current.next_batch()? {
|
||||
if let Some(mut batch) = current.next_batch()? {
|
||||
batch.filter_by_sequence(self.sequence)?;
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
// Previous row group exhausted, read next row group
|
||||
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
|
||||
current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
|
||||
if let Some(next_batch) = current.next_batch()? {
|
||||
if let Some(mut next_batch) = current.next_batch()? {
|
||||
next_batch.filter_by_sequence(self.sequence)?;
|
||||
return Ok(Some(next_batch));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ pub(crate) use primary_key_filter::DensePrimaryKeyFilter;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{Result, UnsupportedOperationSnafu};
|
||||
@@ -190,20 +190,23 @@ impl Memtable for PartitionTreeMemtable {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
self.tree.read(projection, predicate)
|
||||
self.tree.read(projection, predicate, sequence)
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
let projection = projection.map(|ids| ids.to_vec());
|
||||
let builder = Box::new(PartitionTreeIterBuilder {
|
||||
tree: self.tree.clone(),
|
||||
projection,
|
||||
predicate,
|
||||
sequence,
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
|
||||
@@ -350,12 +353,16 @@ struct PartitionTreeIterBuilder {
|
||||
tree: Arc<PartitionTree>,
|
||||
projection: Option<Vec<ColumnId>>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
impl IterBuilder for PartitionTreeIterBuilder {
|
||||
fn build(&self) -> Result<BoxedBatchIterator> {
|
||||
self.tree
|
||||
.read(self.projection.as_deref(), self.predicate.clone())
|
||||
self.tree.read(
|
||||
self.projection.as_deref(),
|
||||
self.predicate.clone(),
|
||||
self.sequence,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -410,7 +417,7 @@ mod tests {
|
||||
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
assert_eq!(expected_ts, read);
|
||||
|
||||
@@ -464,11 +471,11 @@ mod tests {
|
||||
);
|
||||
memtable.write(&kvs).unwrap();
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let read = iter
|
||||
.flat_map(|batch| {
|
||||
batch
|
||||
@@ -509,7 +516,7 @@ mod tests {
|
||||
let expect = (0..100).collect::<Vec<_>>();
|
||||
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
|
||||
memtable.write(&kvs).unwrap();
|
||||
let iter = memtable.iter(Some(&[3]), None).unwrap();
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
for res in iter {
|
||||
@@ -581,7 +588,7 @@ mod tests {
|
||||
data.sort_unstable();
|
||||
|
||||
let expect = data.into_iter().map(|x| x.2).collect::<Vec<_>>();
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
assert_eq!(expect, read);
|
||||
}
|
||||
@@ -617,7 +624,7 @@ mod tests {
|
||||
right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
|
||||
});
|
||||
let iter = memtable
|
||||
.iter(None, Some(Predicate::new(vec![expr])))
|
||||
.iter(None, Some(Predicate::new(vec![expr])), None)
|
||||
.unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
assert_eq!(timestamps, read);
|
||||
@@ -784,7 +791,7 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut reader = new_memtable.iter(None, None).unwrap();
|
||||
let mut reader = new_memtable.iter(None, None, None).unwrap();
|
||||
let batch = reader.next().unwrap().unwrap();
|
||||
let pk = codec.decode(batch.primary_key()).unwrap();
|
||||
if let Value::String(s) = &pk[2] {
|
||||
|
||||
@@ -27,7 +27,7 @@ use memcomparable::Serializer;
|
||||
use serde::Serialize;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
|
||||
@@ -202,6 +202,7 @@ impl PartitionTree {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
let start = Instant::now();
|
||||
// Creates the projection set.
|
||||
@@ -225,6 +226,7 @@ impl PartitionTree {
|
||||
let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);
|
||||
|
||||
let mut iter = TreeIter {
|
||||
sequence,
|
||||
partitions,
|
||||
current_reader: None,
|
||||
metrics: tree_iter_metric,
|
||||
@@ -451,6 +453,8 @@ struct TreeIterMetrics {
|
||||
}
|
||||
|
||||
struct TreeIter {
|
||||
/// Optional Sequence number of the current reader which limit results batch to lower than this sequence number.
|
||||
sequence: Option<SequenceNumber>,
|
||||
partitions: VecDeque<PartitionRef>,
|
||||
current_reader: Option<PartitionReader>,
|
||||
metrics: TreeIterMetrics,
|
||||
@@ -519,6 +523,8 @@ impl TreeIter {
|
||||
if part_reader.is_valid() {
|
||||
self.metrics.rows_fetched += batch.num_rows();
|
||||
self.metrics.batches_fetched += 1;
|
||||
let mut batch = batch;
|
||||
batch.filter_by_sequence(self.sequence)?;
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
@@ -529,6 +535,8 @@ impl TreeIter {
|
||||
|
||||
self.metrics.rows_fetched += batch.num_rows();
|
||||
self.metrics.batches_fetched += 1;
|
||||
let mut batch = batch;
|
||||
batch.filter_by_sequence(self.sequence)?;
|
||||
Ok(Some(batch))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -482,7 +482,7 @@ mod tests {
|
||||
partitions.list_memtables(&mut memtables);
|
||||
assert_eq!(0, memtables[0].id());
|
||||
|
||||
let iter = memtables[0].iter(None, None).unwrap();
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
|
||||
}
|
||||
@@ -520,7 +520,7 @@ mod tests {
|
||||
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
let iter = memtables[0].iter(None, None).unwrap();
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
|
||||
let parts = partitions.list_partitions();
|
||||
@@ -572,7 +572,7 @@ mod tests {
|
||||
let partitions = new_multi_partitions(&metadata);
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
let iter = parts[0].memtable.iter(None, None).unwrap();
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
@@ -584,7 +584,7 @@ mod tests {
|
||||
parts[0].time_range.unwrap().max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None).unwrap();
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
assert_eq!(1, parts[1].memtable.id());
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
|
||||
@@ -33,7 +33,7 @@ use datatypes::vectors::{
|
||||
};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use store_api::storage::{ColumnId, SequenceNumber};
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::error::{
|
||||
@@ -236,6 +236,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
filters: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
let projection = if let Some(projection) = projection {
|
||||
projection.iter().copied().collect()
|
||||
@@ -248,7 +249,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
|
||||
let iter = self
|
||||
.series_set
|
||||
.iter_series(projection, filters, self.dedup)?;
|
||||
.iter_series(projection, filters, self.dedup, sequence)?;
|
||||
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
let iter = LastNonNullIter::new(iter);
|
||||
@@ -262,6 +263,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
let projection = if let Some(projection) = projection {
|
||||
projection.iter().copied().collect()
|
||||
@@ -277,6 +279,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
predicate,
|
||||
dedup: self.dedup,
|
||||
merge_mode: self.merge_mode,
|
||||
sequence,
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
|
||||
@@ -384,6 +387,7 @@ impl SeriesSet {
|
||||
projection: HashSet<ColumnId>,
|
||||
predicate: Option<Predicate>,
|
||||
dedup: bool,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<Iter> {
|
||||
let primary_key_schema = primary_key_schema(&self.region_metadata);
|
||||
let primary_key_datatypes = self
|
||||
@@ -401,6 +405,7 @@ impl SeriesSet {
|
||||
primary_key_datatypes,
|
||||
self.codec.clone(),
|
||||
dedup,
|
||||
sequence,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -448,6 +453,7 @@ struct Iter {
|
||||
pk_datatypes: Vec<ConcreteDataType>,
|
||||
codec: Arc<DensePrimaryKeyCodec>,
|
||||
dedup: bool,
|
||||
sequence: Option<SequenceNumber>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
@@ -462,6 +468,7 @@ impl Iter {
|
||||
pk_datatypes: Vec<ConcreteDataType>,
|
||||
codec: Arc<DensePrimaryKeyCodec>,
|
||||
dedup: bool,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> Result<Self> {
|
||||
let predicate = predicate
|
||||
.map(|predicate| {
|
||||
@@ -482,6 +489,7 @@ impl Iter {
|
||||
pk_datatypes,
|
||||
codec,
|
||||
dedup,
|
||||
sequence,
|
||||
metrics: Metrics::default(),
|
||||
})
|
||||
}
|
||||
@@ -546,6 +554,12 @@ impl Iterator for Iter {
|
||||
self.metrics.num_batches += 1;
|
||||
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
|
||||
let mut batch = batch;
|
||||
batch = batch.and_then(|mut batch| {
|
||||
batch.filter_by_sequence(self.sequence)?;
|
||||
Ok(batch)
|
||||
});
|
||||
return Some(batch);
|
||||
}
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
@@ -855,6 +869,7 @@ struct TimeSeriesIterBuilder {
|
||||
projection: HashSet<ColumnId>,
|
||||
predicate: Option<Predicate>,
|
||||
dedup: bool,
|
||||
sequence: Option<SequenceNumber>,
|
||||
merge_mode: MergeMode,
|
||||
}
|
||||
|
||||
@@ -864,6 +879,7 @@ impl IterBuilder for TimeSeriesIterBuilder {
|
||||
self.projection.clone(),
|
||||
self.predicate.clone(),
|
||||
self.dedup,
|
||||
self.sequence,
|
||||
)?;
|
||||
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
@@ -1253,7 +1269,7 @@ mod tests {
|
||||
*expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
|
||||
}
|
||||
|
||||
let iter = memtable.iter(None, None).unwrap();
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut read = HashMap::new();
|
||||
|
||||
for ts in iter
|
||||
@@ -1293,7 +1309,7 @@ mod tests {
|
||||
let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
|
||||
memtable.write(&kvs).unwrap();
|
||||
|
||||
let iter = memtable.iter(Some(&[3]), None).unwrap();
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ use async_trait::async_trait;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::arrow::array::UInt8Array;
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{Array, ArrayRef};
|
||||
use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
|
||||
use datatypes::arrow::compute::SortOptions;
|
||||
use datatypes::arrow::row::{RowConverter, SortField};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
|
||||
@@ -334,6 +334,24 @@ impl Batch {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
|
||||
pub fn filter_by_sequence(&mut self, sequence: Option<SequenceNumber>) -> Result<()> {
|
||||
let seq = match (sequence, self.last_sequence()) {
|
||||
(None, _) | (_, None) => return Ok(()),
|
||||
(Some(sequence), Some(last_sequence)) if sequence >= last_sequence => return Ok(()),
|
||||
(Some(sequence), Some(_)) => sequence,
|
||||
};
|
||||
|
||||
let seqs = self.sequences.as_arrow();
|
||||
let sequence = UInt64Array::new_scalar(seq);
|
||||
let predicate = datafusion_common::arrow::compute::kernels::cmp::lt_eq(seqs, &sequence)
|
||||
.context(ComputeArrowSnafu)?;
|
||||
let predicate = BooleanVector::from(predicate);
|
||||
self.filter(&predicate)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sorts rows in the batch. If `dedup` is true, it also removes
|
||||
/// duplicated rows according to primary keys.
|
||||
///
|
||||
@@ -1212,6 +1230,57 @@ mod tests {
|
||||
assert_eq!(expect, batch);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_by_sequence() {
|
||||
// Filters put only.
|
||||
let mut batch = new_batch(
|
||||
&[1, 2, 3, 4],
|
||||
&[11, 12, 13, 14],
|
||||
&[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 22, 23, 24],
|
||||
);
|
||||
batch.filter_by_sequence(Some(13)).unwrap();
|
||||
let expect = new_batch(
|
||||
&[1, 2, 3],
|
||||
&[11, 12, 13],
|
||||
&[OpType::Put, OpType::Put, OpType::Put],
|
||||
&[21, 22, 23],
|
||||
);
|
||||
assert_eq!(expect, batch);
|
||||
|
||||
// Filters to empty.
|
||||
let mut batch = new_batch(
|
||||
&[1, 2, 3, 4],
|
||||
&[11, 12, 13, 14],
|
||||
&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
|
||||
&[21, 22, 23, 24],
|
||||
);
|
||||
|
||||
batch.filter_by_sequence(Some(10)).unwrap();
|
||||
assert!(batch.is_empty());
|
||||
|
||||
// None filter.
|
||||
let mut batch = new_batch(
|
||||
&[1, 2, 3, 4],
|
||||
&[11, 12, 13, 14],
|
||||
&[OpType::Put, OpType::Delete, OpType::Put, OpType::Put],
|
||||
&[21, 22, 23, 24],
|
||||
);
|
||||
let expect = batch.clone();
|
||||
batch.filter_by_sequence(None).unwrap();
|
||||
assert_eq!(expect, batch);
|
||||
|
||||
// Filter a empty batch
|
||||
let mut batch = new_batch(&[], &[], &[], &[]);
|
||||
batch.filter_by_sequence(Some(10)).unwrap();
|
||||
assert!(batch.is_empty());
|
||||
|
||||
// Filter a empty batch with None
|
||||
let mut batch = new_batch(&[], &[], &[], &[]);
|
||||
batch.filter_by_sequence(None).unwrap();
|
||||
assert!(batch.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter() {
|
||||
// Filters put only.
|
||||
|
||||
@@ -300,6 +300,9 @@ impl ScanRegion {
|
||||
if file_in_range(file, &time_range) {
|
||||
files.push(file.clone());
|
||||
}
|
||||
// There is no need to check and prune for file's sequence here as the sequence number is usually very new,
|
||||
// unless the timing is too good, or the sequence number wouldn't be in file.
|
||||
// and the batch will be filtered out by tree reader anyway.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -347,7 +350,11 @@ impl ScanRegion {
|
||||
let memtables = memtables
|
||||
.into_iter()
|
||||
.map(|mem| {
|
||||
let ranges = mem.ranges(Some(mapper.column_ids()), Some(predicate.clone()));
|
||||
let ranges = mem.ranges(
|
||||
Some(mapper.column_ids()),
|
||||
Some(predicate.clone()),
|
||||
self.request.sequence,
|
||||
);
|
||||
MemRangeBuilder::new(ranges)
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -84,6 +84,7 @@ impl Memtable for EmptyMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_filters: Option<Predicate>,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
}
|
||||
@@ -92,6 +93,7 @@ impl Memtable for EmptyMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
MemtableRanges::default()
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ use common_recordbatch::OrderOption;
|
||||
use datafusion_expr::expr::Expr;
|
||||
use strum::Display;
|
||||
|
||||
use crate::storage::SequenceNumber;
|
||||
|
||||
/// A hint on how to select rows from a time-series.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
|
||||
pub enum TimeSeriesRowSelector {
|
||||
@@ -39,4 +41,8 @@ pub struct ScanRequest {
|
||||
pub limit: Option<usize>,
|
||||
/// Optional hint to select rows from time-series.
|
||||
pub series_row_selector: Option<TimeSeriesRowSelector>,
|
||||
/// Optional constraint on the sequence number of the rows to read.
|
||||
/// If set, only rows with a sequence number lesser or equal to this value
|
||||
/// will be returned.
|
||||
pub sequence: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user