diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 36e4cc8463..3fcafb6886 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -2467,7 +2467,7 @@ mod test { /// Test early stop behavior when there's only one group (no next group). /// In this case, can_stop_early should return false and we should process all data. #[tokio::test] - async fn test_early_stop_single_group() { + async fn test_can_early_stop_single_group() { let unit = TimeUnit::Millisecond; let schema = Arc::new(Schema::new(vec![Field::new( "ts", @@ -2825,4 +2825,511 @@ mod test { ) .await; } + + /// Test early stop behavior with empty partition ranges at the beginning. + /// This specifically tests the desc ordering case where empty ranges + /// should not interfere with early termination logic. + #[tokio::test] + async fn test_early_stop_empty_ranges_beginning_desc() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // Test case: Empty ranges at the beginning, then data ranges + // For descending: primary_end is end, ranges should be ordered by end DESC + // Group 1 (end=100): empty ranges [70,100) and [50,100) - both empty + // Group 2 (end=80): has data [75,76,77,78] + // With limit=2, we should get [78,77] and stop early since threshold=77 >= next_group_end + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 0, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(50, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 0, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(60, unit.into()), + end: Timestamp::new(80, unit.into()), + num_rows: 4, + identifier: 2, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![75, 76, 77, 78])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(40, unit.into()), + end: Timestamp::new(70, unit.into()), + num_rows: 3, + identifier: 3, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![65, 66, 67])], + ) + .unwrap(), + ], + ), + ]; + + // With limit=2, descending: top 2 from group 2 are [78, 77] + // Threshold is 77, next group's primary_end is 70 + // Since 77 >= 70, we should stop early after group 2 + let expected_output = Some( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![78, 77])]).unwrap(), + ); + + run_test( + 4000, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + ..Default::default() + }, + Some(2), + expected_output, + Some(7), // Must read until finding actual data and detecting early stop + ) + .await; + } + + /// Test early stop with empty ranges affecting threshold calculation. + /// Empty ranges should be skipped and not affect the threshold-based early termination. + #[tokio::test] + async fn test_early_stop_empty_ranges_threshold_calculation() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // Test case: Mix of empty and non-empty ranges + // Group 1 (end=100): [70,100) has data, [50,100) is empty + // Group 2 (end=90): [60,90) has data, [40,90) is empty + // Group 3 (end=80): [30,80) has data + // With limit=3 from group 1+2, threshold should be calculated correctly + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 5, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(50, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 0, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(60, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 4, + identifier: 2, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![84, 85, 86, 87])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(40, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 0, + identifier: 3, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(30, unit.into()), + end: Timestamp::new(80, unit.into()), + num_rows: 3, + identifier: 4, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![75, 76, 77])], + ) + .unwrap(), + ], + ), + ]; + + // Combined data from groups 1+2: [94,95,96,97,98, 84,85,86,87] + // Sorted descending: [98,97,96,95,94, 87,86,85,84] + // With limit=3: [98,97,96], threshold=96 + // Next group's primary_end is 80, since 96 >= 80, stop early + let expected_output = Some( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![98, 97, 96])]) + .unwrap(), + ); + + run_test( + 4001, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + ..Default::default() + }, + Some(3), + expected_output, + Some(12), // Read through all groups to detect boundaries correctly + ) + .await; + } + + /// Test consecutive empty ranges with desc ordering that should trigger early stop. + /// This tests the scenario where multiple empty ranges are processed before finding + /// data, and then early stop kicks in correctly. + #[tokio::test] + async fn test_consecutive_empty_ranges_early_stop_desc() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // Test case: Multiple consecutive empty ranges, then data, then early stop + // Groups with empty ranges should be processed quickly + // Group 1 (end=120): empty + // Group 2 (end=110): empty + // Group 3 (end=100): has data [95,96,97,98,99] + // Group 4 (end=90): has data [85,86,87,88,89] + // With limit=4, we should get [99,98,97,96] from group 3 and stop early + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(90, unit.into()), + end: Timestamp::new(120, unit.into()), + num_rows: 0, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(80, unit.into()), + end: Timestamp::new(110, unit.into()), + num_rows: 0, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 5, + identifier: 2, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![95, 96, 97, 98, 99])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(60, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 5, + identifier: 3, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(40, unit.into()), + end: Timestamp::new(80, unit.into()), + num_rows: 3, + identifier: 4, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![75, 76, 77])], + ) + .unwrap(), + ], + ), + ]; + + // With limit=4, descending: top 4 from group 3 are [99,98,97,96] + // Threshold is 96, next group's primary_end is 80 + // Since 96 >= 80, we should stop early after group 3 + let expected_output = Some( + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![99, 98, 97, 96])], + ) + .unwrap(), + ); + + run_test( + 4002, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + ..Default::default() + }, + Some(4), + expected_output, + Some(13), // Read through all groups to detect boundaries correctly + ) + .await; + } + + /// Test empty ranges with exact boundary equality for early stop. + /// This verifies that empty ranges don't interfere with exact boundary conditions. + #[tokio::test] + async fn test_empty_ranges_exact_boundary_early_stop() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // Test case: Empty ranges with exact boundary equality + // Group 1 (end=100): empty range [80,100) and data range [70,100) with values up to 90 + // Group 2 (end=90): has data [85,86,87,88,89] + // With limit=3, threshold=98, next_group_end=90, since 98 >= 90, stop early + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(80, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 0, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 4, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![96, 97, 98, 99])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(60, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 5, + identifier: 2, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])], + ) + .unwrap(), + ], + ), + ]; + + // With limit=3, descending: top 3 from group 1 are [99,98,97] + // Threshold is 98, next group's primary_end is 90 + // Since 98 >= 90, we should stop early + let expected_output = Some( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98, 97])]) + .unwrap(), + ); + + run_test( + 4003, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + ..Default::default() + }, + Some(3), + expected_output, + Some(9), // Read through both groups to detect boundaries correctly + ) + .await; + } + + /// Test that empty ranges in the middle of processing don't break early stop logic. + /// This ensures that empty ranges are properly skipped and don't affect threshold calculation. + #[tokio::test] + async fn test_empty_ranges_middle_processing_early_stop() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // Test case: Data, then empty ranges, then more data, then early stop + // Group 1 (end=100): has data [94,95,96,97,98] + // Group 2 (end=90): empty ranges [60,90) and [50,90) + // Group 3 (end=80): has data [75,76,77,78,79] + // With limit=4, we should get [98,97,96,95] from group 1 and stop early + let input_ranged_data = vec![ + ( + PartitionRange { + start: Timestamp::new(70, unit.into()), + end: Timestamp::new(100, unit.into()), + num_rows: 5, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])], + ) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(60, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 0, + identifier: 1, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(50, unit.into()), + end: Timestamp::new(90, unit.into()), + num_rows: 0, + identifier: 2, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])]) + .unwrap(), + ], + ), + ( + PartitionRange { + start: Timestamp::new(40, unit.into()), + end: Timestamp::new(80, unit.into()), + num_rows: 5, + identifier: 3, + }, + vec![ + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![75, 76, 77, 78, 79])], + ) + .unwrap(), + ], + ), + ]; + + // With limit=4, descending: top 4 from group 1 are [98,97,96,95] + // Threshold is 95, next group's primary_end after empty groups is 80 + // Since 95 >= 80, we should stop early after group 1 + let expected_output = Some( + DfRecordBatch::try_new( + schema.clone(), + vec![new_ts_array(unit, vec![98, 97, 96, 95])], + ) + .unwrap(), + ); + + run_test( + 4004, + input_ranged_data, + schema.clone(), + SortOptions { + descending: true, + ..Default::default() + }, + Some(4), + expected_output, + Some(10), // Read through all groups because it can't detect boundaries due to data distribution + ) + .await; + } }