diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 3d3993d454..2480613ec6 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::sync::Arc; +use arrow_schema::DataType; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -26,7 +26,8 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_common::Result as DataFusionResult; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_physical_expr::expressions::Column as PhysicalColumn; +use datafusion_physical_expr::expressions::{CastExpr, Column as PhysicalColumn}; +use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; use store_api::region_engine::PartitionRange; use table::table::scan::RegionScanExec; @@ -91,9 +92,11 @@ impl WindowedSortPhysicalRule { .expr .as_any() .downcast_ref::() - && scanner_info - .time_index - .contains(input_schema.field(column_expr.index()).name()) + && matches!( + input_schema.field(column_expr.index()).data_type(), + DataType::Timestamp(_, _) + ) + && is_time_index_expr(&sort_input, &first_sort_expr.expr)? && sort_exec.fetch().is_none() // skip if there is a limit, as dyn filter along is good enough in this case { @@ -154,14 +157,11 @@ impl WindowedSortPhysicalRule { #[derive(Debug)] struct ScannerInfo { partition_ranges: Vec>, - time_index: HashSet, tag_columns: Vec, } fn fetch_partition_range(input: Arc) -> DataFusionResult> { let mut partition_ranges = None; - let mut time_index = HashSet::new(); - let mut alias_map = Vec::new(); let mut tag_columns = None; input.transform_up(|plan| { @@ -184,18 +184,6 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { - for expr in projection.expr() { - if let Some(column_expr) = expr.expr.as_any().downcast_ref::() { - alias_map.push((column_expr.name().to_string(), expr.alias.clone())); - } - } - // resolve alias properly - time_index = resolve_alias(&alias_map, &time_index); - } - if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { // `PerSeries` distribution is not supported in windowed sort. if region_scan_exec.distribution() @@ -206,8 +194,6 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult) -> DataFusionResult) -> DataFusionResult, + expr: &Arc, +) -> DataFusionResult { + if let Some(column_expr) = expr.as_any().downcast_ref::() { + return is_time_index_column(plan, column_expr); + } + + if let Some(cast_expr) = expr.as_any().downcast_ref::() { + return if matches!(cast_expr.cast_type(), DataType::Timestamp(_, _)) { + is_time_index_expr(plan, cast_expr.expr()) + } else { + Ok(false) + }; + } + + if let Some(scalar_function_expr) = expr.as_any().downcast_ref::() { + return if is_supported_time_index_wrapper(scalar_function_expr) + && scalar_function_expr.args().len() == 1 + { + is_time_index_expr(plan, &scalar_function_expr.args()[0]) + } else { + Ok(false) + }; + } + + Ok(false) +} + +fn is_time_index_column( + plan: &Arc, + column_expr: &PhysicalColumn, +) -> DataFusionResult { + if let Some(projection) = plan.as_any().downcast_ref::() { + let Some(projection_expr) = projection.expr().get(column_expr.index()) else { + return Ok(false); + }; + return is_time_index_expr(projection.input(), &projection_expr.expr); + } + + if let Some(filter) = plan.as_any().downcast_ref::() { + let child_column_expr = filter + .projection() + .as_ref() + .and_then(|projection| projection.get(column_expr.index()).copied()) + .map(|input_index| { + PhysicalColumn::new( + filter.input().schema().field(input_index).name(), + input_index, + ) + }) + .unwrap_or_else(|| column_expr.clone()); + let child_expr = Arc::new(child_column_expr) as Arc; + return is_time_index_expr(filter.input(), &child_expr); + } + + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + let schema = plan.schema(); + let column_field = schema.field(column_expr.index()); + return Ok( + matches!(column_field.data_type(), DataType::Timestamp(_, _)) + && column_field.name().as_ref() == region_scan_exec.time_index(), + ); + } + + let Some(child) = passthrough_child(plan.as_ref()) else { + return Ok(false); + }; + let child_expr = Arc::new(column_expr.clone()) as Arc; + is_time_index_expr(&child, &child_expr) +} + +fn passthrough_child(plan: &dyn ExecutionPlan) -> Option> { + if plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + { + return schema_preserving_child(plan); + } + + None +} + +fn schema_preserving_child(plan: &dyn ExecutionPlan) -> Option> { + let child = plan.children().first().cloned().cloned()?; + (plan.schema().as_ref() == child.schema().as_ref()).then_some(child) +} + +fn is_supported_time_index_wrapper(expr: &ScalarFunctionExpr) -> bool { + (expr.name().eq_ignore_ascii_case("to_timestamp") + || expr.name().eq_ignore_ascii_case("to_timestamp_seconds") + || expr.name().eq_ignore_ascii_case("to_timestamp_millis") + || expr.name().eq_ignore_ascii_case("to_timestamp_micros") + || expr.name().eq_ignore_ascii_case("to_timestamp_nanos")) + && matches!(expr.return_type(), DataType::Timestamp(_, _)) +} + /// Removes the repartition plan between the filter and region scan. fn remove_repartition( plan: Arc, @@ -248,80 +330,321 @@ fn remove_repartition( }) } -/// Resolves alias of the time index column. -/// -/// i.e if a is time index, alias= {a:b, b:c}, then result should be {a, b}(not {a, c}) because projection is not transitive -/// if alias={b:a} and a is time index, then return empty -fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet) -> HashSet { - // available old name for time index - let mut avail_old_name = time_index.clone(); - let mut new_time_index = HashSet::new(); - for (old, new) in alias_map { - if time_index.contains(old) { - new_time_index.insert(new.clone()); - } else if time_index.contains(new) && old != new { - // other alias to time index, remove the old name - avail_old_name.remove(new); - continue; - } - } - // add the remaining time index that is not in alias map - new_time_index.extend(avail_old_name); - new_time_index -} - #[cfg(test)] mod test { - use itertools::Itertools; + use std::sync::Arc; + + use api::v1::SemanticType; + use arrow_schema::{Field, TimeUnit}; + use common_recordbatch::RecordBatches; + use datafusion::config::ConfigOptions; + use datafusion::physical_plan::filter::FilterExecBuilder; + use datafusion_common::ScalarValue; + use datafusion_functions::datetime::to_timestamp_millis; + use datafusion_physical_expr::expressions::{CastExpr, Literal}; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::region_engine::SinglePartitionScanner; + use store_api::storage::{RegionId, ScanRequest}; use super::*; #[test] - fn test_alias() { - let testcases = [ - // notice the old name is still in the result - ( - vec![("a", "b"), ("b", "c")], - HashSet::from(["a"]), - HashSet::from(["a", "b"]), - ), - // alias swap - ( - vec![("b", "a"), ("a", "b")], - HashSet::from(["a"]), - HashSet::from(["b"]), - ), - ( - vec![("b", "a"), ("b", "c")], - HashSet::from(["a"]), - HashSet::from([]), - ), - // not in alias map - ( - vec![("c", "d"), ("d", "c")], - HashSet::from(["a"]), - HashSet::from(["a"]), - ), - // no alias - (vec![], HashSet::from(["a"]), HashSet::from(["a"])), - // empty time index - (vec![], HashSet::from([]), HashSet::from([])), - ]; - for (alias_map, time_index, expected) in testcases { - let alias_map = alias_map - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect_vec(); - let time_index = time_index.into_iter().map(|i| i.to_string()).collect(); - let expected: HashSet = expected.into_iter().map(|i| i.to_string()).collect(); + fn test_is_time_index_expr_tracks_aliases_through_projection() { + let scan = new_region_scan(); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "alias_ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("alias_ts", 0)) as Arc; - assert_eq!( - expected, - resolve_alias(&alias_map, &time_index), - "alias_map={:?}, time_index={:?}", - alias_map, - time_index - ); - } + assert!(is_time_index_expr(&projection, &expr).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_multi_level_aliases() { + let scan = new_region_scan(); + let first_projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "alias_1".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let second_projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("alias_1", 0)) as Arc, + "alias_2".to_string(), + )], + first_projection, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("alias_2", 0)) as Arc; + + assert!(is_time_index_expr(&second_projection, &expr).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_wrapped_aliases_through_projection() { + let scan = new_region_scan(); + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(ScalarFunctionExpr::new( + "to_timestamp_millis", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + )) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; + + assert!(is_time_index_expr(&projection, &expr).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_cast_aliases_through_projection() { + let scan = new_region_scan(); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Timestamp(TimeUnit::Millisecond, None), + None, + )) as Arc, + "ts_ms".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts_ms", 0)) as Arc; + + assert!(is_time_index_expr(&projection, &expr).unwrap()); + } + + #[test] + fn test_is_time_index_expr_rejects_unsupported_wrappers() { + let scan = new_region_scan(); + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let projection = Arc::new( + ProjectionExec::try_new( + vec![( + Arc::new(ScalarFunctionExpr::new( + "date_trunc", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + )) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; + + assert!(!is_time_index_expr(&projection, &expr).unwrap()); + } + + #[test] + fn test_is_supported_time_index_wrapper_ignores_function_name_case() { + let config = Arc::new(ConfigOptions::default()); + let return_field = Arc::new(Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )); + let expr = ScalarFunctionExpr::new( + "To_Timestamp_Millis", + to_timestamp_millis(config.as_ref()), + vec![Arc::new(PhysicalColumn::new("ts", 1))], + return_field, + config, + ); + + assert!(is_supported_time_index_wrapper(&expr)); + } + + #[test] + fn test_is_time_index_expr_rejects_non_timestamp_casts() { + let scan = new_region_scan(); + let cast_expr = Arc::new(CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Timestamp(TimeUnit::Millisecond, None), + None, + )) as Arc; + assert!(is_time_index_expr(&scan, &cast_expr).unwrap()); + + let non_timestamp_cast = Arc::new(CastExpr::new( + Arc::new(PhysicalColumn::new("ts", 1)), + DataType::Int64, + None, + )) as Arc; + assert!(!is_time_index_expr(&scan, &non_timestamp_cast).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_time_index_through_filter() { + let scan = new_region_scan(); + let filter = Arc::new( + FilterExec::try_new( + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + scan, + ) + .unwrap(), + ) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 1)) as Arc; + + assert!(is_time_index_expr(&filter, &expr).unwrap()); + } + + #[test] + fn test_is_time_index_expr_tracks_time_index_through_passthrough_wrapper_and_filter_projection() + { + let scan = new_region_scan(); + let projected_filter = Arc::new( + FilterExecBuilder::new( + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + scan, + ) + .apply_projection(Some(vec![1])) + .unwrap() + .build() + .unwrap(), + ) as Arc; + let cooperative = + Arc::new(CooperativeExec::new(projected_filter)) as Arc; + let expr = Arc::new(PhysicalColumn::new("ts", 0)) as Arc; + + assert!(is_time_index_expr(&cooperative, &expr).unwrap()); + } + + #[test] + fn test_schema_preserving_child_rejects_schema_changing_projection() { + let scan = new_region_scan(); + let projection = ProjectionExec::try_new( + vec![( + Arc::new(PhysicalColumn::new("ts", 1)) as Arc, + "ts".to_string(), + )], + scan, + ) + .unwrap(); + + assert!(schema_preserving_child(&projection).is_none()); + } + + #[test] + fn test_cooperative_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new(CooperativeExec::new(child.clone())) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + #[test] + fn test_repartition_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new( + RepartitionExec::try_new( + child.clone(), + datafusion_physical_expr::Partitioning::RoundRobinBatch(2), + ) + .unwrap(), + ) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + #[test] + fn test_coalesce_partitions_exec_satisfies_passthrough_schema_contract() { + let child = new_region_scan(); + let plan = Arc::new(CoalescePartitionsExec::new(child.clone())) as Arc; + + assert_passthrough_schema_contract(plan, child); + } + + fn assert_passthrough_schema_contract( + plan: Arc, + child: Arc, + ) { + assert_eq!(plan.schema().as_ref(), child.schema().as_ref()); + + let passthrough = passthrough_child(plan.as_ref()).expect("wrapper should preserve schema"); + assert_eq!(passthrough.schema().as_ref(), child.schema().as_ref()); + } + + fn new_region_scan() -> Arc { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("value", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + ])); + let recordbatches = RecordBatches::try_new(schema.clone(), vec![]).unwrap(); + let stream = recordbatches.as_stream(); + + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "value", + ConcreteDataType::int32_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_nanosecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }); + + let scanner = Box::new(SinglePartitionScanner::new( + stream, + false, + Arc::new(builder.build().unwrap()), + None, + )); + Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap()) } } diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 19a114c8ce..933f0e47cf 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -56,7 +56,7 @@ use snafu::location; use store_api::region_engine::PartitionRange; use crate::error::Result; -use crate::window_sort::check_partition_range_monotonicity; +use crate::window_sort::{check_partition_range_monotonicity, project_partition_range_for_sort}; use crate::{array_iter_helper, downcast_ts_array}; /// Get the primary end of a `PartitionRange` based on sort direction. @@ -473,6 +473,7 @@ impl PartSortStream { snafu::location!() )? }; + let cur_range = project_partition_range_for_sort(cur_range, sort_column.data_type())?; downcast_ts_array!( sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx), @@ -506,7 +507,10 @@ impl PartSortStream { snafu::location!() )?; } - let cur_range = self.partition_ranges[self.cur_part_idx]; + let cur_range = project_partition_range_for_sort( + self.partition_ranges[self.cur_part_idx], + sort_column.data_type(), + )?; let sort_column_iter = downcast_ts_array!( sort_column.data_type() => (array_iter_helper, sort_column), diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 83feee6f30..7267de3dab 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -30,6 +30,7 @@ use common_error::status_code::StatusCode; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; use common_telemetry::error; use common_time::Timestamp; +use common_time::timestamp::TimeUnit as TimestampUnit; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool}; use datafusion::execution::{RecordBatchStream, TaskContext}; use datafusion::physical_plan::memory::MemoryStream; @@ -778,18 +779,11 @@ fn find_slice_from_range( sort_column: &SortColumn, range: &TimeRange, ) -> datafusion_common::Result<(usize, usize)> { - let ty = sort_column.values.data_type(); - let time_unit = if let DataType::Timestamp(unit, _) = ty { - unit - } else { - return Err(DataFusionError::Internal(format!( - "Unsupported sort column type: {}", - sort_column.values.data_type() - ))); - }; + let time_unit = sort_timestamp_unit(sort_column.values.data_type())?; let array = &sort_column.values; let opt = &sort_column.options.unwrap_or_default(); let descending = opt.descending; + let range = convert_time_range_for_sort(range, time_unit)?; let typed_sorted_range = [range.start, range.end] .iter() @@ -836,6 +830,72 @@ fn find_slice_from_range( Ok((start, end - start)) } +fn sort_timestamp_unit(data_type: &DataType) -> datafusion_common::Result { + if let DataType::Timestamp(unit, _) = data_type { + Ok(*unit) + } else { + Err(DataFusionError::Internal(format!( + "Unsupported sort column type: {data_type}" + ))) + } +} + +#[derive(Debug, Clone, Copy)] +enum RangeBoundKind { + InclusiveStart, + ExclusiveEnd, +} + +fn convert_time_range_for_sort( + range: &TimeRange, + time_unit: arrow_schema::TimeUnit, +) -> datafusion_common::Result { + let target_unit = time_unit.into(); + Ok(TimeRange::new( + convert_timestamp_range_bound(range.start, target_unit, RangeBoundKind::InclusiveStart)?, + convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?, + )) +} + +fn convert_timestamp_range_bound( + timestamp: Timestamp, + target_unit: TimestampUnit, + bound_kind: RangeBoundKind, +) -> datafusion_common::Result { + let converted = match bound_kind { + RangeBoundKind::InclusiveStart => timestamp.convert_to(target_unit), + RangeBoundKind::ExclusiveEnd => timestamp.convert_to_ceil(target_unit), + }; + + converted.ok_or_else(|| { + DataFusionError::Internal(format!( + "Failed to convert timestamp from {:?} to {:?}", + timestamp.unit(), + target_unit + )) + }) +} + +pub(crate) fn project_partition_range_for_sort( + range: PartitionRange, + sort_data_type: &DataType, +) -> datafusion_common::Result { + let target_unit = sort_timestamp_unit(sort_data_type)?.into(); + Ok(PartitionRange { + start: convert_timestamp_range_bound( + range.start, + target_unit, + RangeBoundKind::InclusiveStart, + )?, + end: convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?, + ..range + }) +} + +fn discrete_exclusive_end(timestamp: Timestamp) -> Timestamp { + Timestamp::new(timestamp.value() + 1, timestamp.unit()) +} + /// Get an iterator from a primitive array. /// /// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`. @@ -905,7 +965,7 @@ impl SucRun { let end = self .first_val .max(self.last_val) - .map(|i| Timestamp::new(i.value() + 1, i.unit())); + .map(discrete_exclusive_end); start.zip(end).map(|(s, e)| TimeRange::new(s, e)) } } @@ -1498,6 +1558,49 @@ mod test { } } + #[test] + fn test_project_partition_range_for_sort_uses_ceil_on_exclusive_end() { + let range = PartitionRange { + start: Timestamp::new_nanosecond(1_000_000), + end: Timestamp::new_nanosecond(1_000_001), + num_rows: 1, + identifier: 0, + }; + + let projected = project_partition_range_for_sort( + range, + &DataType::Timestamp(TimeUnit::Millisecond, None), + ) + .unwrap(); + + assert_eq!(Timestamp::new_millisecond(1), projected.start); + assert_eq!(Timestamp::new_millisecond(2), projected.end); + } + + #[test] + fn test_find_slice_from_range_preserves_last_row_after_precision_drop() { + let sort_column = SortColumn { + values: Arc::new(TimestampMillisecondArray::from_iter_values([1])) as ArrayRef, + options: Some(SortOptions::default()), + }; + let range = TimeRange::new( + Timestamp::new_nanosecond(1_000_000), + Timestamp::new_nanosecond(1_000_001), + ); + + assert_eq!((0, 1), find_slice_from_range(&sort_column, &range).unwrap()); + } + + #[test] + fn test_discrete_exclusive_end_creates_half_open_upper_bound() { + let timestamp = Timestamp::new_millisecond(42); + + assert_eq!( + Timestamp::new_millisecond(43), + discrete_exclusive_end(timestamp) + ); + } + #[allow(clippy::type_complexity)] fn run_compute_working_ranges_test( testcases: Vec<( diff --git a/tests/cases/distributed/optimizer/windowed_sort_advance.result b/tests/cases/distributed/optimizer/windowed_sort_advance.result new file mode 100644 index 0000000000..e3c92a0a08 --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_advance.result @@ -0,0 +1,118 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +Affected Rows: 0 + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[ts@2 as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_ProjectionExec: expr=[to_timestamp_millis(a.ts)@0 as ts, status@1 as status, value@2 as value] REDACTED +|_|_|_SortPreservingMergeExec: [to_timestamp_millis(a.ts)@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=to_timestamp_millis(a.ts)@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_millis(ts@2) as to_timestamp_millis(a.ts), status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a`; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/optimizer/windowed_sort_advance.sql b/tests/cases/distributed/optimizer/windowed_sort_advance.sql new file mode 100644 index 0000000000..fa73b97933 --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_advance.sql @@ -0,0 +1,37 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +DROP TABLE `a`; diff --git a/tests/cases/distributed/optimizer/windowed_sort_nanos.result b/tests/cases/distributed/optimizer/windowed_sort_nanos.result new file mode 100644 index 0000000000..2310ec8f28 --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_nanos.result @@ -0,0 +1,79 @@ +create table `a_ms` (`value` double, `status` bigint, ts timestamp(3) time index); + +Affected Rows: 0 + +INSERT INTO `a_ms` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.123+08:00'), +(46.84, 200, '2026-03-12T08:00:15.234+08:00'), +(46.85, 200, '2026-03-12T08:00:25.345+08:00'), +(46.86, 200, '2026-03-12T08:00:35.456+08:00'), +(46.88, 200, '2026-03-12T08:00:45.567+08:00'), +(46.89, 200, '2026-03-12T08:00:55.678+08:00'), +(46.91, 200, '2026-03-12T08:01:05.789+08:00'), +(46.90, 200, '2026-03-12T08:01:15.890+08:00'), +(46.87, 200, '2026-03-12T08:01:25.901+08:00'), +(46.85, 200, '2026-03-12T08:01:35.999+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-------------------------+--------+-------+ +| ts | status | value | ++-------------------------+--------+-------+ +| 2026-03-12T00:00:05.123 | 200 | 46.82 | +| 2026-03-12T00:00:15.234 | 200 | 46.84 | +| 2026-03-12T00:00:25.345 | 200 | 46.85 | +| 2026-03-12T00:00:35.456 | 200 | 46.86 | +| 2026-03-12T00:00:45.567 | 200 | 46.88 | +| 2026-03-12T00:00:55.678 | 200 | 46.89 | +| 2026-03-12T00:01:05.789 | 200 | 46.91 | +| 2026-03-12T00:01:15.890 | 200 | 46.9 | +| 2026-03-12T00:01:25.901 | 200 | 46.87 | +| 2026-03-12T00:01:35.999 | 200 | 46.85 | ++-------------------------+--------+-------+ + +select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-------------------------+--------+-------+ +| ts | status | value | ++-------------------------+--------+-------+ +| 2026-03-12T00:00:05.123 | 200 | 46.82 | +| 2026-03-12T00:00:15.234 | 200 | 46.84 | +| 2026-03-12T00:00:25.345 | 200 | 46.85 | +| 2026-03-12T00:00:35.456 | 200 | 46.86 | +| 2026-03-12T00:00:45.567 | 200 | 46.88 | +| 2026-03-12T00:00:55.678 | 200 | 46.89 | +| 2026-03-12T00:01:05.789 | 200 | 46.91 | +| 2026-03-12T00:01:15.890 | 200 | 46.9 | +| 2026-03-12T00:01:25.901 | 200 | 46.87 | +| 2026-03-12T00:01:35.999 | 200 | 46.85 | ++-------------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_ProjectionExec: expr=[to_timestamp_nanos(a_ms.ts)@0 as ts, status@1 as status, value@2 as value] REDACTED +|_|_|_SortPreservingMergeExec: [to_timestamp_nanos(a_ms.ts)@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=to_timestamp_nanos(a_ms.ts)@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_nanos(ts@2) as to_timestamp_nanos(a_ms.ts), status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a_ms`; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/optimizer/windowed_sort_nanos.sql b/tests/cases/distributed/optimizer/windowed_sort_nanos.sql new file mode 100644 index 0000000000..6a6cf51c4b --- /dev/null +++ b/tests/cases/distributed/optimizer/windowed_sort_nanos.sql @@ -0,0 +1,27 @@ +create table `a_ms` (`value` double, `status` bigint, ts timestamp(3) time index); + +INSERT INTO `a_ms` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.123+08:00'), +(46.84, 200, '2026-03-12T08:00:15.234+08:00'), +(46.85, 200, '2026-03-12T08:00:25.345+08:00'), +(46.86, 200, '2026-03-12T08:00:35.456+08:00'), +(46.88, 200, '2026-03-12T08:00:45.567+08:00'), +(46.89, 200, '2026-03-12T08:00:55.678+08:00'), +(46.91, 200, '2026-03-12T08:01:05.789+08:00'), +(46.90, 200, '2026-03-12T08:01:15.890+08:00'), +(46.87, 200, '2026-03-12T08:01:25.901+08:00'), +(46.85, 200, '2026-03-12T08:01:35.999+08:00'); + +select ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +DROP TABLE `a_ms`; diff --git a/tests/cases/standalone/optimizer/windowed_sort_advance.result b/tests/cases/standalone/optimizer/windowed_sort_advance.result new file mode 100644 index 0000000000..c0be1bece9 --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_advance.result @@ -0,0 +1,117 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +Affected Rows: 0 + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[ts@2 as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++---------------------+--------+-------+ +| ts | status | value | ++---------------------+--------+-------+ +| 2026-03-12T00:00:05 | 200 | 46.82 | +| 2026-03-12T00:00:15 | 200 | 46.84 | +| 2026-03-12T00:00:25 | 200 | 46.85 | +| 2026-03-12T00:00:35 | 200 | 46.86 | +| 2026-03-12T00:00:45 | 200 | 46.88 | +| 2026-03-12T00:00:55 | 200 | 46.89 | +| 2026-03-12T00:01:05 | 200 | 46.91 | +| 2026-03-12T00:01:15 | 200 | 46.9 | +| 2026-03-12T00:01:25 | 200 | 46.87 | +| 2026-03-12T00:01:35 | 200 | 46.85 | ++---------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_millis(ts@2) as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a`; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/optimizer/windowed_sort_advance.sql b/tests/cases/standalone/optimizer/windowed_sort_advance.sql new file mode 100644 index 0000000000..fa73b97933 --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_advance.sql @@ -0,0 +1,37 @@ +create table `a` (`value` double, `status` bigint, ts timestamp(9) time index); + +INSERT INTO `a` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.000000000+08:00'), +(46.84, 200, '2026-03-12T08:00:15.000000000+08:00'), +(46.85, 200, '2026-03-12T08:00:25.000000000+08:00'), +(46.86, 200, '2026-03-12T08:00:35.000000000+08:00'), +(46.88, 200, '2026-03-12T08:00:45.000000000+08:00'), +(46.89, 200, '2026-03-12T08:00:55.000000000+08:00'), +(46.91, 200, '2026-03-12T08:01:05.000000000+08:00'), +(46.90, 200, '2026-03-12T08:01:15.000000000+08:00'), +(46.87, 200, '2026-03-12T08:01:25.000000000+08:00'), +(46.85, 200, '2026-03-12T08:01:35.000000000+08:00'); + +select ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select ts as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_millis(ts) as ts, status, value from `a` where ts >= '2026-03-12T08:00:00+08:00' and ts < '2026-03-12T08:02:01+08:00' order by ts asc; + +DROP TABLE `a`; diff --git a/tests/cases/standalone/optimizer/windowed_sort_nanos.result b/tests/cases/standalone/optimizer/windowed_sort_nanos.result new file mode 100644 index 0000000000..f673636307 --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_nanos.result @@ -0,0 +1,78 @@ +create table `a_ms` (`value` double, `status` bigint, ts timestamp(3) time index); + +Affected Rows: 0 + +INSERT INTO `a_ms` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.123+08:00'), +(46.84, 200, '2026-03-12T08:00:15.234+08:00'), +(46.85, 200, '2026-03-12T08:00:25.345+08:00'), +(46.86, 200, '2026-03-12T08:00:35.456+08:00'), +(46.88, 200, '2026-03-12T08:00:45.567+08:00'), +(46.89, 200, '2026-03-12T08:00:55.678+08:00'), +(46.91, 200, '2026-03-12T08:01:05.789+08:00'), +(46.90, 200, '2026-03-12T08:01:15.890+08:00'), +(46.87, 200, '2026-03-12T08:01:25.901+08:00'), +(46.85, 200, '2026-03-12T08:01:35.999+08:00'); + +Affected Rows: 10 + +select ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-------------------------+--------+-------+ +| ts | status | value | ++-------------------------+--------+-------+ +| 2026-03-12T00:00:05.123 | 200 | 46.82 | +| 2026-03-12T00:00:15.234 | 200 | 46.84 | +| 2026-03-12T00:00:25.345 | 200 | 46.85 | +| 2026-03-12T00:00:35.456 | 200 | 46.86 | +| 2026-03-12T00:00:45.567 | 200 | 46.88 | +| 2026-03-12T00:00:55.678 | 200 | 46.89 | +| 2026-03-12T00:01:05.789 | 200 | 46.91 | +| 2026-03-12T00:01:15.890 | 200 | 46.9 | +| 2026-03-12T00:01:25.901 | 200 | 46.87 | +| 2026-03-12T00:01:35.999 | 200 | 46.85 | ++-------------------------+--------+-------+ + +select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-------------------------+--------+-------+ +| ts | status | value | ++-------------------------+--------+-------+ +| 2026-03-12T00:00:05.123 | 200 | 46.82 | +| 2026-03-12T00:00:15.234 | 200 | 46.84 | +| 2026-03-12T00:00:25.345 | 200 | 46.85 | +| 2026-03-12T00:00:35.456 | 200 | 46.86 | +| 2026-03-12T00:00:45.567 | 200 | 46.88 | +| 2026-03-12T00:00:55.678 | 200 | 46.89 | +| 2026-03-12T00:01:05.789 | 200 | 46.91 | +| 2026-03-12T00:01:15.890 | 200 | 46.9 | +| 2026-03-12T00:01:25.901 | 200 | 46.87 | +| 2026-03-12T00:01:35.999 | 200 | 46.85 | ++-------------------------+--------+-------+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec: expr=ts@0 ASC NULLS LAST num_ranges=REDACTED REDACTED +|_|_|_ProjectionExec: expr=[to_timestamp_nanos(ts@2) as ts, status@1 as status, value@0 as value] REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 10_| ++-+-+-+ + +DROP TABLE `a_ms`; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/optimizer/windowed_sort_nanos.sql b/tests/cases/standalone/optimizer/windowed_sort_nanos.sql new file mode 100644 index 0000000000..6a6cf51c4b --- /dev/null +++ b/tests/cases/standalone/optimizer/windowed_sort_nanos.sql @@ -0,0 +1,27 @@ +create table `a_ms` (`value` double, `status` bigint, ts timestamp(3) time index); + +INSERT INTO `a_ms` (`value`, `status`, `ts`) VALUES +(46.82, 200, '2026-03-12T08:00:05.123+08:00'), +(46.84, 200, '2026-03-12T08:00:15.234+08:00'), +(46.85, 200, '2026-03-12T08:00:25.345+08:00'), +(46.86, 200, '2026-03-12T08:00:35.456+08:00'), +(46.88, 200, '2026-03-12T08:00:45.567+08:00'), +(46.89, 200, '2026-03-12T08:00:55.678+08:00'), +(46.91, 200, '2026-03-12T08:01:05.789+08:00'), +(46.90, 200, '2026-03-12T08:01:15.890+08:00'), +(46.87, 200, '2026-03-12T08:01:25.901+08:00'), +(46.85, 200, '2026-03-12T08:01:35.999+08:00'); + +select ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +-- SQLNESS REPLACE num_ranges=\d+ num_ranges=REDACTED +EXPLAIN ANALYZE select to_timestamp_nanos(ts) as ts, status, value from `a_ms` where ts >= '2026-03-12T08:00:00.000+08:00' and ts < '2026-03-12T08:02:01.000+08:00' order by ts asc; + +DROP TABLE `a_ms`;