diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index e89f801a6a..9dc0a491e2 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -1340,72 +1340,107 @@ mod test { } } + /// Comprehensive test case structure that encapsulates all test parameters + #[derive(Debug, Clone)] + struct TestCase { + #[allow(dead_code)] + name: &'static str, + unit: TimeUnit, + input_ranges: Vec<((i64, i64), Vec>)>, // (start, end) -> data batches + descending: bool, + nulls_first: bool, + limit: Option, + expected_output: Vec>, + expected_polled_rows: Option, + #[allow(dead_code)] + nullable: bool, + } + + /// Comprehensive test suite covering all PartSort functionality #[tokio::test] - async fn simple_cases() { - let testcases = vec![ - ( - TimeUnit::Millisecond, - vec![ + async fn comprehensive_part_sort_tests() { + let test_cases = vec![ + // ===== Basic Cases ===== + // Test basic ascending sort with overlapping ranges + TestCase { + name: "basic_ascending_overlapping_ranges", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), ((5, 10), vec![vec![5, 6], vec![7, 8]]), ], - false, - None, - vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]], - ), - // Case 1: Descending sort with overlapping ranges that have the same primary end (end=10). - // Ranges [5,10) and [0,10) are grouped together, so their data is merged before sorting. - ( - TimeUnit::Millisecond, - vec![ + descending: false, + nulls_first: false, + limit: None, + expected_output: vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]], + expected_polled_rows: None, + nullable: false, + }, + // Test descending sort with overlapping ranges that have same primary end + TestCase { + name: "descending_overlapping_same_primary_end", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ((5, 10), vec![vec![5, 6], vec![7, 8, 9]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ], - true, - None, - vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]], - ), - ( - TimeUnit::Millisecond, - vec![ + descending: true, + nulls_first: false, + limit: None, + expected_output: vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "empty_first_range", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ((5, 10), vec![]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ], - true, - None, - vec![vec![8, 7, 6, 5, 4, 3, 2, 1]], - ), - ( - TimeUnit::Millisecond, - vec![ + descending: true, + nulls_first: false, + limit: None, + expected_output: vec![vec![8, 7, 6, 5, 4, 3, 2, 1]], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "multiple_ranges_with_empty_middle", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ((15, 20), vec![vec![17, 18, 19]]), ((10, 15), vec![]), ((5, 10), vec![]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ], - true, - None, - vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]], - ), - ( - TimeUnit::Millisecond, - vec![ + descending: true, + nulls_first: false, + limit: None, + expected_output: vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "all_empty_ranges", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ((15, 20), vec![]), ((10, 15), vec![]), ((5, 10), vec![]), ((0, 10), vec![]), ], - true, - None, - vec![], - ), - // Case 5: Data from one batch spans multiple ranges. Ranges with same end are grouped. - // Ranges: [15,20) end=20, [10,15) end=15, [5,10) end=10, [0,10) end=10 - // Groups: {[15,20)}, {[10,15)}, {[5,10), [0,10)} - // The last two ranges are merged because they share end=10. - ( - TimeUnit::Millisecond, - vec![ + descending: true, + nulls_first: false, + limit: None, + expected_output: vec![], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "data_spanning_multiple_ranges_with_limit", + unit: TimeUnit::Millisecond, + input_ranges: vec![ ( (15, 20), vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]], @@ -1414,90 +1449,654 @@ mod test { ((5, 10), vec![]), ((0, 10), vec![]), ], - true, - None, + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![19, 17]], + expected_polled_rows: None, + nullable: false, + }, + + // ===== Limit Tests ===== + TestCase { + name: "limit_multiple_batches_single_partition", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), + ], + descending: true, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![9, 8, 7]], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "limit_multiple_partitions", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((10, 20), vec![vec![10, 11, 12], vec![13, 14, 15]]), + ((0, 10), vec![vec![1, 2, 3], vec![4, 5]]), + ], + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![15, 14]], + expected_polled_rows: None, + nullable: false, + }, + TestCase { + name: "limit_ascending_sort", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((0, 10), vec![vec![7, 8, 9], vec![4, 5, 6], vec![1, 2, 3]]), + ], + descending: false, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![1, 2]], + expected_polled_rows: None, + nullable: false, + }, + + // ===== Early Termination Tests ===== + // Test early termination with limit - should stop after processing first partition group + // Input: Three partitions with ranges (20,30), (10,20), (0,10), each with 2 batches of 5 values + // With limit=2 and descending sort, we only need top 2 values [29,28] from the first partition + // expected_polled_rows=10: Only processes first partition (10 rows) before early termination + TestCase { + name: "early_termination_descending", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((20, 30), vec![vec![21, 22, 23, 24, 25], vec![26, 27, 28, 29, 30]]), + ((10, 20), vec![vec![11, 12, 13, 14, 15], vec![16, 17, 18, 19, 20]]), + ((0, 10), vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]]), + ], + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![29, 28]], + expected_polled_rows: Some(10), + nullable: false, + }, + + // ===== Primary End Grouping Tests ===== + TestCase { + name: "primary_end_grouping_with_limit", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![80, 90, 95]]), + ((50, 100), vec![vec![55, 65, 75, 85, 95]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![95, 95, 90, 85]], + expected_polled_rows: None, + nullable: false, + }, + + // ===== Three Ranges Keep Pulling Tests ===== + TestCase { + name: "three_ranges_keep_pulling", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![80, 90, 95]]), + ((50, 100), vec![vec![55, 75, 85]]), + ((40, 95), vec![vec![45, 65, 94]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![95, 94, 90, 85]], + expected_polled_rows: None, + nullable: false, + }, + + // ===== Threshold-Based Early Termination Tests ===== + // Test threshold-based early termination - when threshold >= next group's primary end + // Input: Two partitions (70,100) with [94,95,96,97,98,99] and (50,90) with [85,86,87] + // With limit=4, threshold becomes 96 (4th largest value), next_primary_end=90 + // Since 96 >= 90, we can stop early and skip the second partition + // expected_polled_rows=9: Only processes first partition (6 rows) + partial second (3 rows) + TestCase { + name: "threshold_based_early_termination_desc", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]), + ((50, 90), vec![vec![85, 86, 87]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![99, 98, 97, 96]], + expected_polled_rows: Some(9), + nullable: false, + }, + // Test continuing when threshold falls within next group's range + // Input: Two partitions (70,100) with [94,95,96,97,98,99] and (50,98) with [55,60,65] + // With limit=4, threshold=96, next_primary_end=98 + // Since 96 < 98, we cannot stop early - next group might have values > 96 + // expected_polled_rows=9: Processes first partition (6) + second partition (3) + TestCase { + name: "continue_when_threshold_in_next_group_range", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]), + ((50, 98), vec![vec![55, 60, 65]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![99, 98, 97, 96]], + expected_polled_rows: Some(9), + nullable: false, + }, + // Test ascending threshold early termination + // Input: Three partitions (10,50), (20,60), (60,70) with values [10-15], [25,30,35], [60,61] + // With limit=4, threshold=13 (4th smallest value), next_primary_end=20 + // Since 13 < 20, we need to check next group, but 13 < 25 (smallest in next), we can stop + // expected_polled_rows=11: Processes first partition (6) + second partition (3) + third (2) + TestCase { + name: "ascending_threshold_early_termination", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((10, 50), vec![vec![10, 11, 12, 13, 14, 15]]), + ((20, 60), vec![vec![25, 30, 35]]), + ((60, 70), vec![vec![60, 61]]), + ((80, 90), vec![vec![80, 81]]), + ], + descending: false, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![10, 11, 12, 13]], + expected_polled_rows: Some(11), + nullable: false, + }, + // Test complex ascending threshold early termination case + // Input: Multiple partitions with overlapping ranges and various values + // With limit=4, we need smallest 4 values. After processing (5,25) with [5,6,7,8], + // threshold=8, and remaining partitions have min values > 8, so we can stop early + // expected_polled_rows=11: Processes necessary partitions until threshold condition met + TestCase { + name: "ascending_threshold_early_termination_case_two", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((0, 20), vec![vec![9, 10, 11, 12]]), + ((4, 25), vec![vec![21]]), + ((5, 25), vec![vec![5, 6, 7, 8]]), + ((42, 52), vec![vec![42, 51]]), + ((48, 53), vec![vec![48, 51]]), + ], + descending: false, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![5, 6, 7, 8]], + expected_polled_rows: Some(11), + nullable: false, + }, + + // ===== Null Handling Tests ===== + + // ===== Single Group Tests ===== + // Test early stop within a single group - partitions share same primary end + // Input: Two partitions both ending at 100, first has [94-99], second has [85-87] + // With limit=4, we get [99,98,97,96] from first partition, threshold=96 + // Since both partitions have same primary end (100), we cannot stop early based on range + // expected_polled_rows=9: Processes both partitions completely (6 + 3 rows) + TestCase { + name: "can_early_stop_single_group", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]), + ((50, 100), vec![vec![85, 86, 87]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![99, 98, 97, 96]], + expected_polled_rows: Some(9), + nullable: false, + }, + + // ===== Exact Boundary Equality Tests ===== + // Test exact boundary equality for descending sort - threshold equals next primary end + // Input: Two partitions (70,100) with [92,91,90,89] and (50,90) with [88,87,86] + // With limit=3, threshold=90, next_primary_end=90 + // Since threshold == next_primary_end, we can stop early (90 >= 90) + // expected_polled_rows=7: Processes first partition (4) + partial second (3 rows) + TestCase { + name: "exact_boundary_equality_desc", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![92, 91, 90, 89]]), + ((50, 90), vec![vec![88, 87, 86]]), + ], + descending: true, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![92, 91, 90]], + expected_polled_rows: Some(7), + nullable: false, + }, + // Test exact boundary equality for ascending sort - threshold equals next primary start + // Input: Two partitions (10,50) with [10,15,20,25] and (20,60) with [21,22,23] + // With limit=3, threshold=20, next_primary_start=20 + // Since threshold == next_primary_start, we can stop early (20 <= 20) + // expected_polled_rows=7: Processes first partition (4) + partial second (3 rows) + TestCase { + name: "exact_boundary_equality_asc", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((10, 50), vec![vec![10, 15, 20, 25]]), + ((20, 60), vec![vec![21, 22, 23]]), + ], + descending: false, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![10, 15, 20]], + expected_polled_rows: Some(7), + nullable: false, + }, + + // ===== Empty Partition Tests ===== + // Test early stop with empty partitions at the start + // Input: Four partitions, first two are empty, third has [74-77], fourth has [58-60] + // With limit=2, we get [77,76] from third partition, threshold=76, next_primary_end=60 + // Since 76 >= 60, we can stop early and skip the fourth partition + // expected_polled_rows=7: Processes empty partitions (0) + third partition (4) + partial fourth (3) + TestCase { + name: "early_stop_with_empty_partitions_at_start", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![]]), + ((50, 100), vec![vec![]]), + ((30, 80), vec![vec![74, 75, 76, 77]]), + ((10, 60), vec![vec![58, 59, 60]]), + ], + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![77, 76]], + expected_polled_rows: Some(7), + nullable: false, + }, + // Test early stop with empty partitions between data partitions + // Input: Four partitions with data in first and last, middle two are empty + // First partition has [96-99], last has [48-50], limit=2 gives [99,98] + // Threshold=98, next_primary_end=50, since 98 >= 50, we can stop early + // expected_polled_rows=7: Processes first partition (4) + empty partitions (0) + partial last (3) + TestCase { + name: "early_stop_with_empty_partitions_between_data", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![96, 97, 98, 99]]), + ((50, 90), vec![vec![]]), + ((30, 70), vec![vec![]]), + ((10, 50), vec![vec![48, 49, 50]]), + ], + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![99, 98]], + expected_polled_rows: Some(7), + nullable: false, + }, + + // ===== Empty Ranges at Beginning Tests ===== + TestCase { + name: "early_stop_empty_ranges_beginning_desc", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![]]), + ((50, 100), vec![vec![]]), + ((60, 80), vec![vec![75, 76, 77, 78]]), + ((40, 70), vec![vec![65, 66, 67]]), + ], + descending: true, + nulls_first: false, + limit: Some(2), + expected_output: vec![vec![78, 77]], + expected_polled_rows: Some(7), + nullable: false, + }, + + // ===== Empty Ranges Threshold Calculation Tests ===== + TestCase { + name: "early_stop_empty_ranges_threshold_calculation", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((70, 100), vec![vec![94, 95, 96, 97, 98]]), + ((50, 100), vec![vec![]]), + ((60, 90), vec![vec![84, 85, 86, 87]]), + ((40, 90), vec![vec![]]), + ((30, 80), vec![vec![75, 76, 77]]), + ], + descending: true, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![98, 97, 96]], + expected_polled_rows: Some(12), + nullable: false, + }, + + // ===== Consecutive Empty Ranges Tests ===== + TestCase { + name: "consecutive_empty_ranges_early_stop_desc", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((90, 120), vec![vec![]]), + ((80, 110), vec![vec![]]), + ((70, 100), vec![vec![95, 96, 97, 98, 99]]), + ((60, 90), vec![vec![85, 86, 87, 88, 89]]), + ((40, 80), vec![vec![75, 76, 77]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![99, 98, 97, 96]], + expected_polled_rows: Some(13), + nullable: false, + }, + + // ===== Empty Ranges Exact Boundary Tests ===== + TestCase { + name: "empty_ranges_exact_boundary_early_stop", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((80, 100), vec![vec![]]), + ((70, 100), vec![vec![96, 97, 98, 99]]), + ((60, 90), vec![vec![85, 86, 87, 88, 89]]), + ], + descending: true, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![99, 98, 97]], + expected_polled_rows: Some(9), + nullable: false, + }, + + // ===== Empty Ranges Middle Processing Tests ===== + TestCase { + name: "empty_ranges_middle_processing_early_stop", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((90, 100), vec![vec![94, 95, 96, 97, 98]]), + ((60, 90), vec![vec![]]), + ((50, 90), vec![vec![]]), + ((40, 80), vec![vec![75, 76, 77, 78, 79]]), + ], + descending: true, + nulls_first: false, + limit: Some(4), + expected_output: vec![vec![98, 97, 96, 95]], + expected_polled_rows: Some(10), + nullable: false, + }, + + // ===== Two Non-Overlapping Ranges Tests ===== + // Test early stop with two non-overlapping ranges - limit reached in first + // Input: Two partitions (85,100) with [90-94] and (65,80) with [70-74], no overlap + // With limit=3, we get [94,93,92] from first partition, threshold=92 + // Since ranges don't overlap and we have enough values, we can stop early + // expected_polled_rows=10: Processes first partition completely (5) + second partition (5)(which is needed to make sure first partition is completed) + TestCase { + name: "two_no_overlap_range_limit_reached_in_first", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((85, 100), vec![vec![90, 91, 92, 93, 94]]), + ((65, 80), vec![vec![70, 71, 72, 73, 74]]), + ], + descending: true, + nulls_first: false, + limit: Some(3), + expected_output: vec![vec![94, 93, 92]], + expected_polled_rows: Some(10), + nullable: false, + }, + TestCase { + name: "two_no_overlap_range_need_both_groups", + unit: TimeUnit::Millisecond, + input_ranges: vec![ + ((85, 100), vec![vec![90, 91, 92, 93, 94]]), + ((65, 80), vec![vec![70, 71, 72, 73, 74]]), + ], + descending: true, + nulls_first: false, + limit: Some(7), + expected_output: vec![vec![94, 93, 92, 91, 90, 74, 73]], + expected_polled_rows: Some(10), + nullable: false, + }, + ]; + + for (case_id, test_case) in test_cases.into_iter().enumerate() { + run_comprehensive_test(case_id, test_case).await; + } + } + + /// Test early stop behavior with null values in sort column. + /// Verifies that nulls are handled correctly based on nulls_first option. + #[tokio::test] + async fn test_early_stop_with_nulls() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + true, // nullable + )])); + + // Helper function to create nullable timestamp arrays + let new_nullable_ts_array = |unit: TimeUnit, arr: Vec>| -> ArrayRef { + match unit { + TimeUnit::Second => Arc::new(TimestampSecondArray::from(arr)) as ArrayRef, + TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(arr)) as ArrayRef, + TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(arr)) as ArrayRef, + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(arr)) as ArrayRef, + } + }; + + // Test case 1: nulls_first=true, null values should appear first + // Group 1 (end=100): [null, null, 99, 98, 97] -> with limit=3, top 3 are [null, null, 99] + // Threshold is 99, next group end=90, since 99 >= 90, we should stop early + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 5, + identifier: 0, + }, vec![ - vec![19, 17, 15], - vec![12, 11, 10], - vec![9, 8, 7, 6, 5, 4, 3, 2, 1], + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array( + unit, + vec![Some(99), Some(98), None, Some(97), None], + )], + ) + .unwrap(), ], ), ( - TimeUnit::Millisecond, + PartitionRange { + start: Timestamp::new(50, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 3, + identifier: 1, + }, vec![ - ( - (15, 20), - vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]], - ), - ((10, 15), vec![]), - ((5, 10), vec![]), - ((0, 10), vec![]), + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array( + unit, + vec![Some(89), Some(88), Some(87)], + )], + ) + .unwrap(), ], - true, - Some(2), - vec![vec![19, 17]], ), ]; - for (identifier, (unit, input_ranged_data, descending, limit, expected_output)) in - testcases.into_iter().enumerate() - { - let schema = Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )]); - let schema = Arc::new(schema); - let opt = SortOptions { - descending, - ..Default::default() - }; + // With nulls_first=true, nulls sort before all values + // For descending, order is: null, null, 99, 98, 97 + // With limit=3, we get: null, null, 99 + let expected_output = Some( + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array(unit, vec![None, None, Some(99)])], + ) + .unwrap(), + ); - let input_ranged_data = input_ranged_data - .into_iter() - .map(|(range, data)| { - let part = PartitionRange { - start: Timestamp::new(range.0, unit.into()), - end: Timestamp::new(range.1, unit.into()), - num_rows: data.iter().map(|b| b.len()).sum(), - identifier, - }; + run_test( + 3000, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + nulls_first: true, + }, + Some(3), + expected_output, + Some(8), // Must read both batches to detect group boundary + false, + ) + .await; - let batches = data - .into_iter() - .map(|b| { - let arr = new_ts_array(unit, b); - DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap() - }) - .collect_vec(); - (part, batches) - }) - .collect_vec(); + // Test case 2: nulls_last=true, null values should appear last + // Group 1 (end=100): [99, 98, 97, null, null] -> with limit=3, top 3 are [99, 98, 97] + // Threshold is 97, next group end=90, since 97 >= 90, we should stop early + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 5, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array( + unit, + vec![Some(99), Some(98), Some(97), None, None], + )], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(50, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 3, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array( + unit, + vec![Some(89), Some(88), Some(87)], + )], + ) + .unwrap(), + ], + ), + ]; - let expected_output = expected_output + // With nulls_last=false (equivalent to nulls_first=false), values sort before nulls + // For descending, order is: 99, 98, 97, null, null + // With limit=3, we get: 99, 98, 97 + let expected_output = Some( + DfRecordBatch::try_new( + schema.clone(), + vec![new_nullable_ts_array( + unit, + vec![Some(99), Some(98), Some(97)], + )], + ) + .unwrap(), + ); + + run_test( + 3001, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + nulls_first: false, + }, + Some(3), + expected_output, + Some(8), // Must read both batches to detect group boundary + false, + ) + .await; + } + + /// Run a comprehensive test case + async fn run_comprehensive_test(case_id: usize, test_case: TestCase) { + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(test_case.unit, None), + false, + )])); + + let opt = SortOptions { + descending: test_case.descending, + nulls_first: test_case.nulls_first, + }; + + let input_ranged_data = test_case + .input_ranges + .into_iter() + .enumerate() + .map(|(identifier, (range, data))| { + let part = PartitionRange { + start: Timestamp::new(range.0, test_case.unit.into()), + end: Timestamp::new(range.1, test_case.unit.into()), + num_rows: data.iter().map(|b| b.len()).sum(), + identifier, + }; + + let batches = data + .into_iter() + .map(|b| { + let arr = new_ts_array(test_case.unit, b); + DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap() + }) + .collect_vec(); + (part, batches) + }) + .collect_vec(); + + let expected_output = if test_case.expected_output.is_empty() { + None + } else { + let batches = test_case + .expected_output .into_iter() .map(|a| { - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap() + let arr = new_ts_array(test_case.unit, a); + DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap() }) .collect_vec(); - let expected_output = if expected_output.is_empty() { - None - } else { - Some(concat_batches(&schema, &expected_output).unwrap()) - }; + Some(concat_batches(&schema, &batches).unwrap()) + }; - run_test( - identifier, - input_ranged_data, - schema.clone(), - opt, - limit, - expected_output, - None, - false, - ) - .await; - } + run_test( + case_id, + input_ranged_data, + schema.clone(), + opt, + test_case.limit, + expected_output, + test_case.expected_polled_rows, + false, + ) + .await; } #[allow(clippy::print_stdout)] @@ -2404,8 +3003,9 @@ mod test { /// Test early stop behavior with null values in sort column. /// Verifies that nulls are handled correctly based on nulls_first option. + /// This test covers the null handling scenarios that were previously in separate test functions. #[tokio::test] - async fn test_early_stop_with_nulls() { + async fn test_null_handling_comprehensive() { let unit = TimeUnit::Millisecond; let schema = Arc::new(Schema::new(vec![Field::new( "ts",