diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 3d3993d454..733155c636 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::sync::Arc; +use arrow_schema::DataType; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -26,7 +26,8 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_common::Result as DataFusionResult; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_physical_expr::expressions::Column as PhysicalColumn; +use datafusion_physical_expr::expressions::{CastExpr, Column as PhysicalColumn}; +use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; use store_api::region_engine::PartitionRange; use table::table::scan::RegionScanExec; @@ -91,9 +92,11 @@ impl WindowedSortPhysicalRule { .expr .as_any() .downcast_ref::() - && scanner_info - .time_index - .contains(input_schema.field(column_expr.index()).name()) + && matches!( + input_schema.field(column_expr.index()).data_type(), + DataType::Timestamp(_, _) + ) + && is_time_index_expr(sort_input.clone(), first_sort_expr.expr.clone())? && sort_exec.fetch().is_none() // skip if there is a limit, as dyn filter along is good enough in this case { @@ -154,14 +157,11 @@ impl WindowedSortPhysicalRule { #[derive(Debug)] struct ScannerInfo { partition_ranges: Vec>, - time_index: HashSet, tag_columns: Vec, } fn fetch_partition_range(input: Arc) -> DataFusionResult> { let mut partition_ranges = None; - let mut time_index = HashSet::new(); - let mut alias_map = Vec::new(); let mut tag_columns = None; input.transform_up(|plan| { @@ -184,18 +184,6 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { - for expr in projection.expr() { - if let Some(column_expr) = expr.expr.as_any().downcast_ref::() { - alias_map.push((column_expr.name().to_string(), expr.alias.clone())); - } - } - // resolve alias properly - time_index = resolve_alias(&alias_map, &time_index); - } - if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { // `PerSeries` distribution is not supported in windowed sort. if region_scan_exec.distribution() @@ -206,8 +194,6 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult) -> DataFusionResult) -> DataFusionResult, + expr: Arc, +) -> DataFusionResult { + if let Some(column_expr) = expr.as_any().downcast_ref::() { + return is_time_index_column(plan, column_expr); + } + + if let Some(cast_expr) = expr.as_any().downcast_ref::() { + return if matches!(cast_expr.cast_type(), DataType::Timestamp(_, _)) { + is_time_index_expr(plan, cast_expr.expr().clone()) + } else { + Ok(false) + }; + } + + if let Some(scalar_function_expr) = expr.as_any().downcast_ref::() { + return if is_supported_time_index_wrapper(scalar_function_expr) + && scalar_function_expr.args().len() == 1 + { + is_time_index_expr(plan, scalar_function_expr.args()[0].clone()) + } else { + Ok(false) + }; + } + + Ok(false) +} + +fn is_time_index_column( + plan: Arc, + column_expr: &PhysicalColumn, +) -> DataFusionResult { + if let Some(projection) = plan.as_any().downcast_ref::() { + let Some(projection_expr) = projection.expr().get(column_expr.index()) else { + return Ok(false); + }; + return is_time_index_expr(projection.input().clone(), projection_expr.expr.clone()); + } + + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + return Ok(matches!( + plan.schema().field(column_expr.index()).data_type(), + DataType::Timestamp(_, _) + ) && plan.schema().field(column_expr.index()).name().as_ref() + == region_scan_exec.time_index()); + } + + let Some(child) = passthrough_child(plan.as_ref()) else { + return Ok(false); + }; + is_time_index_expr(child, Arc::new(column_expr.clone())) +} + +fn passthrough_child(plan: &dyn ExecutionPlan) -> Option> { + if plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + { + return plan.children().first().cloned().cloned(); + } + + None +} + +fn is_supported_time_index_wrapper(expr: &ScalarFunctionExpr) -> bool { + matches!( + expr.name(), + "to_timestamp" + | "to_timestamp_seconds" + | "to_timestamp_millis" + | "to_timestamp_micros" + | "to_timestamp_nanos" + ) && matches!(expr.return_type(), DataType::Timestamp(_, _)) +} + /// Removes the repartition plan between the filter and region scan. fn remove_repartition( plan: Arc, @@ -248,80 +310,212 @@ fn remove_repartition( }) } -/// Resolves alias of the time index column. -/// -/// i.e if a is time index, alias= {a:b, b:c}, then result should be {a, b}(not {a, c}) because projection is not transitive -/// if alias={b:a} and a is time index, then return empty -fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet) -> HashSet { - // available old name for time index - let mut avail_old_name = time_index.clone(); - let mut new_time_index = HashSet::new(); - for (old, new) in alias_map { - if time_index.contains(old) { - new_time_index.insert(new.clone()); - } else if time_index.contains(new) && old != new { - // other alias to time index, remove the old name - avail_old_name.remove(new); - continue; - } - } - // add the remaining time index that is not in alias map - new_time_index.extend(avail_old_name); - new_time_index -} - #[cfg(test)] mod test { - use itertools::Itertools; + use std::sync::Arc; + + use api::v1::SemanticType; + use arrow_schema::{Field, TimeUnit}; + use common_recordbatch::RecordBatches; + use datafusion::config::ConfigOptions; + use datafusion_functions::datetime::to_timestamp_millis; + use datafusion_physical_expr::expressions::CastExpr; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::region_engine::SinglePartitionScanner; + use store_api::storage::{RegionId, ScanRequest}; use super::*; #[test] - fn test_alias() { - let testcases = [ - // notice the old name is still in the result - ( - vec![("a", "b"), ("b", "c")], - HashSet::from(["a"]), - HashSet::from(["a", "b"]), - ), - // alias swap - ( - vec![("b", "a"), ("a", "b")], - HashSet::from(["a"]), - HashSet::from(["b"]), - ), - ( - vec![("b", "a"), ("b", "c")], - HashSet::from(["a"]), - HashSet::from([]), - ), - // not in alias map - ( - vec![("c", "d"), ("d", "c")], - HashSet::from(["a"]), - HashSet::from(["a"]), - ), - // no alias - (vec![], HashSet::from(["a"]), HashSet::from(["a"])), - // empty time index - (vec![], HashSet::from([]), HashSet::from([])), - ]; - for (alias_map, time_index, expected) in testcases { - let alias_map = alias_map - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect_vec(); - let time_index = time_index.into_iter().map(|i| i.to_string()).collect(); - let expected: HashSet = expected.into_iter().map(|i| i.to_string()).collect(); + fn test_is_time_index_expr_tracks_aliases_through_projection() { + let scan = new_region_scan(); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "alias_ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; - assert_eq!( - expected, - resolve_alias(&alias_map, &time_index), - "alias_map={:?}, time_index={:?}", - alias_map, - time_index - ); - } + assert!( + is_time_index_expr(projection, Arc::new(PhysicalColumn::new("alias_ts", 0))).unwrap() + ); + } + + #[test] + fn test_is_time_index_expr_tracks_multi_level_aliases() { + let scan = new_region_scan(); + let first_projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "alias_1".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let second_projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("alias_1", 0)) as Arc, + "alias_2".to_string(), + )], + first_projection, + ) + .unwrap(), + ) as Arc; + + assert!( + is_time_index_expr( + second_projection, + Arc::new(PhysicalColumn::new("alias_2", 0)) + ) + .unwrap() + ); + } + + #[test] + fn test_is_time_index_expr_tracks_wrapped_aliases_through_projection() { + let scan = new_region_scan(); + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(ScalarFunctionExpr::new( + "to_timestamp_millis", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + )) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + + assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_cast_aliases_through_projection() { + let scan = new_region_scan(); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Timestamp(TimeUnit::Millisecond, None), + None, + )) as Arc, + "ts_ms".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + + assert!(is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts_ms", 0))).unwrap()); + } + + #[test] + fn test_is_time_index_expr_rejects_unsupported_wrappers() { + let scan = new_region_scan(); + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(ScalarFunctionExpr::new( + "date_trunc", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + )) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + + assert!(!is_time_index_expr(projection, Arc::new(PhysicalColumn::new("ts", 0))).unwrap()); + } + + #[test] + fn test_is_time_index_expr_rejects_non_timestamp_casts() { + let scan = new_region_scan(); + let cast_expr = CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Timestamp(TimeUnit::Millisecond, None), + None, + ); + assert!(is_time_index_expr(scan.clone(), Arc::new(cast_expr)).unwrap()); + + let non_timestamp_cast = CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Int64, + None, + ); + assert!(!is_time_index_expr(scan, Arc::new(non_timestamp_cast)).unwrap()); + } + + fn new_region_scan() -> Arc { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + ])); + let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap(); + let stream = recordbatches.as_stream(); + + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "value", + ConcreteDataType::int32_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }); + + let scanner = Box::new(SinglePartitionScanner::new( + stream, + false, + Arc::new(builder.build().unwrap()), + None, + )); + Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap()) } } diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 19a114c8ce..933f0e47cf 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -56,7 +56,7 @@ use snafu::location; use store_api::region_engine::PartitionRange; use crate::error::Result; -use crate::window_sort::check_partition_range_monotonicity; +use crate::window_sort::{check_partition_range_monotonicity, project_partition_range_for_sort}; use crate::{array_iter_helper, downcast_ts_array}; /// Get the primary end of a `PartitionRange` based on sort direction. @@ -473,6 +473,7 @@ impl PartSortStream { snafu::location!() )? }; + let cur_range = project_partition_range_for_sort(cur_range, sort_column.data_type())?; downcast_ts_array!( sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx), @@ -506,7 +507,10 @@ impl PartSortStream { snafu::location!() )?; } - let cur_range = self.partition_ranges[self.cur_part_idx]; + let cur_range = project_partition_range_for_sort( + self.partition_ranges[self.cur_part_idx], + sort_column.data_type(), + )?; let sort_column_iter = downcast_ts_array!( sort_column.data_type() => (array_iter_helper, sort_column), diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 83feee6f30..89fbad63ea 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -30,6 +30,7 @@ use common_error::status_code::StatusCode; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; use common_telemetry::error; use common_time::Timestamp; +use common_time::timestamp::TimeUnit as TimestampUnit; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool}; use datafusion::execution::{RecordBatchStream, TaskContext}; use datafusion::physical_plan::memory::MemoryStream; @@ -778,18 +779,11 @@ fn find_slice_from_range( sort_column: &SortColumn, range: &TimeRange, ) -> datafusion_common::Result<(usize, usize)> { - let ty = sort_column.values.data_type(); - let time_unit = if let DataType::Timestamp(unit, _) = ty { - unit - } else { - return Err(DataFusionError::Internal(format!( - "Unsupported sort column type: {}", - sort_column.values.data_type() - ))); - }; + let time_unit = sort_timestamp_unit(sort_column.values.data_type())?; let array = &sort_column.values; let opt = &sort_column.options.unwrap_or_default(); let descending = opt.descending; + let range = convert_time_range_for_sort(range, time_unit)?; let typed_sorted_range = [range.start, range.end] .iter() @@ -836,6 +830,59 @@ fn find_slice_from_range( Ok((start, end - start)) } +fn sort_timestamp_unit(data_type: &DataType) -> datafusion_common::Result { + if let DataType::Timestamp(unit, _) = data_type { + Ok(*unit) + } else { + Err(DataFusionError::Internal(format!( + "Unsupported sort column type: {data_type}" + ))) + } +} + +fn convert_time_range_for_sort( + range: &TimeRange, + time_unit: arrow_schema::TimeUnit, +) -> datafusion_common::Result { + let target_unit = time_unit.into(); + Ok(TimeRange::new( + convert_timestamp_range_bound(range.start, target_unit, false)?, + convert_timestamp_range_bound(range.end, target_unit, true)?, + )) +} + +fn convert_timestamp_range_bound( + timestamp: Timestamp, + target_unit: TimestampUnit, + round_exclusive_end_up: bool, +) -> datafusion_common::Result { + let converted = if round_exclusive_end_up { + timestamp.convert_to_ceil(target_unit) + } else { + timestamp.convert_to(target_unit) + }; + + converted.ok_or_else(|| { + DataFusionError::Internal(format!( + "Failed to convert timestamp from {:?} to {:?}", + timestamp.unit(), + target_unit + )) + }) +} + +pub(crate) fn project_partition_range_for_sort( + range: PartitionRange, + sort_data_type: &DataType, +) -> datafusion_common::Result { + let target_unit = sort_timestamp_unit(sort_data_type)?.into(); + Ok(PartitionRange { + start: convert_timestamp_range_bound(range.start, target_unit, false)?, + end: convert_timestamp_range_bound(range.end, target_unit, true)?, + ..range + }) +} + /// Get an iterator from a primitive array. /// /// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`. @@ -1498,6 +1545,39 @@ mod test { } } + #[test] + fn test_project_partition_range_for_sort_uses_ceil_on_exclusive_end() { + let range = PartitionRange { + start: Timestamp::new_nanosecond(1_000_000), + end: Timestamp::new_nanosecond(1_000_001), + num_rows: 1, + identifier: 0, + }; + + let projected = project_partition_range_for_sort( + range, + &DataType::Timestamp(TimeUnit::Millisecond, None), + ) + .unwrap(); + + assert_eq!(Timestamp::new_millisecond(1), projected.start); + assert_eq!(Timestamp::new_millisecond(2), projected.end); + } + + #[test] + fn test_find_slice_from_range_preserves_last_row_after_precision_drop() { + let sort_column = SortColumn { + values: Arc::new(TimestampMillisecondArray::from_iter_values([1])) as ArrayRef, + options: Some(SortOptions::default()), + }; + let range = TimeRange::new( + Timestamp::new_nanosecond(1_000_000), + Timestamp::new_nanosecond(1_000_001), + ); + + assert_eq!((0, 1), find_slice_from_range(&sort_column, &range).unwrap()); + } + #[allow(clippy::type_complexity)] fn run_compute_working_ranges_test( testcases: Vec<( diff --git a/tests/cases/distributed/optimizer/windowed_sort_advance.result b/tests/cases/distributed/optimizer/windowed_sort_advance.result new file mode 100644 index 0000000000..e3c92a0a08 --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_advance.result @@ -0,0 +1,118 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +Affected Rows: 0 + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[ts@2 as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_ProjectionExec: expr=[to_timestamp_millis(a.ts)@0 as ts, status@1 as status, value@2 as value] REDACTED +|_|_|_SortPreservingMergeExec: [to_timestamp_millis(a.ts)@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=to_timestamp_millis(a.ts)@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_millis(ts@2) as to_timestamp_millis(a.ts), status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a`; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/optimizer/windowed_sort_advance.sql b/tests/cases/distributed/optimizer/windowed_sort_advance.sql new file mode 100644 index 0000000000..fa73b97933 --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_advance.sql @@ -0,0 +1,37 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +DROP TABLE `a`; diff --git a/tests/cases/standalone/optimizer/windowed_sort_advance.result b/tests/cases/standalone/optimizer/windowed_sort_advance.result new file mode 100644 index 0000000000..c0be1bece9 --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_advance.result @@ -0,0 +1,117 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +Affected Rows: 0 + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[ts@2 as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_millis(ts@2) as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a`; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/optimizer/windowed_sort_advance.sql b/tests/cases/standalone/optimizer/windowed_sort_advance.sql new file mode 100644 index 0000000000..fa73b97933 --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_advance.sql @@ -0,0 +1,37 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +DROP TABLE `a`;