refactor: add list test

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-22 21:05:13 +08:00
parent 709ccd3e31
commit 4419e0254f

View File

@@ -1340,72 +1340,107 @@ mod test {
} }
} }
/// Comprehensive test case structure that encapsulates all test parameters
///
/// One value describes a single table-driven case; `run_comprehensive_test`
/// turns it into partition ranges, record batches and sort options and hands
/// them to `run_test`.
#[derive(Debug, Clone)]
struct TestCase {
/// Label identifying the case in the table. Not read by
/// `run_comprehensive_test`, hence the `dead_code` allowance.
#[allow(dead_code)]
name: &'static str,
/// Time unit used for the `ts` timestamp column, for the partition range
/// bounds and for the arrays built from the raw `i64` values.
unit: TimeUnit,
/// One entry per partition range; the entry's position in the vector is
/// used as the range identifier, and the range's row count is the sum of
/// its batch lengths.
input_ranges: Vec<((i64, i64), Vec<Vec<i64>>)>, // (start, end) -> data batches
/// Forwarded to `SortOptions::descending`.
descending: bool,
/// Forwarded to `SortOptions::nulls_first`.
nulls_first: bool,
/// Optional row limit forwarded to `run_test`.
limit: Option<usize>,
/// Expected result batches. An empty vector means no output is expected
/// (`None` is passed on); otherwise the batches are concatenated into one
/// expected record batch.
expected_output: Vec<Vec<i64>>,
/// Forwarded to `run_test` unchanged.
/// NOTE(review): the case comments treat this as the number of input rows
/// polled, with `None` presumably skipping the check — confirm in `run_test`.
expected_polled_rows: Option<usize>,
/// Not read by `run_comprehensive_test`, which always builds a
/// non-nullable `ts` field; hence the `dead_code` allowance.
#[allow(dead_code)]
nullable: bool,
}
/// Comprehensive test suite covering all PartSort functionality
#[tokio::test] #[tokio::test]
async fn simple_cases() { async fn comprehensive_part_sort_tests() {
let testcases = vec![ let test_cases = vec![
( // ===== Basic Cases =====
TimeUnit::Millisecond, // Test basic ascending sort with overlapping ranges
vec![ TestCase {
name: "basic_ascending_overlapping_ranges",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
((5, 10), vec![vec![5, 6], vec![7, 8]]), ((5, 10), vec![vec![5, 6], vec![7, 8]]),
], ],
false, descending: false,
None, nulls_first: false,
vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]], limit: None,
), expected_output: vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]],
// Case 1: Descending sort with overlapping ranges that have the same primary end (end=10). expected_polled_rows: None,
// Ranges [5,10) and [0,10) are grouped together, so their data is merged before sorting. nullable: false,
( },
TimeUnit::Millisecond, // Test descending sort with overlapping ranges that have same primary end
vec![ TestCase {
name: "descending_overlapping_same_primary_end",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((5, 10), vec![vec![5, 6], vec![7, 8, 9]]), ((5, 10), vec![vec![5, 6], vec![7, 8, 9]]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
], ],
true, descending: true,
None, nulls_first: false,
vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]], limit: None,
), expected_output: vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]],
( expected_polled_rows: None,
TimeUnit::Millisecond, nullable: false,
vec![ },
TestCase {
name: "empty_first_range",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((5, 10), vec![]), ((5, 10), vec![]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
], ],
true, descending: true,
None, nulls_first: false,
vec![vec![8, 7, 6, 5, 4, 3, 2, 1]], limit: None,
), expected_output: vec![vec![8, 7, 6, 5, 4, 3, 2, 1]],
( expected_polled_rows: None,
TimeUnit::Millisecond, nullable: false,
vec![ },
TestCase {
name: "multiple_ranges_with_empty_middle",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((15, 20), vec![vec![17, 18, 19]]), ((15, 20), vec![vec![17, 18, 19]]),
((10, 15), vec![]), ((10, 15), vec![]),
((5, 10), vec![]), ((5, 10), vec![]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
], ],
true, descending: true,
None, nulls_first: false,
vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]], limit: None,
), expected_output: vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]],
( expected_polled_rows: None,
TimeUnit::Millisecond, nullable: false,
vec![ },
TestCase {
name: "all_empty_ranges",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((15, 20), vec![]), ((15, 20), vec![]),
((10, 15), vec![]), ((10, 15), vec![]),
((5, 10), vec![]), ((5, 10), vec![]),
((0, 10), vec![]), ((0, 10), vec![]),
], ],
true, descending: true,
None, nulls_first: false,
vec![], limit: None,
), expected_output: vec![],
// Case 5: Data from one batch spans multiple ranges. Ranges with same end are grouped. expected_polled_rows: None,
// Ranges: [15,20) end=20, [10,15) end=15, [5,10) end=10, [0,10) end=10 nullable: false,
// Groups: {[15,20)}, {[10,15)}, {[5,10), [0,10)} },
// The last two ranges are merged because they share end=10. TestCase {
( name: "data_spanning_multiple_ranges_with_limit",
TimeUnit::Millisecond, unit: TimeUnit::Millisecond,
vec![ input_ranges: vec![
( (
(15, 20), (15, 20),
vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]], vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]],
@@ -1414,90 +1449,654 @@ mod test {
((5, 10), vec![]), ((5, 10), vec![]),
((0, 10), vec![]), ((0, 10), vec![]),
], ],
true, descending: true,
None, nulls_first: false,
limit: Some(2),
expected_output: vec![vec![19, 17]],
expected_polled_rows: None,
nullable: false,
},
// ===== Limit Tests =====
TestCase {
name: "limit_multiple_batches_single_partition",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
],
descending: true,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![9, 8, 7]],
expected_polled_rows: None,
nullable: false,
},
TestCase {
name: "limit_multiple_partitions",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((10, 20), vec![vec![10, 11, 12], vec![13, 14, 15]]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5]]),
],
descending: true,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![15, 14]],
expected_polled_rows: None,
nullable: false,
},
TestCase {
name: "limit_ascending_sort",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((0, 10), vec![vec![7, 8, 9], vec![4, 5, 6], vec![1, 2, 3]]),
],
descending: false,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![1, 2]],
expected_polled_rows: None,
nullable: false,
},
// ===== Early Termination Tests =====
// Test early termination with limit - should stop after processing first partition group
// Input: Three partitions with ranges (20,30), (10,20), (0,10), each with 2 batches of 5 values
// With limit=2 and descending sort, we only need top 2 values [29,28] from the first partition
// expected_polled_rows=10: Only processes first partition (10 rows) before early termination
TestCase {
name: "early_termination_descending",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((20, 30), vec![vec![21, 22, 23, 24, 25], vec![26, 27, 28, 29, 30]]),
((10, 20), vec![vec![11, 12, 13, 14, 15], vec![16, 17, 18, 19, 20]]),
((0, 10), vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]]),
],
descending: true,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![29, 28]],
expected_polled_rows: Some(10),
nullable: false,
},
// ===== Primary End Grouping Tests =====
TestCase {
name: "primary_end_grouping_with_limit",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![80, 90, 95]]),
((50, 100), vec![vec![55, 65, 75, 85, 95]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![95, 95, 90, 85]],
expected_polled_rows: None,
nullable: false,
},
// ===== Three Ranges Keep Pulling Tests =====
TestCase {
name: "three_ranges_keep_pulling",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![80, 90, 95]]),
((50, 100), vec![vec![55, 75, 85]]),
((40, 95), vec![vec![45, 65, 94]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![95, 94, 90, 85]],
expected_polled_rows: None,
nullable: false,
},
// ===== Threshold-Based Early Termination Tests =====
// Test threshold-based early termination - when threshold >= next group's primary end
// Input: Two partitions (70,100) with [94,95,96,97,98,99] and (50,90) with [85,86,87]
// With limit=4, threshold becomes 96 (4th largest value), next_primary_end=90
// Since 96 >= 90, we can stop early and skip the second partition
// expected_polled_rows=9: Only processes first partition (6 rows) + partial second (3 rows)
TestCase {
name: "threshold_based_early_termination_desc",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]),
((50, 90), vec![vec![85, 86, 87]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![99, 98, 97, 96]],
expected_polled_rows: Some(9),
nullable: false,
},
// Test continuing when threshold falls within next group's range
// Input: Two partitions (70,100) with [94,95,96,97,98,99] and (50,98) with [55,60,65]
// With limit=4, threshold=96, next_primary_end=98
// Since 96 < 98, we cannot stop early - next group might have values > 96
// expected_polled_rows=9: Processes first partition (6) + second partition (3)
TestCase {
name: "continue_when_threshold_in_next_group_range",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]),
((50, 98), vec![vec![55, 60, 65]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![99, 98, 97, 96]],
expected_polled_rows: Some(9),
nullable: false,
},
// Test ascending threshold early termination
// Input: Four partitions (10,50), (20,60), (60,70), (80,90) with values [10-15], [25,30,35], [60,61], [80,81]
// With limit=4, threshold=13 (4th smallest value), next_primary_end=20
// Since 13 < 20, we need to check next group, but 13 < 25 (smallest in next), we can stop
// expected_polled_rows=11: Processes first partition (6) + second partition (3) + third (2); the fourth partition (2 rows) is never polled
TestCase {
name: "ascending_threshold_early_termination",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((10, 50), vec![vec![10, 11, 12, 13, 14, 15]]),
((20, 60), vec![vec![25, 30, 35]]),
((60, 70), vec![vec![60, 61]]),
((80, 90), vec![vec![80, 81]]),
],
descending: false,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![10, 11, 12, 13]],
expected_polled_rows: Some(11),
nullable: false,
},
// Test complex ascending threshold early termination case
// Input: Multiple partitions with overlapping ranges and various values
// With limit=4, we need smallest 4 values. After processing (5,25) with [5,6,7,8],
// threshold=8, and remaining partitions have min values > 8, so we can stop early
// expected_polled_rows=11: Processes necessary partitions until threshold condition met
TestCase {
name: "ascending_threshold_early_termination_case_two",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((0, 20), vec![vec![9, 10, 11, 12]]),
((4, 25), vec![vec![21]]),
((5, 25), vec![vec![5, 6, 7, 8]]),
((42, 52), vec![vec![42, 51]]),
((48, 53), vec![vec![48, 51]]),
],
descending: false,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![5, 6, 7, 8]],
expected_polled_rows: Some(11),
nullable: false,
},
// ===== Null Handling Tests =====
// ===== Single Group Tests =====
// Test early stop within a single group - partitions share same primary end
// Input: Two partitions both ending at 100, first has [94-99], second has [85-87]
// With limit=4, we get [99,98,97,96] from first partition, threshold=96
// Since both partitions have same primary end (100), we cannot stop early based on range
// expected_polled_rows=9: Processes both partitions completely (6 + 3 rows)
TestCase {
name: "can_early_stop_single_group",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![94, 95, 96, 97, 98, 99]]),
((50, 100), vec![vec![85, 86, 87]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![99, 98, 97, 96]],
expected_polled_rows: Some(9),
nullable: false,
},
// ===== Exact Boundary Equality Tests =====
// Test exact boundary equality for descending sort - threshold equals next primary end
// Input: Two partitions (70,100) with [92,91,90,89] and (50,90) with [88,87,86]
// With limit=3, threshold=90, next_primary_end=90
// Since threshold == next_primary_end, we can stop early (90 >= 90)
// expected_polled_rows=7: Processes first partition (4) + partial second (3 rows)
TestCase {
name: "exact_boundary_equality_desc",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![92, 91, 90, 89]]),
((50, 90), vec![vec![88, 87, 86]]),
],
descending: true,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![92, 91, 90]],
expected_polled_rows: Some(7),
nullable: false,
},
// Test exact boundary equality for ascending sort - threshold equals next primary start
// Input: Two partitions (10,50) with [10,15,20,25] and (20,60) with [21,22,23]
// With limit=3, threshold=20, next_primary_start=20
// Since threshold == next_primary_start, we can stop early (20 <= 20)
// expected_polled_rows=7: Processes first partition (4) + partial second (3 rows)
TestCase {
name: "exact_boundary_equality_asc",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((10, 50), vec![vec![10, 15, 20, 25]]),
((20, 60), vec![vec![21, 22, 23]]),
],
descending: false,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![10, 15, 20]],
expected_polled_rows: Some(7),
nullable: false,
},
// ===== Empty Partition Tests =====
// Test early stop with empty partitions at the start
// Input: Four partitions, first two are empty, third has [74-77], fourth has [58-60]
// With limit=2, we get [77,76] from third partition, threshold=76, next_primary_end=60
// Since 76 >= 60, we can stop early and skip the fourth partition
// expected_polled_rows=7: Processes empty partitions (0) + third partition (4) + partial fourth (3)
TestCase {
name: "early_stop_with_empty_partitions_at_start",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![]]),
((50, 100), vec![vec![]]),
((30, 80), vec![vec![74, 75, 76, 77]]),
((10, 60), vec![vec![58, 59, 60]]),
],
descending: true,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![77, 76]],
expected_polled_rows: Some(7),
nullable: false,
},
// Test early stop with empty partitions between data partitions
// Input: Four partitions with data in first and last, middle two are empty
// First partition has [96-99], last has [48-50], limit=2 gives [99,98]
// Threshold=98, next_primary_end=50, since 98 >= 50, we can stop early
// expected_polled_rows=7: Processes first partition (4) + empty partitions (0) + partial last (3)
TestCase {
name: "early_stop_with_empty_partitions_between_data",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![96, 97, 98, 99]]),
((50, 90), vec![vec![]]),
((30, 70), vec![vec![]]),
((10, 50), vec![vec![48, 49, 50]]),
],
descending: true,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![99, 98]],
expected_polled_rows: Some(7),
nullable: false,
},
// ===== Empty Ranges at Beginning Tests =====
TestCase {
name: "early_stop_empty_ranges_beginning_desc",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![]]),
((50, 100), vec![vec![]]),
((60, 80), vec![vec![75, 76, 77, 78]]),
((40, 70), vec![vec![65, 66, 67]]),
],
descending: true,
nulls_first: false,
limit: Some(2),
expected_output: vec![vec![78, 77]],
expected_polled_rows: Some(7),
nullable: false,
},
// ===== Empty Ranges Threshold Calculation Tests =====
TestCase {
name: "early_stop_empty_ranges_threshold_calculation",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((70, 100), vec![vec![94, 95, 96, 97, 98]]),
((50, 100), vec![vec![]]),
((60, 90), vec![vec![84, 85, 86, 87]]),
((40, 90), vec![vec![]]),
((30, 80), vec![vec![75, 76, 77]]),
],
descending: true,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![98, 97, 96]],
expected_polled_rows: Some(12),
nullable: false,
},
// ===== Consecutive Empty Ranges Tests =====
TestCase {
name: "consecutive_empty_ranges_early_stop_desc",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((90, 120), vec![vec![]]),
((80, 110), vec![vec![]]),
((70, 100), vec![vec![95, 96, 97, 98, 99]]),
((60, 90), vec![vec![85, 86, 87, 88, 89]]),
((40, 80), vec![vec![75, 76, 77]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![99, 98, 97, 96]],
expected_polled_rows: Some(13),
nullable: false,
},
// ===== Empty Ranges Exact Boundary Tests =====
TestCase {
name: "empty_ranges_exact_boundary_early_stop",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((80, 100), vec![vec![]]),
((70, 100), vec![vec![96, 97, 98, 99]]),
((60, 90), vec![vec![85, 86, 87, 88, 89]]),
],
descending: true,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![99, 98, 97]],
expected_polled_rows: Some(9),
nullable: false,
},
// ===== Empty Ranges Middle Processing Tests =====
TestCase {
name: "empty_ranges_middle_processing_early_stop",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((90, 100), vec![vec![94, 95, 96, 97, 98]]),
((60, 90), vec![vec![]]),
((50, 90), vec![vec![]]),
((40, 80), vec![vec![75, 76, 77, 78, 79]]),
],
descending: true,
nulls_first: false,
limit: Some(4),
expected_output: vec![vec![98, 97, 96, 95]],
expected_polled_rows: Some(10),
nullable: false,
},
// ===== Two Non-Overlapping Ranges Tests =====
// Test early stop with two non-overlapping ranges - limit reached in first
// Input: Two partitions (85,100) with [90-94] and (65,80) with [70-74], no overlap
// With limit=3, we get [94,93,92] from first partition, threshold=92
// Since ranges don't overlap and we have enough values, we can stop early
// expected_polled_rows=10: Processes the first partition completely (5) + the second partition (5), which must be read to make sure the first partition is completed
TestCase {
name: "two_no_overlap_range_limit_reached_in_first",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((85, 100), vec![vec![90, 91, 92, 93, 94]]),
((65, 80), vec![vec![70, 71, 72, 73, 74]]),
],
descending: true,
nulls_first: false,
limit: Some(3),
expected_output: vec![vec![94, 93, 92]],
expected_polled_rows: Some(10),
nullable: false,
},
TestCase {
name: "two_no_overlap_range_need_both_groups",
unit: TimeUnit::Millisecond,
input_ranges: vec![
((85, 100), vec![vec![90, 91, 92, 93, 94]]),
((65, 80), vec![vec![70, 71, 72, 73, 74]]),
],
descending: true,
nulls_first: false,
limit: Some(7),
expected_output: vec![vec![94, 93, 92, 91, 90, 74, 73]],
expected_polled_rows: Some(10),
nullable: false,
},
];
for (case_id, test_case) in test_cases.into_iter().enumerate() {
run_comprehensive_test(case_id, test_case).await;
}
}
/// Test early stop behavior with null values in sort column.
/// Verifies that nulls are handled correctly based on nulls_first option.
#[tokio::test]
async fn test_early_stop_with_nulls() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(unit, None),
true, // nullable
)]));
// Helper function to create nullable timestamp arrays
let new_nullable_ts_array = |unit: TimeUnit, arr: Vec<Option<i64>>| -> ArrayRef {
match unit {
TimeUnit::Second => Arc::new(TimestampSecondArray::from(arr)) as ArrayRef,
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(arr)) as ArrayRef,
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(arr)) as ArrayRef,
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(arr)) as ArrayRef,
}
};
// Test case 1: nulls_first=true, null values should appear first
// Group 1 (end=100): [null, null, 99, 98, 97] -> with limit=3, top 3 are [null, null, 99]
// Threshold is 99, next group end=90, since 99 >= 90, we should stop early
let input_ranged_data = vec![
(
PartitionRange {
start: Timestamp::new(70, unit.into()),
end: Timestamp::new(100, unit.into()),
num_rows: 5,
identifier: 0,
},
vec![ vec![
vec![19, 17, 15], DfRecordBatch::try_new(
vec![12, 11, 10], schema.clone(),
vec![9, 8, 7, 6, 5, 4, 3, 2, 1], vec![new_nullable_ts_array(
unit,
vec![Some(99), Some(98), None, Some(97), None],
)],
)
.unwrap(),
], ],
), ),
( (
TimeUnit::Millisecond, PartitionRange {
start: Timestamp::new(50, unit.into()),
end: Timestamp::new(90, unit.into()),
num_rows: 3,
identifier: 1,
},
vec![ vec![
( DfRecordBatch::try_new(
(15, 20), schema.clone(),
vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]], vec![new_nullable_ts_array(
), unit,
((10, 15), vec![]), vec![Some(89), Some(88), Some(87)],
((5, 10), vec![]), )],
((0, 10), vec![]), )
.unwrap(),
], ],
true,
Some(2),
vec![vec![19, 17]],
), ),
]; ];
for (identifier, (unit, input_ranged_data, descending, limit, expected_output)) in // With nulls_first=true, nulls sort before all values
testcases.into_iter().enumerate() // For descending, order is: null, null, 99, 98, 97
{ // With limit=3, we get: null, null, 99
let schema = Schema::new(vec![Field::new( let expected_output = Some(
"ts", DfRecordBatch::try_new(
DataType::Timestamp(unit, None), schema.clone(),
false, vec![new_nullable_ts_array(unit, vec![None, None, Some(99)])],
)]); )
let schema = Arc::new(schema); .unwrap(),
let opt = SortOptions { );
descending,
..Default::default()
};
let input_ranged_data = input_ranged_data run_test(
.into_iter() 3000,
.map(|(range, data)| { input_ranged_data,
let part = PartitionRange { schema.clone(),
start: Timestamp::new(range.0, unit.into()), SortOptions {
end: Timestamp::new(range.1, unit.into()), descending: true,
num_rows: data.iter().map(|b| b.len()).sum(), nulls_first: true,
identifier, },
}; Some(3),
expected_output,
Some(8), // Must read both batches to detect group boundary
false,
)
.await;
let batches = data // Test case 2: nulls_last=true, null values should appear last
.into_iter() // Group 1 (end=100): [99, 98, 97, null, null] -> with limit=3, top 3 are [99, 98, 97]
.map(|b| { // Threshold is 97, next group end=90, since 97 >= 90, we should stop early
let arr = new_ts_array(unit, b); let input_ranged_data = vec![
DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap() (
}) PartitionRange {
.collect_vec(); start: Timestamp::new(70, unit.into()),
(part, batches) end: Timestamp::new(100, unit.into()),
}) num_rows: 5,
.collect_vec(); identifier: 0,
},
vec![
DfRecordBatch::try_new(
schema.clone(),
vec![new_nullable_ts_array(
unit,
vec![Some(99), Some(98), Some(97), None, None],
)],
)
.unwrap(),
],
),
(
PartitionRange {
start: Timestamp::new(50, unit.into()),
end: Timestamp::new(90, unit.into()),
num_rows: 3,
identifier: 1,
},
vec![
DfRecordBatch::try_new(
schema.clone(),
vec![new_nullable_ts_array(
unit,
vec![Some(89), Some(88), Some(87)],
)],
)
.unwrap(),
],
),
];
let expected_output = expected_output // With nulls_last=false (equivalent to nulls_first=false), values sort before nulls
// For descending, order is: 99, 98, 97, null, null
// With limit=3, we get: 99, 98, 97
let expected_output = Some(
DfRecordBatch::try_new(
schema.clone(),
vec![new_nullable_ts_array(
unit,
vec![Some(99), Some(98), Some(97)],
)],
)
.unwrap(),
);
run_test(
3001,
input_ranged_data,
schema.clone(),
SortOptions {
descending: true,
nulls_first: false,
},
Some(3),
expected_output,
Some(8), // Must read both batches to detect group boundary
false,
)
.await;
}
/// Run a comprehensive test case
async fn run_comprehensive_test(case_id: usize, test_case: TestCase) {
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(test_case.unit, None),
false,
)]));
let opt = SortOptions {
descending: test_case.descending,
nulls_first: test_case.nulls_first,
};
let input_ranged_data = test_case
.input_ranges
.into_iter()
.enumerate()
.map(|(identifier, (range, data))| {
let part = PartitionRange {
start: Timestamp::new(range.0, test_case.unit.into()),
end: Timestamp::new(range.1, test_case.unit.into()),
num_rows: data.iter().map(|b| b.len()).sum(),
identifier,
};
let batches = data
.into_iter()
.map(|b| {
let arr = new_ts_array(test_case.unit, b);
DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap()
})
.collect_vec();
(part, batches)
})
.collect_vec();
let expected_output = if test_case.expected_output.is_empty() {
None
} else {
let batches = test_case
.expected_output
.into_iter() .into_iter()
.map(|a| { .map(|a| {
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap() let arr = new_ts_array(test_case.unit, a);
DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap()
}) })
.collect_vec(); .collect_vec();
let expected_output = if expected_output.is_empty() { Some(concat_batches(&schema, &batches).unwrap())
None };
} else {
Some(concat_batches(&schema, &expected_output).unwrap())
};
run_test( run_test(
identifier, case_id,
input_ranged_data, input_ranged_data,
schema.clone(), schema.clone(),
opt, opt,
limit, test_case.limit,
expected_output, expected_output,
None, test_case.expected_polled_rows,
false, false,
) )
.await; .await;
}
} }
#[allow(clippy::print_stdout)] #[allow(clippy::print_stdout)]
@@ -2404,8 +3003,9 @@ mod test {
/// Test early stop behavior with null values in sort column. /// Test early stop behavior with null values in sort column.
/// Verifies that nulls are handled correctly based on nulls_first option. /// Verifies that nulls are handled correctly based on nulls_first option.
/// This test covers the null handling scenarios that were previously in separate test functions.
#[tokio::test] #[tokio::test]
async fn test_early_stop_with_nulls() { async fn test_null_handling_comprehensive() {
let unit = TimeUnit::Millisecond; let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new( let schema = Arc::new(Schema::new(vec![Field::new(
"ts", "ts",