fix: stricter win sort condition (#6477)

test: sqlness

test: fix sqlness redacted

Signed-off-by: discord9 <discord9@163.com>
Author: discord9
Date: 2025-07-09 06:27:17 +08:00
Committed by: GitHub
Parent: e7a64b7dc0
Commit: af03e89139
4 changed files with 115 additions and 6 deletions


@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
     is_batch_coalesced = true;
 }
+// Only a very limited set of plans can exist between the region scan and the sort exec.
+// Other plans might make this optimization incorrect, so be safe here by limiting it.
+if !(plan.as_any().is::<ProjectionExec>()
+    || plan.as_any().is::<FilterExec>()
+    || plan.as_any().is::<CoalesceBatchesExec>())
+{
+    partition_ranges = None;
+}
 // TODO(discord9): do this in the logical plan instead, as it's less buggy there
 // Collects alias of the time index column.
 if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
 }
 if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+    // `PerSeries` distribution is not supported in windowed sort.
+    if region_scan_exec.distribution()
+        == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
+    {
+        partition_ranges = None;
+        return Ok(Transformed::no(plan));
+    }
     partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
     // Reset time index column.
     time_index = HashSet::from([region_scan_exec.time_index()]);
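
For context, the new guard is a plain whitelist over the physical plan between the region scan and the sort. A minimal standalone sketch of the same check (the helper name and the `datafusion` import paths here are illustrative, not part of the patch):

use std::sync::Arc;

use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;

/// Only these operators are trusted to sit between the region scan and the
/// sort without invalidating the per-partition time ranges.
fn preserves_partition_ranges(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any().is::<ProjectionExec>()
        || plan.as_any().is::<FilterExec>()
        || plan.as_any().is::<CoalesceBatchesExec>()
}

Any other operator clears `partition_ranges`, which simply disables the windowed-sort rewrite instead of risking a wrong sort order; the `PerSeries` branch above bails out the same way, since that distribution is not supported by windowed sort.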


@@ -96,9 +96,10 @@ impl PartSortExec {
 if partition >= self.partition_ranges.len() {
     internal_err!(
-        "Partition index out of range: {} >= {}",
+        "Partition index out of range: {} >= {} at {}",
         partition,
-        self.partition_ranges.len()
+        self.partition_ranges.len(),
+        snafu::location!()
     )?;
 }
@@ -322,9 +323,10 @@ impl PartSortStream {
 ) -> datafusion_common::Result<()> {
     if self.cur_part_idx >= self.partition_ranges.len() {
         internal_err!(
-            "Partition index out of range: {} >= {}",
+            "Partition index out of range: {} >= {} at {}",
             self.cur_part_idx,
-            self.partition_ranges.len()
+            self.partition_ranges.len(),
+            snafu::location!()
         )?;
     }
     let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
 // check if the current partition index is out of range
 if self.cur_part_idx >= self.partition_ranges.len() {
     internal_err!(
-        "Partition index out of range: {} >= {}",
+        "Partition index out of range: {} >= {} at {}",
         self.cur_part_idx,
-        self.partition_ranges.len()
+        self.partition_ranges.len(),
+        snafu::location!()
     )?;
 }
 let cur_range = self.partition_ranges[self.cur_part_idx];
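
The change in these three hunks is purely diagnostic: the out-of-range messages were identical, so `snafu::location!()` is appended to embed the callsite. A minimal sketch of the pattern (`check_partition` is a made-up stand-in for the real call sites, which go through `internal_err!`):

// Requires the `snafu` crate; `snafu::location!()` expands to a
// `snafu::Location` carrying the file, line, and column of the call,
// and `Location` implements Display.
fn check_partition(idx: usize, len: usize) -> Result<(), String> {
    if idx >= len {
        return Err(format!(
            "Partition index out of range: {} >= {} at {}",
            idx,
            len,
            snafu::location!()
        ));
    }
    Ok(())
}

With the location appended, the error alone tells apart the three otherwise identical failure points.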


@@ -233,3 +233,65 @@ DROP TABLE lightning;
Affected Rows: 0

CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
  `greptime_timestamp` TIMESTAMP(3) NOT NULL,
  `greptime_value` DOUBLE NULL,
  `instance` STRING NULL,
  `job` STRING NULL,
  TIME INDEX (`greptime_timestamp`),
  PRIMARY KEY (`instance`, `job`)
);

Affected Rows: 0

INSERT INTO `instance_job_metrics` VALUES
  ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
  ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
  ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');

Affected Rows: 3

TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+---------------------+------------------------------------------+
| greptime_timestamp | sum(instance_job_metrics.greptime_value) |
+---------------------+------------------------------------------+
| 2023-10-01T00:00:01 | 1696118400.0 |
| 2023-10-01T00:00:02 | 3392236800.0 |
| 2023-10-01T00:00:03 | 5088355200.0 |
| 2023-10-01T00:00:04 | 5088355200.0 |
| 2023-10-01T00:00:05 | 5088355200.0 |
+---------------------+------------------------------------------+
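
The sums are easy to verify by hand: every sample has value 1696118400.0, and with PromQL's default 5-minute lookback (visible as lookback=[300000] in the plan below) a series stays in the instant vector once it appears. So the sum covers one series at 00:00:01 (1696118400.0), two at 00:00:02 (2 × 1696118400.0 = 3392236800.0), and all three from 00:00:03 onward (3 × 1696118400.0 = 5088355200.0).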
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
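
Note that this plan is the regression check for the fix itself: `SeriesScan` reports "distribution":"PerSeries", so the query falls back to a regular `SortExec`/`SortPreservingMergeExec` pair rather than the windowed-sort rewrite, which the patched rule now skips whenever the region scan's distribution is `PerSeries`.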
DROP TABLE IF EXISTS `instance_job_metrics`;

Affected Rows: 0
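
The SQLNESS REPLACE directives are what give the ANALYZE output its unusual look: they strip the nondeterministic parts of the plan (metrics, peer lists, region ids, hash partitioning details) before comparison, and the (-+) - and (\s\s+) _ rules collapse table borders and runs of whitespace, which is why the plan above renders with bare - and _ characters.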


@@ -120,3 +120,30 @@ ORDER BY
true_collect_time DESC;

DROP TABLE lightning;

CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
  `greptime_timestamp` TIMESTAMP(3) NOT NULL,
  `greptime_value` DOUBLE NULL,
  `instance` STRING NULL,
  `job` STRING NULL,
  TIME INDEX (`greptime_timestamp`),
  PRIMARY KEY (`instance`, `job`)
);

INSERT INTO `instance_job_metrics` VALUES
  ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
  ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
  ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');

TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

DROP TABLE IF EXISTS `instance_job_metrics`;