mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 05:12:54 +00:00
feat: support exact filter on time index column (#5671)
* feat: add predicate group * feat: pass predicate group * feat: memtable prune by time filters * test: test PruneTimeIterator with time filters * feat: push down returns exact for timestamp simple filters --------- Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -40,7 +40,6 @@ use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::predicate::Predicate;
|
||||
use task::MAX_PARALLEL_COMPACTION;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
|
||||
@@ -57,7 +56,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
use crate::read::scan_region::ScanInput;
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::read::BoxedBatchReader;
|
||||
use crate::region::options::MergeMode;
|
||||
@@ -657,7 +656,7 @@ impl CompactionSstReaderBuilder<'_> {
|
||||
fn time_range_to_predicate(
|
||||
range: TimestampRange,
|
||||
metadata: &RegionMetadataRef,
|
||||
) -> Result<Option<Predicate>> {
|
||||
) -> Result<PredicateGroup> {
|
||||
let ts_col = metadata.time_index_column();
|
||||
|
||||
// safety: time index column's type must be a valid timestamp type.
|
||||
@@ -687,10 +686,12 @@ fn time_range_to_predicate(
|
||||
.lt(ts_to_lit(*end, ts_col_unit)?)]
|
||||
}
|
||||
(None, None) => {
|
||||
return Ok(None);
|
||||
return Ok(PredicateGroup::default());
|
||||
}
|
||||
};
|
||||
Ok(Some(Predicate::new(exprs)))
|
||||
|
||||
let predicate = PredicateGroup::new(metadata, &exprs);
|
||||
Ok(predicate)
|
||||
}
|
||||
|
||||
fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result<Expr> {
|
||||
|
||||
@@ -35,6 +35,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::metrics::WRITE_BUFFER_BYTES;
|
||||
use crate::read::prune::PruneTimeIterator;
|
||||
use crate::read::scan_region::PredicateGroup;
|
||||
use crate::read::Batch;
|
||||
use crate::region::options::{MemtableOptions, MergeMode};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
@@ -155,7 +156,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges;
|
||||
|
||||
@@ -346,14 +347,20 @@ pub struct MemtableRangeContext {
|
||||
id: MemtableId,
|
||||
/// Iterator builder.
|
||||
builder: BoxedIterBuilder,
|
||||
/// All filters.
|
||||
predicate: PredicateGroup,
|
||||
}
|
||||
|
||||
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
|
||||
|
||||
impl MemtableRangeContext {
|
||||
/// Creates a new [MemtableRangeContext].
|
||||
pub fn new(id: MemtableId, builder: BoxedIterBuilder) -> Self {
|
||||
Self { id, builder }
|
||||
pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
|
||||
Self {
|
||||
id,
|
||||
builder,
|
||||
predicate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,10 +383,16 @@ impl MemtableRange {
|
||||
}
|
||||
|
||||
/// Builds an iterator to read the range.
|
||||
/// Filters the result by the specific time range.
|
||||
/// Filters the result by the specific time range, this ensures memtable won't return
|
||||
/// rows out of the time range when new rows are inserted.
|
||||
pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
|
||||
let iter = self.context.builder.build()?;
|
||||
Ok(Box::new(PruneTimeIterator::new(iter, time_range)))
|
||||
let time_filters = self.context.predicate.time_filters();
|
||||
Ok(Box::new(PruneTimeIterator::new(
|
||||
iter,
|
||||
time_range,
|
||||
time_filters,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,8 @@ use crate::error::Result;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::{
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef,
|
||||
MemtableStats, PredicateGroup,
|
||||
};
|
||||
|
||||
#[allow(unused)]
|
||||
@@ -71,7 +72,7 @@ impl Memtable for BulkMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
_predicate: PredicateGroup,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
todo!()
|
||||
|
||||
@@ -43,6 +43,7 @@ use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
|
||||
PredicateGroup,
|
||||
};
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
|
||||
@@ -195,17 +196,17 @@ impl Memtable for PartitionTreeMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
let projection = projection.map(|ids| ids.to_vec());
|
||||
let builder = Box::new(PartitionTreeIterBuilder {
|
||||
tree: self.tree.clone(),
|
||||
projection,
|
||||
predicate,
|
||||
predicate: predicate.predicate().cloned(),
|
||||
sequence,
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
|
||||
|
||||
MemtableRanges {
|
||||
ranges: [(0, MemtableRange::new(context))].into(),
|
||||
|
||||
@@ -48,6 +48,7 @@ use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
|
||||
PredicateGroup,
|
||||
};
|
||||
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
@@ -267,7 +268,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
predicate: PredicateGroup,
|
||||
sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
let projection = if let Some(projection) = projection {
|
||||
@@ -281,12 +282,12 @@ impl Memtable for TimeSeriesMemtable {
|
||||
let builder = Box::new(TimeSeriesIterBuilder {
|
||||
series_set: self.series_set.clone(),
|
||||
projection,
|
||||
predicate,
|
||||
predicate: predicate.predicate().cloned(),
|
||||
dedup: self.dedup,
|
||||
merge_mode: self.merge_mode,
|
||||
sequence,
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate));
|
||||
|
||||
MemtableRanges {
|
||||
ranges: [(0, MemtableRange::new(context))].into(),
|
||||
|
||||
@@ -12,11 +12,16 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::BooleanVectorBuilder;
|
||||
use std::ops::BitAnd;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::error::Result;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::BooleanArray;
|
||||
use datatypes::arrow::buffer::BooleanBuffer;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{FilterRecordBatchSnafu, Result};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||
use crate::read::{Batch, BatchReader};
|
||||
@@ -133,16 +138,26 @@ impl PruneReader {
|
||||
pub(crate) struct PruneTimeIterator {
|
||||
iter: BoxedBatchIterator,
|
||||
time_range: FileTimeRange,
|
||||
/// Precise time filters.
|
||||
time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
|
||||
}
|
||||
|
||||
impl PruneTimeIterator {
|
||||
/// Creates a new `PruneTimeIterator` with the given iterator and time range.
|
||||
pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self {
|
||||
Self { iter, time_range }
|
||||
pub(crate) fn new(
|
||||
iter: BoxedBatchIterator,
|
||||
time_range: FileTimeRange,
|
||||
time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
iter,
|
||||
time_range,
|
||||
time_filters,
|
||||
}
|
||||
}
|
||||
|
||||
/// Prune batch by time range.
|
||||
fn prune(&self, mut batch: Batch) -> Result<Batch> {
|
||||
fn prune(&self, batch: Batch) -> Result<Batch> {
|
||||
if batch.is_empty() {
|
||||
return Ok(batch);
|
||||
}
|
||||
@@ -152,7 +167,7 @@ impl PruneTimeIterator {
|
||||
if self.time_range.0 <= batch.first_timestamp().unwrap()
|
||||
&& batch.last_timestamp().unwrap() <= self.time_range.1
|
||||
{
|
||||
return Ok(batch);
|
||||
return self.prune_by_time_filters(batch, Vec::new());
|
||||
}
|
||||
|
||||
// slow path, prune the batch by time range.
|
||||
@@ -164,19 +179,41 @@ impl PruneTimeIterator {
|
||||
.as_timestamp()
|
||||
.unwrap()
|
||||
.unit();
|
||||
let mut filter_builder = BooleanVectorBuilder::with_capacity(batch.timestamps().len());
|
||||
let mut mask = Vec::with_capacity(batch.timestamps().len());
|
||||
let timestamps = batch.timestamps_native().unwrap();
|
||||
for ts in timestamps {
|
||||
let ts = Timestamp::new(*ts, unit);
|
||||
if self.time_range.0 <= ts && ts <= self.time_range.1 {
|
||||
filter_builder.push(Some(true));
|
||||
mask.push(true);
|
||||
} else {
|
||||
filter_builder.push(Some(false));
|
||||
mask.push(false);
|
||||
}
|
||||
}
|
||||
let filter = filter_builder.finish();
|
||||
|
||||
batch.filter(&filter)?;
|
||||
self.prune_by_time_filters(batch, mask)
|
||||
}
|
||||
|
||||
/// Prunes the batch by time filters.
|
||||
/// Also applies existing mask to the batch if the mask is not empty.
|
||||
fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
|
||||
if let Some(filters) = &self.time_filters {
|
||||
let mut mask = BooleanBuffer::new_set(batch.num_rows());
|
||||
for filter in filters.iter() {
|
||||
let result = filter
|
||||
.evaluate_vector(batch.timestamps())
|
||||
.context(FilterRecordBatchSnafu)?;
|
||||
mask = mask.bitand(&result);
|
||||
}
|
||||
|
||||
if !existing_mask.is_empty() {
|
||||
mask = mask.bitand(&BooleanBuffer::from(existing_mask));
|
||||
}
|
||||
|
||||
batch.filter(&BooleanArray::from(mask).into())?;
|
||||
} else if !existing_mask.is_empty() {
|
||||
batch.filter(&BooleanArray::from(existing_mask).into())?;
|
||||
}
|
||||
|
||||
Ok(batch)
|
||||
}
|
||||
|
||||
@@ -204,6 +241,8 @@ impl Iterator for PruneTimeIterator {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::OpType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{col, lit, Expr};
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::new_batch;
|
||||
@@ -218,6 +257,7 @@ mod tests {
|
||||
Timestamp::new_millisecond(0),
|
||||
Timestamp::new_millisecond(1000),
|
||||
),
|
||||
None,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert!(actual.is_empty());
|
||||
@@ -256,6 +296,7 @@ mod tests {
|
||||
Timestamp::new_millisecond(10),
|
||||
Timestamp::new_millisecond(15),
|
||||
),
|
||||
None,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
@@ -279,6 +320,7 @@ mod tests {
|
||||
Timestamp::new_millisecond(11),
|
||||
Timestamp::new_millisecond(20),
|
||||
),
|
||||
None,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
@@ -309,6 +351,7 @@ mod tests {
|
||||
Timestamp::new_millisecond(10),
|
||||
Timestamp::new_millisecond(18),
|
||||
),
|
||||
None,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
@@ -338,4 +381,124 @@ mod tests {
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
|
||||
let filters = expr
|
||||
.iter()
|
||||
.map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
|
||||
.collect();
|
||||
Some(Arc::new(filters))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_time_iter_with_time_filters() {
|
||||
let input = [
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[15, 16],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[115, 116],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[17, 18],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[117, 118],
|
||||
),
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
// We won't use the column name.
|
||||
let time_filters = create_time_filters(&[
|
||||
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
|
||||
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
|
||||
]);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(10),
|
||||
Timestamp::new_millisecond(20),
|
||||
),
|
||||
time_filters,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
actual,
|
||||
[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_time_iter_in_range_with_time_filters() {
|
||||
let input = [
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[15, 16],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[115, 116],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[17, 18],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[117, 118],
|
||||
),
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
// We won't use the column name.
|
||||
let time_filters = create_time_filters(&[
|
||||
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
|
||||
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
|
||||
]);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(5),
|
||||
Timestamp::new_millisecond(18),
|
||||
),
|
||||
time_filters,
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
actual,
|
||||
[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,12 +19,17 @@ use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::{debug, error, tracing, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use datafusion_expr::Expr;
|
||||
use smallvec::SmallVec;
|
||||
use store_api::metadata::RegionMetadata;
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
|
||||
use table::predicate::{build_time_range_predicate, Predicate};
|
||||
@@ -339,7 +344,7 @@ impl ScanRegion {
|
||||
let inverted_index_applier = self.build_invereted_index_applier();
|
||||
let bloom_filter_applier = self.build_bloom_filter_applier();
|
||||
let fulltext_index_applier = self.build_fulltext_index_applier();
|
||||
let predicate = Predicate::new(self.request.filters.clone());
|
||||
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
|
||||
// The mapper always computes projected column ids as the schema of SSTs may change.
|
||||
let mapper = match &self.request.projection {
|
||||
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
|
||||
@@ -351,7 +356,7 @@ impl ScanRegion {
|
||||
.map(|mem| {
|
||||
let ranges = mem.ranges(
|
||||
Some(mapper.column_ids()),
|
||||
Some(predicate.clone()),
|
||||
predicate.clone(),
|
||||
self.request.sequence,
|
||||
);
|
||||
MemRangeBuilder::new(ranges)
|
||||
@@ -360,7 +365,7 @@ impl ScanRegion {
|
||||
|
||||
let input = ScanInput::new(self.access_layer, mapper)
|
||||
.with_time_range(Some(time_range))
|
||||
.with_predicate(Some(predicate))
|
||||
.with_predicate(predicate)
|
||||
.with_memtables(memtables)
|
||||
.with_files(files)
|
||||
.with_cache(self.cache_strategy)
|
||||
@@ -527,7 +532,7 @@ pub(crate) struct ScanInput {
|
||||
/// Time range filter for time index.
|
||||
time_range: Option<TimestampRange>,
|
||||
/// Predicate to push down.
|
||||
pub(crate) predicate: Option<Predicate>,
|
||||
pub(crate) predicate: PredicateGroup,
|
||||
/// Memtable range builders for memtables in the time range..
|
||||
pub(crate) memtables: Vec<MemRangeBuilder>,
|
||||
/// Handles to SST files to scan.
|
||||
@@ -562,7 +567,7 @@ impl ScanInput {
|
||||
access_layer,
|
||||
mapper: Arc::new(mapper),
|
||||
time_range: None,
|
||||
predicate: None,
|
||||
predicate: PredicateGroup::default(),
|
||||
memtables: Vec::new(),
|
||||
files: Vec::new(),
|
||||
cache_strategy: CacheStrategy::Disabled,
|
||||
@@ -588,7 +593,7 @@ impl ScanInput {
|
||||
|
||||
/// Sets predicate to push down.
|
||||
#[must_use]
|
||||
pub(crate) fn with_predicate(mut self, predicate: Option<Predicate>) -> Self {
|
||||
pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
|
||||
self.predicate = predicate;
|
||||
self
|
||||
}
|
||||
@@ -741,7 +746,7 @@ impl ScanInput {
|
||||
let res = self
|
||||
.access_layer
|
||||
.read_sst(file.clone())
|
||||
.predicate(self.predicate.clone())
|
||||
.predicate(self.predicate.predicate().cloned())
|
||||
.projection(Some(self.mapper.column_ids().to_vec()))
|
||||
.cache(self.cache_strategy.clone())
|
||||
.inverted_index_applier(self.inverted_index_applier.clone())
|
||||
@@ -812,8 +817,9 @@ impl ScanInput {
|
||||
rows_in_files + rows_in_memtables
|
||||
}
|
||||
|
||||
pub(crate) fn predicate(&self) -> Option<Predicate> {
|
||||
self.predicate.clone()
|
||||
/// Returns table predicate of all exprs.
|
||||
pub(crate) fn predicate(&self) -> Option<&Predicate> {
|
||||
self.predicate.predicate()
|
||||
}
|
||||
|
||||
/// Returns number of memtables to scan.
|
||||
@@ -915,3 +921,73 @@ impl StreamContext {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Predicates to evaluate.
|
||||
/// It only keeps filters that [SimpleFilterEvaluator] supports.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct PredicateGroup {
|
||||
time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
|
||||
|
||||
/// Table predicate for all logical exprs to evaluate.
|
||||
/// Parquet reader uses it to prune row groups.
|
||||
predicate: Option<Predicate>,
|
||||
}
|
||||
|
||||
impl PredicateGroup {
|
||||
/// Creates a new `PredicateGroup` from exprs according to the metadata.
|
||||
pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
|
||||
let mut time_filters = Vec::with_capacity(exprs.len());
|
||||
// Columns in the expr.
|
||||
let mut columns = HashSet::new();
|
||||
for expr in exprs {
|
||||
columns.clear();
|
||||
let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
|
||||
continue;
|
||||
};
|
||||
time_filters.push(filter);
|
||||
}
|
||||
let time_filters = if time_filters.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(time_filters))
|
||||
};
|
||||
let predicate = Predicate::new(exprs.to_vec());
|
||||
|
||||
Self {
|
||||
time_filters,
|
||||
predicate: Some(predicate),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns time filters.
|
||||
pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
|
||||
self.time_filters.clone()
|
||||
}
|
||||
|
||||
/// Returns predicate of all exprs.
|
||||
pub(crate) fn predicate(&self) -> Option<&Predicate> {
|
||||
self.predicate.as_ref()
|
||||
}
|
||||
|
||||
fn expr_to_filter(
|
||||
expr: &Expr,
|
||||
metadata: &RegionMetadata,
|
||||
columns: &mut HashSet<Column>,
|
||||
) -> Option<SimpleFilterEvaluator> {
|
||||
columns.clear();
|
||||
// `expr_to_columns` won't return error.
|
||||
// We still ignore these expressions for safety.
|
||||
expr_to_columns(expr, columns).ok()?;
|
||||
if columns.len() > 1 {
|
||||
// Simple filter doesn't support multiple columns.
|
||||
return None;
|
||||
}
|
||||
let column = columns.iter().next()?;
|
||||
let column_meta = metadata.column_by_name(&column.name)?;
|
||||
if column_meta.semantic_type == SemanticType::Timestamp {
|
||||
SimpleFilterEvaluator::try_new(expr)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::memtable::{
|
||||
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
|
||||
MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::read::scan_region::PredicateGroup;
|
||||
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
|
||||
|
||||
/// Empty memtable for test.
|
||||
@@ -92,7 +93,7 @@ impl Memtable for EmptyMemtable {
|
||||
fn ranges(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
_predicate: PredicateGroup,
|
||||
_sequence: Option<SequenceNumber>,
|
||||
) -> MemtableRanges {
|
||||
MemtableRanges::default()
|
||||
|
||||
@@ -193,7 +193,11 @@ impl TableProvider for DummyTableProvider {
|
||||
if self
|
||||
.metadata
|
||||
.column_by_name(simple_filter.column_name())
|
||||
.and_then(|c| (c.semantic_type == SemanticType::Tag).then_some(()))
|
||||
.and_then(|c| {
|
||||
(c.semantic_type == SemanticType::Tag
|
||||
|| c.semantic_type == SemanticType::Timestamp)
|
||||
.then_some(())
|
||||
})
|
||||
.is_some()
|
||||
{
|
||||
TableProviderFilterPushDown::Exact
|
||||
|
||||
@@ -198,7 +198,6 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED
|
||||
|_|_|_FilterExec: t@1 > 8 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
|
||||
@@ -24,11 +24,7 @@ tql analyze (1, 3, '1s') t1{ a = "a" };
|
||||
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: b@1 >= -299000 AND b@1 <= 303000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 3_|
|
||||
@@ -52,11 +48,7 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" };
|
||||
|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: b@1 >= -299000 AND b@1 <= 303000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 6_|
|
||||
@@ -83,7 +75,7 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" };
|
||||
| 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: a@0 ~ a.* AND b@1 >= -299000 AND b@1 <= 303000 REDACTED
|
||||
|_|_|_FilterExec: a@0 ~ a.* REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|
||||
@@ -26,11 +26,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
@@ -56,11 +52,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
@@ -85,11 +77,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
@@ -116,11 +104,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED
|
||||
|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
|
||||
Reference in New Issue
Block a user