From 9860bca986c3639468d261468802930e8dec79d2 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 8 Mar 2025 05:55:46 +0800 Subject: [PATCH] feat: support exact filter on time index column (#5671) * feat: add predicate group * feat: pass predicate group * feat: memtable prune by time filters * test: test PruneTimeIterator with time filters * feat: push down returns exact for timestamp simple filters --------- Co-authored-by: Ruihang Xia --- src/mito2/src/compaction.rs | 11 +- src/mito2/src/memtable.rs | 23 ++- src/mito2/src/memtable/bulk.rs | 5 +- src/mito2/src/memtable/partition_tree.rs | 7 +- src/mito2/src/memtable/time_series.rs | 7 +- src/mito2/src/read/prune.rs | 189 ++++++++++++++++-- src/mito2/src/read/scan_region.rs | 94 ++++++++- src/mito2/src/test_util/memtable_util.rs | 3 +- src/query/src/dummy_catalog.rs | 6 +- .../common/order/windowed_sort.result | 1 - .../common/select/tql_filter.result | 14 +- .../common/tql-explain-analyze/analyze.result | 24 +-- 12 files changed, 310 insertions(+), 74 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 057d9ca720..8b1c319161 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -40,7 +40,6 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{RegionId, TableId}; -use table::predicate::Predicate; use task::MAX_PARALLEL_COMPACTION; use tokio::sync::mpsc::{self, Sender}; @@ -57,7 +56,7 @@ use crate::error::{ }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::projection::ProjectionMapper; -use crate::read::scan_region::ScanInput; +use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; use crate::read::BoxedBatchReader; use crate::region::options::MergeMode; @@ -657,7 +656,7 @@ impl CompactionSstReaderBuilder<'_> { fn time_range_to_predicate( range: TimestampRange, metadata: &RegionMetadataRef, -) -> Result> { +) -> Result { let ts_col = metadata.time_index_column(); // safety: time index column's type must be a valid timestamp type. @@ -687,10 +686,12 @@ fn time_range_to_predicate( .lt(ts_to_lit(*end, ts_col_unit)?)] } (None, None) => { - return Ok(None); + return Ok(PredicateGroup::default()); } }; - Ok(Some(Predicate::new(exprs))) + + let predicate = PredicateGroup::new(metadata, &exprs); + Ok(predicate) } fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result { diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 7c6e51509b..af185c9c61 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -35,6 +35,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::prune::PruneTimeIterator; +use crate::read::scan_region::PredicateGroup; use crate::read::Batch; use crate::region::options::{MemtableOptions, MergeMode}; use crate::sst::file::FileTimeRange; @@ -155,7 +156,7 @@ pub trait Memtable: Send + Sync + fmt::Debug { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: Option, + predicate: PredicateGroup, sequence: Option, ) -> MemtableRanges; @@ -346,14 +347,20 @@ pub struct MemtableRangeContext { id: MemtableId, /// Iterator builder. builder: BoxedIterBuilder, + /// All filters. + predicate: PredicateGroup, } pub type MemtableRangeContextRef = Arc; impl MemtableRangeContext { /// Creates a new [MemtableRangeContext]. 
- pub fn new(id: MemtableId, builder: BoxedIterBuilder) -> Self { - Self { id, builder } + pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self { + Self { + id, + builder, + predicate, + } } } @@ -376,10 +383,16 @@ impl MemtableRange { } /// Builds an iterator to read the range. - /// Filters the result by the specific time range. + /// Filters the result by the specific time range, this ensures memtable won't return + /// rows out of the time range when new rows are inserted. pub fn build_iter(&self, time_range: FileTimeRange) -> Result { let iter = self.context.builder.build()?; - Ok(Box::new(PruneTimeIterator::new(iter, time_range))) + let time_filters = self.context.predicate.time_filters(); + Ok(Box::new(PruneTimeIterator::new( + iter, + time_range, + time_filters, + ))) } } diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 2060a81cdc..cf7e7403ed 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -24,7 +24,8 @@ use crate::error::Result; use crate::memtable::bulk::part::BulkPart; use crate::memtable::key_values::KeyValue; use crate::memtable::{ - BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats, + BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, + MemtableStats, PredicateGroup, }; #[allow(unused)] @@ -71,7 +72,7 @@ impl Memtable for BulkMemtable { fn ranges( &self, _projection: Option<&[ColumnId]>, - _predicate: Option, + _predicate: PredicateGroup, _sequence: Option, ) -> MemtableRanges { todo!() diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index d790b65f6b..3000707418 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -43,6 +43,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, + PredicateGroup, }; use crate::region::options::MergeMode; use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec}; @@ -195,17 +196,17 @@ impl Memtable for PartitionTreeMemtable { fn ranges( &self, projection: Option<&[ColumnId]>, - predicate: Option, + predicate: PredicateGroup, sequence: Option, ) -> MemtableRanges { let projection = projection.map(|ids| ids.to_vec()); let builder = Box::new(PartitionTreeIterBuilder { tree: self.tree.clone(), projection, - predicate, + predicate: predicate.predicate().cloned(), sequence, }); - let context = Arc::new(MemtableRangeContext::new(self.id, builder)); + let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); MemtableRanges { ranges: [(0, MemtableRange::new(context))].into(), diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 75a009f945..0ffab35e50 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -48,6 +48,7 @@ use crate::memtable::stats::WriteMetrics; use crate::memtable::{ AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, + PredicateGroup, }; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::dedup::LastNonNullIter; @@ -267,7 +268,7 @@ impl Memtable for TimeSeriesMemtable { fn ranges( 
&self, projection: Option<&[ColumnId]>, - predicate: Option, + predicate: PredicateGroup, sequence: Option, ) -> MemtableRanges { let projection = if let Some(projection) = projection { @@ -281,12 +282,12 @@ impl Memtable for TimeSeriesMemtable { let builder = Box::new(TimeSeriesIterBuilder { series_set: self.series_set.clone(), projection, - predicate, + predicate: predicate.predicate().cloned(), dedup: self.dedup, merge_mode: self.merge_mode, sequence, }); - let context = Arc::new(MemtableRangeContext::new(self.id, builder)); + let context = Arc::new(MemtableRangeContext::new(self.id, builder, predicate)); MemtableRanges { ranges: [(0, MemtableRange::new(context))].into(), diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 07177a03f1..25f0a385d3 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_time::Timestamp; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::BooleanVectorBuilder; +use std::ops::BitAnd; +use std::sync::Arc; -use crate::error::Result; +use common_recordbatch::filter::SimpleFilterEvaluator; +use common_time::Timestamp; +use datatypes::arrow::array::BooleanArray; +use datatypes::arrow::buffer::BooleanBuffer; +use snafu::ResultExt; + +use crate::error::{FilterRecordBatchSnafu, Result}; use crate::memtable::BoxedBatchIterator; use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::{Batch, BatchReader}; @@ -133,16 +138,26 @@ impl PruneReader { pub(crate) struct PruneTimeIterator { iter: BoxedBatchIterator, time_range: FileTimeRange, + /// Precise time filters. + time_filters: Option>>, } impl PruneTimeIterator { /// Creates a new `PruneTimeIterator` with the given iterator and time range. - pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self { - Self { iter, time_range } + pub(crate) fn new( + iter: BoxedBatchIterator, + time_range: FileTimeRange, + time_filters: Option>>, + ) -> Self { + Self { + iter, + time_range, + time_filters, + } } /// Prune batch by time range. - fn prune(&self, mut batch: Batch) -> Result { + fn prune(&self, batch: Batch) -> Result { if batch.is_empty() { return Ok(batch); } @@ -152,7 +167,7 @@ impl PruneTimeIterator { if self.time_range.0 <= batch.first_timestamp().unwrap() && batch.last_timestamp().unwrap() <= self.time_range.1 { - return Ok(batch); + return self.prune_by_time_filters(batch, Vec::new()); } // slow path, prune the batch by time range. @@ -164,19 +179,41 @@ impl PruneTimeIterator { .as_timestamp() .unwrap() .unit(); - let mut filter_builder = BooleanVectorBuilder::with_capacity(batch.timestamps().len()); + let mut mask = Vec::with_capacity(batch.timestamps().len()); let timestamps = batch.timestamps_native().unwrap(); for ts in timestamps { let ts = Timestamp::new(*ts, unit); if self.time_range.0 <= ts && ts <= self.time_range.1 { - filter_builder.push(Some(true)); + mask.push(true); } else { - filter_builder.push(Some(false)); + mask.push(false); } } - let filter = filter_builder.finish(); - batch.filter(&filter)?; + self.prune_by_time_filters(batch, mask) + } + + /// Prunes the batch by time filters. + /// Also applies existing mask to the batch if the mask is not empty. 
+ fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec) -> Result { + if let Some(filters) = &self.time_filters { + let mut mask = BooleanBuffer::new_set(batch.num_rows()); + for filter in filters.iter() { + let result = filter + .evaluate_vector(batch.timestamps()) + .context(FilterRecordBatchSnafu)?; + mask = mask.bitand(&result); + } + + if !existing_mask.is_empty() { + mask = mask.bitand(&BooleanBuffer::from(existing_mask)); + } + + batch.filter(&BooleanArray::from(mask).into())?; + } else if !existing_mask.is_empty() { + batch.filter(&BooleanArray::from(existing_mask).into())?; + } + Ok(batch) } @@ -204,6 +241,8 @@ impl Iterator for PruneTimeIterator { #[cfg(test)] mod tests { use api::v1::OpType; + use datafusion_common::ScalarValue; + use datafusion_expr::{col, lit, Expr}; use super::*; use crate::test_util::new_batch; @@ -218,6 +257,7 @@ mod tests { Timestamp::new_millisecond(0), Timestamp::new_millisecond(1000), ), + None, ); let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); assert!(actual.is_empty()); @@ -256,6 +296,7 @@ mod tests { Timestamp::new_millisecond(10), Timestamp::new_millisecond(15), ), + None, ); let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); assert_eq!( @@ -279,6 +320,7 @@ mod tests { Timestamp::new_millisecond(11), Timestamp::new_millisecond(20), ), + None, ); let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); assert_eq!( @@ -309,6 +351,7 @@ mod tests { Timestamp::new_millisecond(10), Timestamp::new_millisecond(18), ), + None, ); let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); assert_eq!( @@ -338,4 +381,124 @@ mod tests { ] ); } + + fn create_time_filters(expr: &[Expr]) -> Option>> { + let filters = expr + .iter() + .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap()) + .collect(); + Some(Arc::new(filters)) + } + + #[test] + fn test_prune_time_iter_with_time_filters() { + let input = [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch( + b"k1", + &[15, 16], + &[20, 20], + &[OpType::Put, OpType::Put], + &[115, 116], + ), + new_batch( + b"k1", + &[17, 18], + &[20, 20], + &[OpType::Put, OpType::Put], + &[117, 118], + ), + ]; + + let iter = input.clone().into_iter().map(Ok); + // We won't use the column name. + let time_filters = create_time_filters(&[ + col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))), + ]); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(10), + Timestamp::new_millisecond(20), + ), + time_filters, + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert_eq!( + actual, + [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],), + ] + ); + } + + #[test] + fn test_prune_time_iter_in_range_with_time_filters() { + let input = [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch( + b"k1", + &[15, 16], + &[20, 20], + &[OpType::Put, OpType::Put], + &[115, 116], + ), + new_batch( + b"k1", + &[17, 18], + &[20, 20], + &[OpType::Put, OpType::Put], + &[117, 118], + ), + ]; + + let iter = input.clone().into_iter().map(Ok); + // We won't use the column name. 
+ let time_filters = create_time_filters(&[ + col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))), + ]); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(5), + Timestamp::new_millisecond(18), + ), + time_filters, + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert_eq!( + actual, + [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],), + ] + ); + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 4dd5baf5b1..fca1e89860 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -19,12 +19,17 @@ use std::fmt; use std::sync::Arc; use std::time::Instant; +use api::v1::SemanticType; use common_error::ext::BoxedError; +use common_recordbatch::filter::SimpleFilterEvaluator; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; +use datafusion_common::Column; use datafusion_expr::utils::expr_to_columns; +use datafusion_expr::Expr; use smallvec::SmallVec; +use store_api::metadata::RegionMetadata; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ScanRequest, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; @@ -339,7 +344,7 @@ impl ScanRegion { let inverted_index_applier = self.build_invereted_index_applier(); let bloom_filter_applier = self.build_bloom_filter_applier(); let fulltext_index_applier = self.build_fulltext_index_applier(); - let predicate = Predicate::new(self.request.filters.clone()); + let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?, @@ -351,7 +356,7 @@ impl ScanRegion { .map(|mem| { let ranges = mem.ranges( Some(mapper.column_ids()), - Some(predicate.clone()), + predicate.clone(), self.request.sequence, ); MemRangeBuilder::new(ranges) @@ -360,7 +365,7 @@ impl ScanRegion { let input = ScanInput::new(self.access_layer, mapper) .with_time_range(Some(time_range)) - .with_predicate(Some(predicate)) + .with_predicate(predicate) .with_memtables(memtables) .with_files(files) .with_cache(self.cache_strategy) @@ -527,7 +532,7 @@ pub(crate) struct ScanInput { /// Time range filter for time index. time_range: Option, /// Predicate to push down. - pub(crate) predicate: Option, + pub(crate) predicate: PredicateGroup, /// Memtable range builders for memtables in the time range.. pub(crate) memtables: Vec, /// Handles to SST files to scan. @@ -562,7 +567,7 @@ impl ScanInput { access_layer, mapper: Arc::new(mapper), time_range: None, - predicate: None, + predicate: PredicateGroup::default(), memtables: Vec::new(), files: Vec::new(), cache_strategy: CacheStrategy::Disabled, @@ -588,7 +593,7 @@ impl ScanInput { /// Sets predicate to push down. 
#[must_use] - pub(crate) fn with_predicate(mut self, predicate: Option) -> Self { + pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self { self.predicate = predicate; self } @@ -741,7 +746,7 @@ impl ScanInput { let res = self .access_layer .read_sst(file.clone()) - .predicate(self.predicate.clone()) + .predicate(self.predicate.predicate().cloned()) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_strategy.clone()) .inverted_index_applier(self.inverted_index_applier.clone()) @@ -812,8 +817,9 @@ impl ScanInput { rows_in_files + rows_in_memtables } - pub(crate) fn predicate(&self) -> Option { - self.predicate.clone() + /// Returns table predicate of all exprs. + pub(crate) fn predicate(&self) -> Option<&Predicate> { + self.predicate.predicate() } /// Returns number of memtables to scan. @@ -915,3 +921,73 @@ impl StreamContext { Ok(()) } } + +/// Predicates to evaluate. +/// It only keeps filters that [SimpleFilterEvaluator] supports. +#[derive(Clone, Default)] +pub struct PredicateGroup { + time_filters: Option>>, + + /// Table predicate for all logical exprs to evaluate. + /// Parquet reader uses it to prune row groups. + predicate: Option, +} + +impl PredicateGroup { + /// Creates a new `PredicateGroup` from exprs according to the metadata. + pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self { + let mut time_filters = Vec::with_capacity(exprs.len()); + // Columns in the expr. + let mut columns = HashSet::new(); + for expr in exprs { + columns.clear(); + let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else { + continue; + }; + time_filters.push(filter); + } + let time_filters = if time_filters.is_empty() { + None + } else { + Some(Arc::new(time_filters)) + }; + let predicate = Predicate::new(exprs.to_vec()); + + Self { + time_filters, + predicate: Some(predicate), + } + } + + /// Returns time filters. + pub(crate) fn time_filters(&self) -> Option>> { + self.time_filters.clone() + } + + /// Returns predicate of all exprs. + pub(crate) fn predicate(&self) -> Option<&Predicate> { + self.predicate.as_ref() + } + + fn expr_to_filter( + expr: &Expr, + metadata: &RegionMetadata, + columns: &mut HashSet, + ) -> Option { + columns.clear(); + // `expr_to_columns` won't return error. + // We still ignore these expressions for safety. + expr_to_columns(expr, columns).ok()?; + if columns.len() > 1 { + // Simple filter doesn't support multiple columns. + return None; + } + let column = columns.iter().next()?; + let column_meta = metadata.column_by_name(&column.name)?; + if column_meta.semantic_type == SemanticType::Timestamp { + SimpleFilterEvaluator::try_new(expr) + } else { + None + } + } +} diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 4cb4469dc0..d8064ba86b 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -36,6 +36,7 @@ use crate::memtable::{ BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, MemtableRef, MemtableStats, }; +use crate::read::scan_region::PredicateGroup; use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; /// Empty memtable for test. 
@@ -92,7 +93,7 @@ impl Memtable for EmptyMemtable { fn ranges( &self, _projection: Option<&[ColumnId]>, - _predicate: Option, + _predicate: PredicateGroup, _sequence: Option, ) -> MemtableRanges { MemtableRanges::default() diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index c7d485b7fb..cbd9749b2f 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -193,7 +193,11 @@ impl TableProvider for DummyTableProvider { if self .metadata .column_by_name(simple_filter.column_name()) - .and_then(|c| (c.semantic_type == SemanticType::Tag).then_some(())) + .and_then(|c| { + (c.semantic_type == SemanticType::Tag + || c.semantic_type == SemanticType::Timestamp) + .then_some(()) + }) .is_some() { TableProviderFilterPushDown::Exact diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 38fa690c4c..e2fdb18163 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -198,7 +198,6 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED |_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED |_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED -|_|_|_FilterExec: t@1 > 8 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| diff --git a/tests/cases/standalone/common/select/tql_filter.result b/tests/cases/standalone/common/select/tql_filter.result index ffd221887a..bad003eb05 100644 --- a/tests/cases/standalone/common/select/tql_filter.result +++ b/tests/cases/standalone/common/select/tql_filter.result @@ -24,11 +24,7 @@ tql analyze (1, 3, '1s') t1{ a = "a" }; |_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: b@1 >= -299000 AND b@1 <= 303000 REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 3_| @@ -52,11 +48,7 @@ tql analyze (1, 3, '1s') t1{ a =~ ".*" }; |_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: b@1 >= -299000 AND b@1 <= 303000 REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 6_| @@ -83,7 +75,7 @@ tql analyze (1, 3, '1s') t1{ a =~ "a.*" }; | 1_| 0_|_SortPreservingMergeExec: [a@0 DESC NULLS LAST, b@1 DESC NULLS LAST] REDACTED |_|_|_SortExec: expr=[a@0 DESC NULLS LAST, b@1 DESC 
NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: a@0 ~ a.* AND b@1 >= -299000 AND b@1 <= 303000 REDACTED +|_|_|_FilterExec: a@0 ~ a.* REDACTED |_|_|_RepartitionExec: partitioning=REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index b40f30f2c1..f9c1e3651e 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -26,11 +26,7 @@ TQL ANALYZE (0, 10, '5s') test; |_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| @@ -56,11 +52,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; |_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| @@ -85,11 +77,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp |_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED -|_|_|_RepartitionExec: partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| @@ -116,11 +104,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test; |_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST, j@1 DESC NULLS LAST] REDACTED -|_|_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[true] REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED -|_|_|_RepartitionExec: 
partitioning=REDACTED +| 1_| 0_|_SortExec: expr=[k@2 DESC NULLS LAST, j@1 DESC NULLS LAST], preserve_partitioning=[false] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| |_|_| Total rows: 4_|
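
How the new `PredicateGroup` splits the pushed-down filters, shown as a minimal standalone sketch: the group keeps the full `Predicate` for SST row-group pruning and separately collects the expressions that reference only the time index column, so memtables can evaluate them exactly. The helper name `time_index_filters` and the string-based column check below are illustrative simplifications of this patch's `PredicateGroup::new`, which consults `RegionMetadata` semantic types and wraps each matching expression in a `SimpleFilterEvaluator`.

use std::collections::HashSet;

use datafusion_common::{Column, ScalarValue};
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{col, lit, Expr};

/// Illustrative helper: keep only the filters that reference the time index
/// column and nothing else, mirroring the classification step in
/// `PredicateGroup::new` before each expression becomes a `SimpleFilterEvaluator`.
fn time_index_filters(exprs: &[Expr], time_index: &str) -> Vec<Expr> {
    let mut columns: HashSet<Column> = HashSet::new();
    exprs
        .iter()
        .filter(|expr| {
            columns.clear();
            // `expr_to_columns` collects every column referenced by the expression;
            // expressions it cannot analyze are simply skipped.
            if expr_to_columns(expr, &mut columns).is_err() {
                return false;
            }
            // A "simple" filter must touch exactly one column, and that column
            // must be the time index.
            columns.len() == 1 && columns.iter().all(|c| c.name == time_index)
        })
        .cloned()
        .collect()
}

fn main() {
    let filters = vec![
        col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
        col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        col("host").eq(lit("a")),      // not a time filter, kept only in the Predicate
        col("ts").gt(col("other_ts")), // touches two columns, skipped
    ];
    let picked = time_index_filters(&filters, "ts");
    assert_eq!(picked.len(), 2);
    println!("{picked:?}");
}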
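
A standalone sketch of the masking that `PruneTimeIterator` now performs when reading a memtable range: the coarse (min, max) time-range mask is AND-ed with each exact time filter before the batch is filtered. The closure-based filters and hard-coded timestamps are made up for illustration; the real code evaluates `SimpleFilterEvaluator`s against the batch's timestamp vector. Assumes the `arrow` crate (re-exported in this codebase as `datatypes::arrow`).

use std::ops::BitAnd;

use arrow::array::BooleanArray;
use arrow::buffer::BooleanBuffer;

/// Illustrative pruning step: build the coarse range mask, then AND in the
/// exact filter masks, like `prune` and `prune_by_time_filters` in this patch.
fn combined_mask(
    timestamps: &[i64],
    range: (i64, i64),
    filters: &[Box<dyn Fn(i64) -> bool>],
) -> BooleanArray {
    // Coarse mask from the memtable/file time range.
    let range_mask: Vec<bool> = timestamps
        .iter()
        .map(|ts| range.0 <= *ts && *ts <= range.1)
        .collect();
    let mut mask = BooleanBuffer::from(range_mask);

    // Exact masks from the pushed-down time filters.
    for filter in filters {
        let filter_mask: Vec<bool> = timestamps.iter().map(|ts| filter(*ts)).collect();
        mask = mask.bitand(&BooleanBuffer::from(filter_mask));
    }

    BooleanArray::from(mask)
}

fn main() {
    let timestamps = [10, 11, 15, 16, 17, 18];
    // e.g. `ts >= 10 AND ts < 16` pushed down from the query.
    let filters: Vec<Box<dyn Fn(i64) -> bool>> =
        vec![Box::new(|ts| ts >= 10), Box::new(|ts| ts < 16)];
    let mask = combined_mask(&timestamps, (10, 20), &filters);
    assert_eq!(mask.true_count(), 3); // rows 10, 11 and 15 survive
}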
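
Why the `FilterExec: j@1 >= ... AND j@1 <= ...` nodes disappear from the sqlness plans above: `DummyTableProvider::supports_filters_pushdown` now reports `Exact` for simple filters on timestamp columns as well as tags, so DataFusion no longer re-applies them above the scan. Below is a hedged sketch of that decision with a simplified `SemanticType` stand-in; the real check also requires the expression to be accepted by `SimpleFilterEvaluator` first.

use datafusion_expr::TableProviderFilterPushDown;

/// Simplified stand-in for the column semantic types in region metadata.
enum SemanticType {
    Tag,
    Field,
    Timestamp,
}

/// Illustrative pushdown decision: simple filters on tag or time index columns
/// are evaluated exactly by the storage engine; everything else stays Inexact
/// and keeps a FilterExec above the scan.
fn pushdown_for(column: Option<&SemanticType>) -> TableProviderFilterPushDown {
    match column {
        Some(SemanticType::Tag) | Some(SemanticType::Timestamp) => {
            TableProviderFilterPushDown::Exact
        }
        _ => TableProviderFilterPushDown::Inexact,
    }
}

fn main() {
    assert!(matches!(
        pushdown_for(Some(&SemanticType::Timestamp)),
        TableProviderFilterPushDown::Exact
    ));
    assert!(matches!(
        pushdown_for(Some(&SemanticType::Field)),
        TableProviderFilterPushDown::Inexact
    ));
}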