mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
@@ -2467,7 +2467,7 @@ mod test {
|
||||
/// Test early stop behavior when there's only one group (no next group).
|
||||
/// In this case, can_stop_early should return false and we should process all data.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_single_group() {
|
||||
async fn test_can_early_stop_single_group() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
@@ -2825,4 +2825,511 @@ mod test {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test early stop behavior with empty partition ranges at the beginning.
|
||||
/// This specifically tests the desc ordering case where empty ranges
|
||||
/// should not interfere with early termination logic.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_empty_ranges_beginning_desc() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Empty ranges at the beginning, then data ranges
|
||||
// For descending: primary_end is end, ranges should be ordered by end DESC
|
||||
// Group 1 (end=100): empty ranges [70,100) and [50,100) - both empty
|
||||
// Group 2 (end=80): has data [75,76,77,78]
|
||||
// With limit=2, we should get [78,77] and stop early since threshold=77 >= next_group_end
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77, 78])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(70, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![65, 66, 67])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=2, descending: top 2 from group 2 are [78, 77]
|
||||
// Threshold is 77, next group's primary_end is 70
|
||||
// Since 77 >= 70, we should stop early after group 2
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![78, 77])]).unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4000,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(2),
|
||||
expected_output,
|
||||
Some(7), // Must read until finding actual data and detecting early stop
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test early stop with empty ranges affecting threshold calculation.
|
||||
/// Empty ranges should be skipped and not affect the threshold-based early termination.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_empty_ranges_threshold_calculation() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Mix of empty and non-empty ranges
|
||||
// Group 1 (end=100): [70,100) has data, [50,100) is empty
|
||||
// Group 2 (end=90): [60,90) has data, [40,90) is empty
|
||||
// Group 3 (end=80): [30,80) has data
|
||||
// With limit=3 from group 1+2, threshold should be calculated correctly
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![84, 85, 86, 87])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(30, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 4,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// Combined data from groups 1+2: [94,95,96,97,98, 84,85,86,87]
|
||||
// Sorted descending: [98,97,96,95,94, 87,86,85,84]
|
||||
// With limit=3: [98,97,96], threshold=96
|
||||
// Next group's primary_end is 80, since 96 >= 80, stop early
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![98, 97, 96])])
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4001,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(3),
|
||||
expected_output,
|
||||
Some(12), // Read through all groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test consecutive empty ranges with desc ordering that should trigger early stop.
|
||||
/// This tests the scenario where multiple empty ranges are processed before finding
|
||||
/// data, and then early stop kicks in correctly.
|
||||
#[tokio::test]
|
||||
async fn test_consecutive_empty_ranges_early_stop_desc() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Multiple consecutive empty ranges, then data, then early stop
|
||||
// Groups with empty ranges should be processed quickly
|
||||
// Group 1 (end=120): empty
|
||||
// Group 2 (end=110): empty
|
||||
// Group 3 (end=100): has data [95,96,97,98,99]
|
||||
// Group 4 (end=90): has data [85,86,87,88,89]
|
||||
// With limit=4, we should get [99,98,97,96] from group 3 and stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(90, unit.into()),
|
||||
end: Timestamp::new(120, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(80, unit.into()),
|
||||
end: Timestamp::new(110, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![95, 96, 97, 98, 99])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 4,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=4, descending: top 4 from group 3 are [99,98,97,96]
|
||||
// Threshold is 96, next group's primary_end is 80
|
||||
// Since 96 >= 80, we should stop early after group 3
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![99, 98, 97, 96])],
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4002,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(4),
|
||||
expected_output,
|
||||
Some(13), // Read through all groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test empty ranges with exact boundary equality for early stop.
|
||||
/// This verifies that empty ranges don't interfere with exact boundary conditions.
|
||||
#[tokio::test]
|
||||
async fn test_empty_ranges_exact_boundary_early_stop() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Empty ranges with exact boundary equality
|
||||
// Group 1 (end=100): empty range [80,100) and data range [70,100) with values up to 90
|
||||
// Group 2 (end=90): has data [85,86,87,88,89]
|
||||
// With limit=3, threshold=98, next_group_end=90, since 98 >= 90, stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(80, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![96, 97, 98, 99])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=3, descending: top 3 from group 1 are [99,98,97]
|
||||
// Threshold is 98, next group's primary_end is 90
|
||||
// Since 98 >= 90, we should stop early
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98, 97])])
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4003,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(3),
|
||||
expected_output,
|
||||
Some(9), // Read through both groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test that empty ranges in the middle of processing don't break early stop logic.
|
||||
/// This ensures that empty ranges are properly skipped and don't affect threshold calculation.
|
||||
#[tokio::test]
|
||||
async fn test_empty_ranges_middle_processing_early_stop() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Data, then empty ranges, then more data, then early stop
|
||||
// Group 1 (end=100): has data [94,95,96,97,98]
|
||||
// Group 2 (end=90): empty ranges [60,90) and [50,90)
|
||||
// Group 3 (end=80): has data [75,76,77,78,79]
|
||||
// With limit=4, we should get [98,97,96,95] from group 1 and stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77, 78, 79])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=4, descending: top 4 from group 1 are [98,97,96,95]
|
||||
// Threshold is 95, next group's primary_end after empty groups is 80
|
||||
// Since 95 >= 80, we should stop early after group 1
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![98, 97, 96, 95])],
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4004,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(4),
|
||||
expected_output,
|
||||
Some(10), // Read through all groups because it can't detect boundaries due to data distribution
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user