diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 9dc0a491e2..e6f0444e1a 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -1456,14 +1456,11 @@ mod test { expected_polled_rows: None, nullable: false, }, - // ===== Limit Tests ===== TestCase { name: "limit_multiple_batches_single_partition", unit: TimeUnit::Millisecond, - input_ranges: vec![ - ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), - ], + input_ranges: vec![((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]])], descending: true, nulls_first: false, limit: Some(3), @@ -1488,9 +1485,7 @@ mod test { TestCase { name: "limit_ascending_sort", unit: TimeUnit::Millisecond, - input_ranges: vec![ - ((0, 10), vec![vec![7, 8, 9], vec![4, 5, 6], vec![1, 2, 3]]), - ], + input_ranges: vec![((0, 10), vec![vec![7, 8, 9], vec![4, 5, 6], vec![1, 2, 3]])], descending: false, nulls_first: false, limit: Some(2), @@ -1498,7 +1493,6 @@ mod test { expected_polled_rows: None, nullable: false, }, - // ===== Early Termination Tests ===== // Test early termination with limit - should stop after processing first partition group // Input: Three partitions with ranges (20,30), (10,20), (0,10), each with 2 batches of 5 values @@ -1508,8 +1502,14 @@ mod test { name: "early_termination_descending", unit: TimeUnit::Millisecond, input_ranges: vec![ - ((20, 30), vec![vec![21, 22, 23, 24, 25], vec![26, 27, 28, 29, 30]]), - ((10, 20), vec![vec![11, 12, 13, 14, 15], vec![16, 17, 18, 19, 20]]), + ( + (20, 30), + vec![vec![21, 22, 23, 24, 25], vec![26, 27, 28, 29, 30]], + ), + ( + (10, 20), + vec![vec![11, 12, 13, 14, 15], vec![16, 17, 18, 19, 20]], + ), ((0, 10), vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]]), ], descending: true, @@ -1519,7 +1519,6 @@ mod test { expected_polled_rows: Some(10), nullable: false, }, - // ===== Primary End Grouping Tests ===== TestCase { name: "primary_end_grouping_with_limit", @@ -1535,7 +1534,6 @@ mod test { expected_polled_rows: None, nullable: false, }, - // ===== Three Ranges Keep Pulling Tests ===== TestCase { name: "three_ranges_keep_pulling", @@ -1552,7 +1550,6 @@ mod test { expected_polled_rows: None, nullable: false, }, - // ===== Threshold-Based Early Termination Tests ===== // Test threshold-based early termination - when threshold >= next group's primary end // Input: Two partitions (70,100) with [94,95,96,97,98,99] and (50,90) with [85,86,87] @@ -1635,7 +1632,6 @@ mod test { expected_polled_rows: Some(11), nullable: false, }, - // ===== Null Handling Tests ===== // ===== Single Group Tests ===== @@ -1658,7 +1654,6 @@ mod test { expected_polled_rows: Some(9), nullable: false, }, - // ===== Exact Boundary Equality Tests ===== // Test exact boundary equality for descending sort - threshold equals next primary end // Input: Two partitions (70,100) with [92,91,90,89] and (50,90) with [88,87,86] @@ -1698,7 +1693,6 @@ mod test { expected_polled_rows: Some(7), nullable: false, }, - // ===== Empty Partition Tests ===== // Test early stop with empty partitions at the start // Input: Four partitions, first two are empty, third has [74-77], fourth has [58-60] @@ -1742,7 +1736,6 @@ mod test { expected_polled_rows: Some(7), nullable: false, }, - // ===== Empty Ranges at Beginning Tests ===== TestCase { name: "early_stop_empty_ranges_beginning_desc", @@ -1760,7 +1753,6 @@ mod test { expected_polled_rows: Some(7), nullable: false, }, - // ===== Empty Ranges Threshold Calculation Tests ===== TestCase { name: "early_stop_empty_ranges_threshold_calculation", @@ -1779,7 +1771,6 @@ mod test { expected_polled_rows: Some(12), nullable: false, }, - // ===== Consecutive Empty Ranges Tests ===== TestCase { name: "consecutive_empty_ranges_early_stop_desc", @@ -1798,7 +1789,6 @@ mod test { expected_polled_rows: Some(13), nullable: false, }, - // ===== Empty Ranges Exact Boundary Tests ===== TestCase { name: "empty_ranges_exact_boundary_early_stop", @@ -1815,7 +1805,6 @@ mod test { expected_polled_rows: Some(9), nullable: false, }, - // ===== Empty Ranges Middle Processing Tests ===== TestCase { name: "empty_ranges_middle_processing_early_stop", @@ -1833,7 +1822,6 @@ mod test { expected_polled_rows: Some(10), nullable: false, }, - // ===== Two Non-Overlapping Ranges Tests ===== // Test early stop with two non-overlapping ranges - limit reached in first // Input: Two partitions (85,100) with [90-94] and (65,80) with [70-74], no overlap @@ -2542,101 +2530,6 @@ mod test { .await; } - /// Test case with three ranges demonstrating the "keep pulling" behavior. - /// After processing ranges with end=100, the smallest value in top-k might still - /// be reachable by the next group. - /// - /// Ranges: [70, 100), [50, 100), [40, 95) - /// With descending sort and limit=4: - /// - Group 1 (end=100): [70, 100) and [50, 100) merged - /// - Group 2 (end=95): [40, 95) - /// After group 1, smallest in top-4 is 85. Range [40, 95) could have values >= 85, - /// so we continue to group 2. - #[tokio::test] - async fn test_three_ranges_keep_pulling() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Three ranges, two with same end (100), one with different end (95) - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 3, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![80, 90, 95])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 3, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![55, 75, 85])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(40, unit.into()), - end: Timestamp::new(95, unit.into()), - num_rows: 3, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![45, 65, 94])], - ) - .unwrap(), - ], - ), - ]; - - // All data: [80, 90, 95, 55, 75, 85, 45, 65, 94] - // Sorted descending: [95, 94, 90, 85, 80, 75, 65, 55, 45] - // With limit=4: should be top 4 largest values across all ranges: [95, 94, 90, 85] - let expected_output = Some( - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![95, 94, 90, 85])], - ) - .unwrap(), - ); - - run_test( - 2001, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(4), - expected_output, - None, - false, - ) - .await; - } - /// Test early termination based on threshold comparison with next group. /// When the threshold (smallest value for descending) is >= next group's primary end, /// we can stop early because the next group cannot have better values. @@ -2892,115 +2785,6 @@ mod test { .await; } - #[tokio::test] - async fn test_ascending_threshold_early_termination_case_two() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // For ascending: primary_end is start, ranges sorted by (start ASC, end ASC) - // Group 1 (start=0) has 4 rows, Group 2 (start=4) has 1 row, Group 3 (start=5) has 4 rows - // After reading all data: [9,10,11,12, 21, 5,6,7,8] - // Sorted ascending: [5,6,7,8, 9,10,11,12, 21] - // With limit=4, output should be smallest 4: [5,6,7,8] - // Algorithm continues reading until start=42 > threshold=8, confirming no smaller values exist - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(0, unit.into()), - end: Timestamp::new(20, unit.into()), - num_rows: 4, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![9, 10, 11, 12])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(4, unit.into()), - end: Timestamp::new(25, unit.into()), - num_rows: 1, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(5, unit.into()), - end: Timestamp::new(25, unit.into()), - num_rows: 4, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![5, 6, 7, 8])], - ) - .unwrap(), - ], - ), - // This still will be read to detect boundary, but should not contribute to output - ( - PartitionRange { - start: Timestamp::new(42, unit.into()), - end: Timestamp::new(52, unit.into()), - num_rows: 2, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![42, 51])]) - .unwrap(), - ], - ), - // This following one should not be read after boundary detected - ( - PartitionRange { - start: Timestamp::new(48, unit.into()), - end: Timestamp::new(53, unit.into()), - num_rows: 2, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![48, 51])]) - .unwrap(), - ], - ), - ]; - - // With limit=4, ascending: after processing all ranges, smallest 4 are [5, 6, 7, 8] - // Threshold is 8 (4th smallest value), algorithm reads until start=42 > threshold=8 - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![5, 6, 7, 8])]) - .unwrap(), - ); - - run_test( - 2005, - input_ranged_data, - schema.clone(), - SortOptions { - descending: false, - ..Default::default() - }, - Some(4), - expected_output, - Some(11), // Read first 4 ranges to confirm threshold boundary - false, - ) - .await; - } - /// Test early stop behavior with null values in sort column. /// Verifies that nulls are handled correctly based on nulls_first option. /// This test covers the null handling scenarios that were previously in separate test functions. @@ -3162,978 +2946,4 @@ mod test { ) .await; } - - /// Test early stop behavior when there's only one group (no next group). - /// In this case, can_stop_early should return false and we should process all data. - #[tokio::test] - async fn test_can_early_stop_single_group() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Only one group (all ranges have the same end), no next group to compare against - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 6, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 3, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![85, 86, 87])], - ) - .unwrap(), - ], - ), - ]; - - // Even though we have enough data in first range, we must process all - // because there's no next group to compare threshold against - let expected_output = Some( - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![99, 98, 97, 96])], - ) - .unwrap(), - ); - - run_test( - 3002, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(4), - expected_output, - Some(9), // Must read all batches since no early stop is possible - false, - ) - .await; - } - - /// Test early stop behavior when threshold exactly equals next group's boundary. - #[tokio::test] - async fn test_early_stop_exact_boundary_equality() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case 1: Descending sort, threshold == next_group_end - // Group 1 (end=100): data up to 90, threshold = 90, next_group_end = 90 - // Since 90 >= 90, we should stop early - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 4, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![92, 91, 90, 89])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 3, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![88, 87, 86])], - ) - .unwrap(), - ], - ), - ]; - - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![92, 91, 90])]) - .unwrap(), - ); - - run_test( - 3003, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(3), - expected_output, - Some(7), // Must read both batches to detect boundary - false, - ) - .await; - - // Test case 2: Ascending sort, threshold == next_group_start - // Group 1 (start=10): data from 10, threshold = 20, next_group_start = 20 - // Since 20 < 20 is false, we should continue - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(10, unit.into()), - end: Timestamp::new(50, unit.into()), - num_rows: 4, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![10, 15, 20, 25])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(20, unit.into()), - end: Timestamp::new(60, unit.into()), - num_rows: 3, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![21, 22, 23])], - ) - .unwrap(), - ], - ), - ]; - - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![10, 15, 20])]) - .unwrap(), - ); - - run_test( - 3004, - input_ranged_data, - schema.clone(), - SortOptions { - descending: false, - ..Default::default() - }, - Some(3), - expected_output, - Some(7), // Must read both batches since 20 is not < 20 - false, - ) - .await; - } - - /// Test early stop behavior with empty partition groups. - #[tokio::test] - async fn test_early_stop_with_empty_partitions() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case 1: First group is empty, second group has data - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 0, - }, - vec![ - // Empty batch for first range - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - // Empty batch for second range - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(30, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 4, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![74, 75, 76, 77])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(10, unit.into()), - end: Timestamp::new(60, unit.into()), - num_rows: 3, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![58, 59, 60])], - ) - .unwrap(), - ], - ), - ]; - - // Group 1 (end=100) is empty, Group 2 (end=80) has data - // Should continue to Group 2 since Group 1 has no data - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![77, 76])]).unwrap(), - ); - - run_test( - 3005, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(2), - expected_output, - Some(7), // Must read until finding actual data - false, - ) - .await; - - // Test case 2: Empty partitions between data groups - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 4, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![96, 97, 98, 99])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - // Empty range - should be skipped - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(30, unit.into()), - end: Timestamp::new(70, unit.into()), - num_rows: 0, - identifier: 2, - }, - vec![ - // Another empty range - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(10, unit.into()), - end: Timestamp::new(50, unit.into()), - num_rows: 3, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![48, 49, 50])], - ) - .unwrap(), - ], - ), - ]; - - // With limit=2 from group 1: [99, 98], threshold=98, next group end=50 - // Since 98 >= 50, we should stop early - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98])]).unwrap(), - ); - - run_test( - 3006, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(2), - expected_output, - Some(7), // Must read to detect early stop condition - false, - ) - .await; - } - - /// Test early stop behavior with empty partition ranges at the beginning. - /// This specifically tests the desc ordering case where empty ranges - /// should not interfere with early termination logic. - #[tokio::test] - async fn test_early_stop_empty_ranges_beginning_desc() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Empty ranges at the beginning, then data ranges - // For descending: primary_end is end, ranges should be ordered by end DESC - // Group 1 (end=100): empty ranges [70,100) and [50,100) - both empty - // Group 2 (end=80): has data [75,76,77,78] - // With limit=2, we should get [78,77] and stop early since threshold=77 >= next_group_end - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(60, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 4, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![75, 76, 77, 78])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(40, unit.into()), - end: Timestamp::new(70, unit.into()), - num_rows: 3, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![65, 66, 67])], - ) - .unwrap(), - ], - ), - ]; - - // With limit=2, descending: top 2 from group 2 are [78, 77] - // Threshold is 77, next group's primary_end is 70 - // Since 77 >= 70, we should stop early after group 2 - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![78, 77])]).unwrap(), - ); - - run_test( - 4000, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(2), - expected_output, - Some(7), // Must read until finding actual data and detecting early stop - false, - ) - .await; - } - - /// Test early stop with empty ranges affecting threshold calculation. - /// Empty ranges should be skipped and not affect the threshold-based early termination. - #[tokio::test] - async fn test_early_stop_empty_ranges_threshold_calculation() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Mix of empty and non-empty ranges - // Group 1 (end=100): [70,100) has data, [50,100) is empty - // Group 2 (end=90): [60,90) has data, [40,90) is empty - // Group 3 (end=80): [30,80) has data - // With limit=3 from group 1+2, threshold should be calculated correctly - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 5, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(60, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 4, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![84, 85, 86, 87])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(40, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 0, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(30, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 3, - identifier: 4, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![75, 76, 77])], - ) - .unwrap(), - ], - ), - ]; - - // Combined data from groups 1+2: [94,95,96,97,98, 84,85,86,87] - // Sorted descending: [98,97,96,95,94, 87,86,85,84] - // With limit=3: [98,97,96], threshold=96 - // Next group's primary_end is 80, since 96 >= 80, stop early - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![98, 97, 96])]) - .unwrap(), - ); - - run_test( - 4001, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(3), - expected_output, - Some(12), // Read through all groups to detect boundaries correctly - false, - ) - .await; - } - - /// Test consecutive empty ranges with desc ordering that should trigger early stop. - /// This tests the scenario where multiple empty ranges are processed before finding - /// data, and then early stop kicks in correctly. - #[tokio::test] - async fn test_consecutive_empty_ranges_early_stop_desc() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Multiple consecutive empty ranges, then data, then early stop - // Groups with empty ranges should be processed quickly - // Group 1 (end=120): empty - // Group 2 (end=110): empty - // Group 3 (end=100): has data [95,96,97,98,99] - // Group 4 (end=90): has data [85,86,87,88,89] - // With limit=4, we should get [99,98,97,96] from group 3 and stop early - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(90, unit.into()), - end: Timestamp::new(120, unit.into()), - num_rows: 0, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(80, unit.into()), - end: Timestamp::new(110, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 5, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![95, 96, 97, 98, 99])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(60, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 5, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(40, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 3, - identifier: 4, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![75, 76, 77])], - ) - .unwrap(), - ], - ), - ]; - - // With limit=4, descending: top 4 from group 3 are [99,98,97,96] - // Threshold is 96, next group's primary_end is 80 - // Since 96 >= 80, we should stop early after group 3 - let expected_output = Some( - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![99, 98, 97, 96])], - ) - .unwrap(), - ); - - run_test( - 4002, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(4), - expected_output, - Some(13), // Read through all groups to detect boundaries correctly - false, - ) - .await; - } - - /// Test empty ranges with exact boundary equality for early stop. - /// This verifies that empty ranges don't interfere with exact boundary conditions. - #[tokio::test] - async fn test_empty_ranges_exact_boundary_early_stop() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Empty ranges with exact boundary equality - // Group 1 (end=100): empty range [80,100) and data range [70,100) with values up to 90 - // Group 2 (end=90): has data [85,86,87,88,89] - // With limit=3, threshold=98, next_group_end=90, since 98 >= 90, stop early - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(80, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 0, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(70, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 4, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![96, 97, 98, 99])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(60, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 5, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])], - ) - .unwrap(), - ], - ), - ]; - - // With limit=3, descending: top 3 from group 1 are [99,98,97] - // Threshold is 98, next group's primary_end is 90 - // Since 98 >= 90, we should stop early - let expected_output = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98, 97])]) - .unwrap(), - ); - - run_test( - 4003, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(3), - expected_output, - Some(9), // Read through both groups to detect boundaries correctly - false, - ) - .await; - } - - /// Test that empty ranges in the middle of processing don't break early stop logic. - /// This ensures that empty ranges are properly skipped and don't affect threshold calculation. - #[tokio::test] - async fn test_empty_ranges_middle_processing_early_stop() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Data, then empty ranges, then more data, then early stop - // Group 1 (end=100): has data [94,95,96,97,98] - // Group 2 (end=90): empty ranges [60,90) and [50,90) - // Group 3 (end=80): has data [75,76,77,78,79] - // With limit=4, we should get [98,97,96,95] from group 1 and stop early - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(90, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 5, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(60, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 0, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(50, unit.into()), - end: Timestamp::new(90, unit.into()), - num_rows: 0, - identifier: 2, - }, - vec![ - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(40, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 5, - identifier: 3, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![75, 76, 77, 78, 79])], - ) - .unwrap(), - ], - ), - ]; - - // With limit=4, descending: top 4 from group 1 are [98,97,96,95] - // Threshold is 95, next group's primary_end after empty groups is 80 - // Since 95 >= 80, we should stop early after group 1 - let expected_output = Some( - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![98, 97, 96, 95])], - ) - .unwrap(), - ); - - run_test( - 4004, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(4), - expected_output, - Some(10), // Read through all groups because it can't detect boundaries due to data distribution - false, - ) - .await; - } - - /// Test if two group of non overlapping ranges with limit works as expected. - /// Which is still output correct number of rows whether or not limit is reached in first group. - #[tokio::test] - async fn test_two_no_overlap_range_two_limit_output() { - let unit = TimeUnit::Millisecond; - let schema = Arc::new(Schema::new(vec![Field::new( - "ts", - DataType::Timestamp(unit, None), - false, - )])); - - // Test case: Two non-overlapping groups with limit - // Group 1 (end=100): has data [90, 91, 92, 93, 94] - 5 rows - // Group 2 (end=80): has data [70, 71, 72, 73, 74] - 5 rows - // With limit=3, we should get [94, 93, 92] from group 1 (limit reached in first group) - // With limit=7, we should get [94, 93, 92, 91, 90, 74, 73] from both groups - let input_ranged_data = vec![ - ( - PartitionRange { - start: Timestamp::new(85, unit.into()), - end: Timestamp::new(100, unit.into()), - num_rows: 5, - identifier: 0, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![90, 91, 92, 93, 94])], - ) - .unwrap(), - ], - ), - ( - PartitionRange { - start: Timestamp::new(65, unit.into()), - end: Timestamp::new(80, unit.into()), - num_rows: 5, - identifier: 1, - }, - vec![ - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![70, 71, 72, 73, 74])], - ) - .unwrap(), - ], - ), - ]; - - // Test case 1: Limit reached in first group - let expected_output_case1 = Some( - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![94, 93, 92])]) - .unwrap(), - ); - - run_test( - 4005, - input_ranged_data.clone(), - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(3), - expected_output_case1, - Some(10), // Should read through first&second group and detect boundary - false, - ) - .await; - - // Test case 2: Limit not reached in first group, need data from second group - let expected_output_case2 = Some( - DfRecordBatch::try_new( - schema.clone(), - vec![new_ts_array(unit, vec![94, 93, 92, 91, 90, 74, 73])], - ) - .unwrap(), - ); - - run_test( - 4006, - input_ranged_data, - schema.clone(), - SortOptions { - descending: true, - ..Default::default() - }, - Some(7), - expected_output_case2, - Some(10), // Should read through both groups - false, - ) - .await; - } }