mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 01:40:36 +00:00
feat: window sort supports where on fields and time index (#5527)
* feat: handle filter for window sort * test: sqlness filter test for window sort * test: add test on tag column filter * test: test for filter on ts * test: update sqlness test
This commit is contained in:
@@ -17,6 +17,7 @@ use std::sync::Arc;
|
||||
use datafusion::physical_optimizer::PhysicalOptimizerRule;
|
||||
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
|
||||
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
|
||||
use datafusion::physical_plan::filter::FilterExec;
|
||||
use datafusion::physical_plan::repartition::RepartitionExec;
|
||||
use datafusion::physical_plan::sorts::sort::SortExec;
|
||||
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
|
||||
@@ -76,8 +77,9 @@ impl WindowedSortPhysicalRule {
|
||||
|
||||
let preserve_partitioning = sort_exec.preserve_partitioning();
|
||||
|
||||
let Some(scanner_info) = fetch_partition_range(sort_exec.input().clone())?
|
||||
else {
|
||||
let sort_input = remove_repartition(sort_exec.input().clone())?.data;
|
||||
// Gets scanner info from the input without repartition before filter.
|
||||
let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
|
||||
return Ok(Transformed::no(plan));
|
||||
};
|
||||
|
||||
@@ -99,13 +101,13 @@ impl WindowedSortPhysicalRule {
|
||||
let new_input = if scanner_info.tag_columns.is_empty()
|
||||
&& !first_sort_expr.options.descending
|
||||
{
|
||||
sort_exec.input().clone()
|
||||
sort_input
|
||||
} else {
|
||||
Arc::new(PartSortExec::new(
|
||||
first_sort_expr.clone(),
|
||||
sort_exec.fetch(),
|
||||
scanner_info.partition_ranges.clone(),
|
||||
sort_exec.input().clone(),
|
||||
sort_input,
|
||||
))
|
||||
};
|
||||
|
||||
@@ -194,3 +196,24 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Removes the repartition plan between the filter and region scan.
|
||||
fn remove_repartition(
|
||||
plan: Arc<dyn ExecutionPlan>,
|
||||
) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
|
||||
plan.transform_down(|plan| {
|
||||
if plan.as_any().is::<FilterExec>() {
|
||||
// Checks child.
|
||||
let maybe_repartition = plan.children()[0];
|
||||
if maybe_repartition.as_any().is::<RepartitionExec>() {
|
||||
let maybe_scan = maybe_repartition.children()[0];
|
||||
if maybe_scan.as_any().is::<RegionScanExec>() {
|
||||
let new_filter = plan.clone().with_new_children(vec![maybe_scan.clone()])?;
|
||||
return Ok(Transformed::yes(new_filter));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Transformed::no(plan))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -297,17 +297,17 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|
||||
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED
|
||||
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|
||||
|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED
|
||||
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
|
||||
@@ -106,6 +106,107 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
-- Filter on a field.
|
||||
SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|
||||
|
||||
+---+-------------------------+
|
||||
| i | t |
|
||||
+---+-------------------------+
|
||||
| 3 | 1970-01-01T00:00:00.007 |
|
||||
| 3 | 1970-01-01T00:00:00.008 |
|
||||
| 3 | 1970-01-01T00:00:00.009 |
|
||||
| 4 | 1970-01-01T00:00:00.010 |
|
||||
+---+-------------------------+
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=4 REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: i@0 > 2 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Filter on a field.
|
||||
SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
+---+-------------------------+
|
||||
| i | t |
|
||||
+---+-------------------------+
|
||||
| 4 | 1970-01-01T00:00:00.012 |
|
||||
| 4 | 1970-01-01T00:00:00.011 |
|
||||
| 4 | 1970-01-01T00:00:00.010 |
|
||||
| 3 | 1970-01-01T00:00:00.009 |
|
||||
+---+-------------------------+
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=4 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=4 REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: i@0 > 2 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Filter on the time index.
|
||||
SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
+---+-------------------------+
|
||||
| i | t |
|
||||
+---+-------------------------+
|
||||
| 4 | 1970-01-01T00:00:00.012 |
|
||||
| 4 | 1970-01-01T00:00:00.011 |
|
||||
| 4 | 1970-01-01T00:00:00.010 |
|
||||
| 3 | 1970-01-01T00:00:00.009 |
|
||||
+---+-------------------------+
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: t@1 > 8 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
DROP TABLE test;
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -219,6 +320,39 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
-- Filter on a pk column.
|
||||
SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
|
||||
+----+---+-------------------------+
|
||||
| pk | i | t |
|
||||
+----+---+-------------------------+
|
||||
| 8 | 3 | 1970-01-01T00:00:00.008 |
|
||||
| 9 | 3 | 1970-01-01T00:00:00.009 |
|
||||
| 10 | 4 | 1970-01-01T00:00:00.010 |
|
||||
| 11 | 4 | 1970-01-01T00:00:00.011 |
|
||||
| 12 | 4 | 1970-01-01T00:00:00.012 |
|
||||
+----+---+-------------------------+
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED
|
||||
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED
|
||||
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 limit=5 REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 5_|
|
||||
+-+-+-+
|
||||
|
||||
DROP TABLE test_pk;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -33,6 +33,36 @@ SELECT * FROM test ORDER BY t DESC LIMIT 5;
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
|
||||
|
||||
-- Filter on a field.
|
||||
SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|
||||
|
||||
-- Filter on a field.
|
||||
SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
-- Filter on the time index.
|
||||
SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
|
||||
|
||||
DROP TABLE test;
|
||||
|
||||
-- Test with PK, with a windowed sort query.
|
||||
@@ -70,4 +100,14 @@ SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|
||||
|
||||
-- Filter on a pk column.
|
||||
SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5;
|
||||
|
||||
DROP TABLE test_pk;
|
||||
|
||||
Reference in New Issue
Block a user