From af03e8913928aa921ad8eaf371a4a3d70fdc9f4e Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Wed, 9 Jul 2025 06:27:17 +0800
Subject: [PATCH] fix: stricter win sort condition (#6477)

test: sqlness

test: fix sqlness redacted

Signed-off-by: discord9
---
 src/query/src/optimizer/windowed_sort.rs      | 17 +++
 src/query/src/part_sort.rs                    | 15 +++--
 .../standalone/optimizer/windowed_sort.result | 62 +++++++++++++++++++
 .../standalone/optimizer/windowed_sort.sql    | 27 ++++++++
 4 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs
index 3ef6296247..590e277ede 100644
--- a/src/query/src/optimizer/windowed_sort.rs
+++ b/src/query/src/optimizer/windowed_sort.rs
@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<
+        // Reset the partition ranges if the plan is not one of the expected
+        // order-preserving plans.
+        if !(plan.as_any().is::<ProjectionExec>()
+            || plan.as_any().is::<CoalesceBatchesExec>()
+            || plan.as_any().is::<RegionScanExec>())
+        {
+            partition_ranges = None;
+        }
+        // TODO(discord9): do this in logical plan instead as it's less buggy there
         // Collects alias of the time index column.
         if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<
         if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+            // `PerSeries` distribution is not supported in windowed sort.
+            if region_scan_exec.distribution()
+                == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
+            {
+                partition_ranges = None;
+                return Ok(Transformed::no(plan));
+            }
+
             partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
             // Reset time index column.
             time_index = HashSet::from([region_scan_exec.time_index()]);
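The new block in fetch_partition_range is the heart of the fix: the optimizer now keeps its collected partition ranges only while it walks plan nodes known to preserve the scan's ordering, and discards them for anything else (aggregates, PromQL manipulators, repartitions, and so on). Below is a minimal, self-contained sketch of that `as_any().is::<T>()` whitelist pattern; `ProjectionNode`, `RepartitionNode`, and `ScanNode` are made-up stand-ins for the real ExecutionPlan implementations, not GreptimeDB types.

```rust
use std::any::Any;

// Stand-in for a physical plan node; only the `Any`-based type test below
// mirrors the patch.
trait PlanNode: Any {
    fn as_any(&self) -> &dyn Any;
}

struct ProjectionNode;
struct RepartitionNode;
struct ScanNode;

impl PlanNode for ProjectionNode {
    fn as_any(&self) -> &dyn Any { self }
}
impl PlanNode for RepartitionNode {
    fn as_any(&self) -> &dyn Any { self }
}
impl PlanNode for ScanNode {
    fn as_any(&self) -> &dyn Any { self }
}

/// Returns true when `plan` is one of the node kinds that keep rows in scan
/// order, i.e. the only kinds the optimizer may walk through without
/// discarding its collected partition ranges.
fn preserves_scan_order(plan: &dyn PlanNode) -> bool {
    plan.as_any().is::<ProjectionNode>() || plan.as_any().is::<ScanNode>()
}

fn main() {
    let mut partition_ranges: Option<Vec<(i64, i64)>> = Some(vec![(0, 100)]);

    assert!(preserves_scan_order(&ProjectionNode));
    assert!(preserves_scan_order(&ScanNode));

    // A repartition between the scan and the sort invalidates the ranges,
    // exactly like `partition_ranges = None` in the patch.
    let plan = RepartitionNode;
    if !preserves_scan_order(&plan) {
        partition_ranges = None;
    }
    assert!(partition_ranges.is_none());
}
```

Checking concrete types with `Any` is coarser than asking each node for its ordering properties, but it fails closed: any node the rule does not recognize disables the windowed-sort rewrite.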
diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs
index cd35fb66fb..29b7307801 100644
--- a/src/query/src/part_sort.rs
+++ b/src/query/src/part_sort.rs
@@ -96,9 +96,10 @@ impl PartSortExec {
         if partition >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 partition,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }
@@ -322,9 +323,10 @@ impl PartSortStream {
     ) -> datafusion_common::Result<()> {
         if self.cur_part_idx >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 self.cur_part_idx,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }
         let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
         // check if the current partition index is out of range
         if self.cur_part_idx >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 self.cur_part_idx,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }
         let cur_range = self.partition_ranges[self.cur_part_idx];
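The part_sort.rs hunks change only the error text: `snafu::location!()` stamps each "Partition index out of range" message with the file, line, and column that raised it, so the three otherwise-identical checks can be told apart in logs. A minimal sketch, assuming the `snafu` crate is a dependency and using a plain `String` error in place of DataFusion's `internal_err!`:

```rust
/// Validates a partition index the way PartSortExec does, but returns a
/// plain String error carrying the code location of the failing check.
fn check_partition(partition: usize, len: usize) -> Result<(), String> {
    if partition >= len {
        return Err(format!(
            "Partition index out of range: {} >= {} at {}",
            partition,
            len,
            // Expands to a snafu::Location for this file/line/column.
            snafu::location!()
        ));
    }
    Ok(())
}

fn main() {
    // Prints something like:
    //   Partition index out of range: 3 >= 2 at src/main.rs:6:13
    println!("{}", check_partition(3, 2).unwrap_err());
}
```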
diff --git a/tests/cases/standalone/optimizer/windowed_sort.result b/tests/cases/standalone/optimizer/windowed_sort.result
index 2162ded406..5b78c8c5b8 100644
--- a/tests/cases/standalone/optimizer/windowed_sort.result
+++ b/tests/cases/standalone/optimizer/windowed_sort.result
@@ -233,3 +233,65 @@ DROP TABLE lightning;
 
 Affected Rows: 0
 
+CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
+  `greptime_timestamp` TIMESTAMP(3) NOT NULL,
+  `greptime_value` DOUBLE NULL,
+  `instance` STRING NULL,
+  `job` STRING NULL,
+  TIME INDEX (`greptime_timestamp`),
+  PRIMARY KEY (`instance`, `job`)
+);
+
+Affected Rows: 0
+
+INSERT INTO `instance_job_metrics` VALUES
+    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
+    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
+    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
+
+Affected Rows: 3
+
+TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
++---------------------+------------------------------------------+
+| greptime_timestamp  | sum(instance_job_metrics.greptime_value) |
++---------------------+------------------------------------------+
+| 2023-10-01T00:00:01 | 1696118400.0                             |
+| 2023-10-01T00:00:02 | 3392236800.0                             |
+| 2023-10-01T00:00:03 | 5088355200.0                             |
+| 2023-10-01T00:00:04 | 5088355200.0                             |
+| 2023-10-01T00:00:05 | 5088355200.0                             |
++---------------------+------------------------------------------+
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
+|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
+|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+|_|_| Total rows: 5_|
++-+-+-+
+
+DROP TABLE IF EXISTS `instance_job_metrics`;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/optimizer/windowed_sort.sql b/tests/cases/standalone/optimizer/windowed_sort.sql
index 7573b71b91..b74db2b6b7 100644
--- a/tests/cases/standalone/optimizer/windowed_sort.sql
+++ b/tests/cases/standalone/optimizer/windowed_sort.sql
@@ -120,3 +120,30 @@ ORDER BY true_collect_time DESC;
 
 DROP TABLE lightning;
+
+CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
+  `greptime_timestamp` TIMESTAMP(3) NOT NULL,
+  `greptime_value` DOUBLE NULL,
+  `instance` STRING NULL,
+  `job` STRING NULL,
+  TIME INDEX (`greptime_timestamp`),
+  PRIMARY KEY (`instance`, `job`)
+);
+
+INSERT INTO `instance_job_metrics` VALUES
+    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
+    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
+    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
+
+TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
+DROP TABLE IF EXISTS `instance_job_metrics`;
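The new sqlness case pins down the behavior the Rust change enforces: the `SeriesScan` line in the ANALYZE output reports "distribution":"PerSeries", so the windowed-sort rule must leave the plan untouched, and indeed a plain SortExec rather than a windowed sort appears above the scan. A standalone sketch of that guard follows; `PerSeries` is taken from the patch, while the `TimeWindowed` variant and the helper name are assumptions for illustration.

```rust
// Stand-in for GreptimeDB's `store_api::storage::TimeSeriesDistribution`.
// Only `PerSeries` is confirmed by the patch; `TimeWindowed` is assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimeSeriesDistribution {
    TimeWindowed,
    PerSeries,
}

/// Windowed sort relies on rows arriving grouped by time window, so a scan
/// that emits whole series one after another (`PerSeries`) must opt out,
/// mirroring the early `return Ok(Transformed::no(plan))` in the patch.
fn supports_windowed_sort(distribution: Option<TimeSeriesDistribution>) -> bool {
    distribution != Some(TimeSeriesDistribution::PerSeries)
}

fn main() {
    assert!(supports_windowed_sort(None));
    assert!(supports_windowed_sort(Some(TimeSeriesDistribution::TimeWindowed)));
    assert!(!supports_windowed_sort(Some(TimeSeriesDistribution::PerSeries)));
}
```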