From ab426cbf897f7c8291f5319f32b349f89a8cdb1d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 10 Dec 2025 18:11:19 +0800 Subject: [PATCH] refactor: remove duplication coverage and code from window sort tests (#7384) * refactor: remove duplication coverage and code from window sort tests Signed-off-by: Ruihang Xia * allow clippy Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/window_sort.rs | 901 +++++++++++------------------------ 1 file changed, 267 insertions(+), 634 deletions(-) diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 47ee8be75a..e497656796 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -1259,6 +1259,41 @@ mod test { use super::*; use crate::test_util::{MockInputExec, new_ts_array}; + // Test helpers to reduce duplication + mod helpers { + use datafusion::physical_plan::expressions::Column; + + use super::*; + + pub fn default_sort_opts(descending: bool) -> SortOptions { + SortOptions { + descending, + nulls_first: true, + } + } + + pub fn ts_field(unit: TimeUnit) -> Field { + Field::new("ts", DataType::Timestamp(unit, None), false) + } + + pub fn ts_column() -> Column { + Column::new("ts", 0) + } + + pub fn partition_range(start: i64, end: i64, num_rows: usize, id: usize) -> PartitionRange { + PartitionRange { + start: Timestamp::new_millisecond(start), + end: Timestamp::new_millisecond(end), + num_rows, + identifier: id, + } + } + + pub fn ts_array(values: impl IntoIterator) -> ArrayRef { + Arc::new(TimestampMillisecondArray::from_iter_values(values)) + } + } + #[test] fn test_overlapping() { let testcases = [ @@ -1455,8 +1490,32 @@ mod test { } } + #[allow(clippy::type_complexity)] + fn run_compute_working_ranges_test( + testcases: Vec<( + BTreeMap<(Timestamp, Timestamp), Vec>, + Vec<((Timestamp, Timestamp), BTreeSet)>, + )>, + descending: bool, + ) { + for (input, expected) in testcases { + let expected = expected + .into_iter() + .map(|(r, s)| (r.into(), s)) + .collect_vec(); + let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect(); + assert_eq!( + compute_all_working_ranges(&input, descending), + expected, + "input: {:?}, descending: {}", + input, + descending + ); + } + } + #[test] - fn test_compute_working_ranges_rev() { + fn test_compute_working_ranges_descending() { let testcases = vec![ ( BTreeMap::from([( @@ -1655,23 +1714,11 @@ mod test { ), ]; - for (input, expected) in testcases { - let expected = expected - .into_iter() - .map(|(r, s)| (r.into(), s)) - .collect_vec(); - let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect(); - assert_eq!( - compute_all_working_ranges(&input, true), - expected, - "input: {:?}", - input - ); - } + run_compute_working_ranges_test(testcases, true); } #[test] - fn test_compute_working_ranges() { + fn test_compute_working_ranges_ascending() { let testcases = vec![ ( BTreeMap::from([( @@ -1871,19 +1918,7 @@ mod test { ), ]; - for (input, expected) in testcases { - let expected = expected - .into_iter() - .map(|(r, s)| (r.into(), s)) - .collect_vec(); - let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect(); - assert_eq!( - compute_all_working_ranges(&input, false), - expected, - "input: {:?}", - input - ); - } + run_compute_working_ranges_test(testcases, false); } #[test] @@ -2174,6 +2209,7 @@ mod test { #[test] fn test_cmp_with_opts() { let testcases = vec![ + // Test ascending vs descending for Some values ( Some(1), Some(2), @@ -2192,6 +2228,7 @@ mod test { }), std::cmp::Ordering::Greater, ), + // Test Some vs None with nulls_first ( Some(1), None, @@ -2210,6 +2247,7 @@ mod test { }), std::cmp::Ordering::Greater, ), + // Test Some vs None with nulls_last ( Some(1), None, @@ -2228,15 +2266,7 @@ mod test { }), std::cmp::Ordering::Less, ), - ( - None, - None, - Some(SortOptions { - descending: true, - nulls_first: true, - }), - std::cmp::Ordering::Equal, - ), + // Test None vs None - always Equal regardless of sort options ( None, None, @@ -2246,24 +2276,6 @@ mod test { }), std::cmp::Ordering::Equal, ), - ( - None, - None, - Some(SortOptions { - descending: true, - nulls_first: false, - }), - std::cmp::Ordering::Equal, - ), - ( - None, - None, - Some(SortOptions { - descending: false, - nulls_first: false, - }), - std::cmp::Ordering::Equal, - ), ]; for (a, b, opts, expected) in testcases { assert_eq!( @@ -2492,21 +2504,20 @@ mod test { output: Vec, schema: SchemaRef, } - use datafusion::physical_plan::expressions::Column; + impl TestStream { fn new( - ts_col: Column, opt: SortOptions, fetch: Option, - schema: impl Into, + unit: TimeUnit, input: Vec<(PartitionRange, Vec)>, expected: Vec>, ) -> Self { let expression = PhysicalSortExpr { - expr: Arc::new(ts_col), + expr: Arc::new(helpers::ts_column()), options: opt, }; - let schema = Schema::new(schema.into()); + let schema = Schema::new(vec![helpers::ts_field(unit)]); let schema = Arc::new(schema); let input = input .into_iter() @@ -2525,6 +2536,21 @@ mod test { } } + fn new_simple( + descending: bool, + fetch: Option, + input: Vec<(PartitionRange, Vec)>, + expected: Vec>, + ) -> Self { + Self::new( + helpers::default_sort_opts(descending), + fetch, + TimeUnit::Millisecond, + input, + expected, + ) + } + async fn run_test(&self) -> Vec { let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip(); @@ -2547,607 +2573,218 @@ mod test { } #[tokio::test] - async fn test_window_sort_stream() { + async fn test_window_sort_empty_and_minimal() { + use helpers::*; let test_cases = [ - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + // Empty input + TestStream::new_simple(false, None, vec![], vec![]), + // One empty batch, one with data + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![], - vec![], - ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // test one empty - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(2), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))], - ), + (partition_range(1, 2, 1, 0), vec![ts_array([])]), + (partition_range(1, 3, 1, 0), vec![ts_array([2])]), ], - vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values( - [2], - ))]], + vec![vec![ts_array([2])]], ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + // Both batches empty + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // test one empty - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(2), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], - ), + (partition_range(1, 2, 1, 0), vec![ts_array([])]), + (partition_range(1, 3, 1, 0), vec![ts_array([])]), ], vec![], ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + // Indistinguishable boundary case - value at exact boundary + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // test indistinguishable case - // we can't know whether `2` belong to which range - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(2), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))], + (partition_range(1, 2, 1, 0), vec![ts_array([1])]), + (partition_range(1, 3, 1, 0), vec![ts_array([2])]), ], + vec![vec![ts_array([1])], vec![ts_array([2])]], ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + ]; + + for (idx, testcase) in test_cases.iter().enumerate() { + let output = testcase.run_test().await; + assert_eq!(output, testcase.output, "empty/minimal case {idx} failed"); + } + } + + #[tokio::test] + async fn test_window_sort_overlapping() { + use helpers::*; + let test_cases = [ + // Direct emit - overlapping ranges without merge + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // test direct emit - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 2, 3, - ]))], - ), + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + (partition_range(1, 4, 1, 0), vec![ts_array([2, 3])]), ], vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - // didn't trigger a merge sort/concat here so this is it - vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))], + vec![ts_array([1, 2])], + vec![ts_array([2])], + vec![ts_array([3])], ], ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + // Cross working range batch intersection - triggers merge + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // test more of cross working range batch intersection - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 1, 2, 2, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))], + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]), ], + vec![vec![ts_array([1, 1, 2, 2])], vec![ts_array([3])]], ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, + // No overlap case - separate ranges + TestStream::new_simple( + false, None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], vec![ - // no overlap, empty intersection batch case - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(4), - end: Timestamp::new_millisecond(6), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 4, 5, - ]))], - ), + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]), + (partition_range(4, 6, 1, 1), vec![ts_array([4, 5])]), ], vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 1, 2, 2, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 4, 5, - ]))], - ], - ), - // test fetch - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - Some(6), - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // no overlap, empty intersection batch case - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(3), - end: Timestamp::new_millisecond(6), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 4, 5, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 1, 2, 2, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))], - ], - ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - Some(3), - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // no overlap, empty intersection batch case - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(3), - end: Timestamp::new_millisecond(6), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 4, 5, - ]))], - ), - ], - vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values( - [1, 1, 2], - ))]], - ), - // rev case - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: true, - nulls_first: true, - }, - None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // reverse order - ( - PartitionRange { - start: Timestamp::new_millisecond(3), - end: Timestamp::new_millisecond(6), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 5, 4, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(4), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 3, 2, 1, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 2, 1, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 5, 4, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 2, 2, 1, 1, - ]))], - ], - ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // long have subset short run case - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(10), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 5, 9, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(3), - end: Timestamp::new_millisecond(7), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 3, 4, 5, 6, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 3, 4, 5, 5, 6, 9, - ]))], - ], - ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // complex overlap - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(3), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(10), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 3, 4, 5, 6, 8, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(7), - end: Timestamp::new_millisecond(10), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 7, 8, 9, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 1, 2, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 3, 4, 5, 6, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 7, 8, 8, 9, - ]))], - ], - ), - TestStream::new( - Column::new("ts", 0), - SortOptions { - descending: false, - nulls_first: true, - }, - None, - vec![Field::new( - "ts", - DataType::Timestamp(TimeUnit::Millisecond, None), - false, - )], - vec![ - // complex subset with having same datapoint - ( - PartitionRange { - start: Timestamp::new_millisecond(1), - end: Timestamp::new_millisecond(11), - num_rows: 1, - identifier: 0, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, - ]))], - ), - ( - PartitionRange { - start: Timestamp::new_millisecond(5), - end: Timestamp::new_millisecond(7), - num_rows: 1, - identifier: 1, - }, - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 5, 6, - ]))], - ), - ], - vec![ - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 1, 2, 3, 4, - ]))], - vec![Arc::new(TimestampMillisecondArray::from_iter_values([ - 5, 5, 6, 6, 7, 8, 9, 10, - ]))], + vec![ts_array([1, 1, 2, 2])], + vec![ts_array([3])], + vec![ts_array([4, 5])], ], ), ]; - let indexed_test_cases = test_cases.iter().enumerate().collect_vec(); - - for (idx, testcase) in &indexed_test_cases { + for (idx, testcase) in test_cases.iter().enumerate() { let output = testcase.run_test().await; - assert_eq!(output, testcase.output, "case {idx} failed."); + assert_eq!(output, testcase.output, "overlapping case {idx} failed"); + } + } + + #[tokio::test] + async fn test_window_sort_with_fetch() { + use helpers::*; + let test_cases = [ + // Fetch limit stops at 6 rows + TestStream::new_simple( + false, + Some(6), + vec![ + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]), + (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]), + ], + vec![ + vec![ts_array([1, 1, 2, 2])], + vec![ts_array([3])], + vec![ts_array([4])], + ], + ), + // Fetch limit stops at 3 rows + TestStream::new_simple( + false, + Some(3), + vec![ + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]), + (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]), + ], + vec![vec![ts_array([1, 1, 2])]], + ), + ]; + + for (idx, testcase) in test_cases.iter().enumerate() { + let output = testcase.run_test().await; + assert_eq!(output, testcase.output, "fetch case {idx} failed"); + } + } + + #[tokio::test] + async fn test_window_sort_descending() { + use helpers::*; + let test_cases = [ + // Descending order sort + TestStream::new_simple( + true, + None, + vec![ + (partition_range(3, 6, 1, 1), vec![ts_array([5, 4])]), + (partition_range(1, 4, 1, 1), vec![ts_array([3, 2, 1])]), + (partition_range(1, 3, 1, 0), vec![ts_array([2, 1])]), + ], + vec![ + vec![ts_array([5, 4])], + vec![ts_array([3])], + vec![ts_array([2, 2, 1, 1])], + ], + ), + ]; + + for (idx, testcase) in test_cases.iter().enumerate() { + let output = testcase.run_test().await; + assert_eq!(output, testcase.output, "descending case {idx} failed"); + } + } + + #[tokio::test] + async fn test_window_sort_complex() { + use helpers::*; + let test_cases = [ + // Long range with subset short run + TestStream::new_simple( + false, + None, + vec![ + (partition_range(1, 10, 1, 0), vec![ts_array([1, 5, 9])]), + (partition_range(3, 7, 1, 1), vec![ts_array([3, 4, 5, 6])]), + ], + vec![vec![ts_array([1])], vec![ts_array([3, 4, 5, 5, 6, 9])]], + ), + // Complex multi-range overlap + TestStream::new_simple( + false, + None, + vec![ + (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]), + ( + partition_range(1, 10, 1, 1), + vec![ts_array([1, 3, 4, 5, 6, 8])], + ), + (partition_range(7, 10, 1, 1), vec![ts_array([7, 8, 9])]), + ], + vec![ + vec![ts_array([1, 1, 2])], + vec![ts_array([3, 4, 5, 6])], + vec![ts_array([7, 8, 8, 9])], + ], + ), + // Subset with duplicate datapoints + TestStream::new_simple( + false, + None, + vec![ + ( + partition_range(1, 11, 1, 0), + vec![ts_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])], + ), + (partition_range(5, 7, 1, 1), vec![ts_array([5, 6])]), + ], + vec![ + vec![ts_array([1, 2, 3, 4])], + vec![ts_array([5, 5, 6, 6, 7, 8, 9, 10])], + ], + ), + ]; + + for (idx, testcase) in test_cases.iter().enumerate() { + let output = testcase.run_test().await; + assert_eq!(output, testcase.output, "complex case {idx} failed"); } } @@ -3234,13 +2871,9 @@ mod test { let output_arr = new_ts_array(unit, output_data); let test_stream = TestStream::new( - Column::new("ts", 0), - SortOptions { - descending, - nulls_first: true, - }, + helpers::default_sort_opts(descending), fetch, - vec![Field::new("ts", DataType::Timestamp(unit, None), false)], + unit, input_ranged_data.clone(), vec![vec![output_arr]], );