Compare commits

...

2 Commits

Author SHA1 Message Date
discord9
ec77a5d53a sanity check
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 13:54:19 +08:00
discord9
dbad96eb80 more test
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 13:44:00 +08:00

View File

@@ -535,6 +535,7 @@ impl PartSortStream {
}
fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
rb_sanity_check(&batch)?;
match &mut self.buffer {
PartSortBuffer::All(v) => v.push(batch),
PartSortBuffer::Top(top, cnt) => {
@@ -559,7 +560,11 @@ impl PartSortStream {
}
// else check if last value in topk is not in next group range
let topk_buffer = self.sort_top_buffer()?;
assert_eq!(topk_buffer.num_rows(), self.limit.unwrap());
assert!(topk_buffer.num_rows() >= 1);
rb_sanity_check(&topk_buffer)?;
let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
rb_sanity_check(&min_batch)?;
let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
let last_val = downcast_ts_array!(
min_sort_column.data_type() => (ts_to_timestamp, min_sort_column),
@@ -1049,6 +1054,22 @@ impl RecordBatchStream for PartSortStream {
}
}
/// Verify that every column of `batch` has exactly `batch.num_rows()` rows.
///
/// Arrow's `RecordBatch` is supposed to uphold this invariant itself, so a
/// failure here indicates an upstream bug (e.g. a bad slice or a hand-built
/// batch); surfacing it as an internal error gives a located, debuggable
/// failure instead of a panic further downstream.
///
/// # Errors
/// Returns `DataFusionError::Internal` (via `internal_err!`) naming the
/// expected/actual lengths, the source location, and the offending column.
fn rb_sanity_check(batch: &DfRecordBatch) -> datafusion_common::Result<()> {
    let row_cnt = batch.num_rows();
    for column in batch.columns() {
        if column.len() != row_cnt {
            // `internal_err!` already evaluates to an `Err(..)`; return it
            // directly rather than the original `Err(..)?` form, which is
            // the pattern clippy flags as `try_err` (a no-op `?` on a value
            // that is already the error being produced).
            return internal_err!(
                "RecordBatch column length mismatch: expected {}, found {} at {} for column {:?}",
                row_cnt,
                column.len(),
                snafu::location!(),
                column
            );
        }
    }
    Ok(())
}
#[cfg(test)]
mod test {
use std::sync::Arc;
@@ -2467,7 +2488,7 @@ mod test {
/// Test early stop behavior when there's only one group (no next group).
/// In this case, can_stop_early should return false and we should process all data.
#[tokio::test]
async fn test_early_stop_single_group() {
async fn test_can_early_stop_single_group() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
@@ -2825,4 +2846,511 @@ mod test {
)
.await;
}
/// Test early stop behavior with empty partition ranges at the beginning.
/// This specifically tests the desc ordering case where empty ranges
/// should not interfere with early termination logic.
#[tokio::test]
async fn test_early_stop_empty_ranges_beginning_desc() {
    let unit = TimeUnit::Millisecond;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(unit, None),
        false,
    )]));

    // Shorthand builders: a `PartitionRange` from its four fields, and a
    // single-column ts batch from plain values.
    let range = |start, end, num_rows, identifier| PartitionRange {
        start: Timestamp::new(start, unit.into()),
        end: Timestamp::new(end, unit.into()),
        num_rows,
        identifier,
    };
    let batch =
        |vals| DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vals)]).unwrap();

    // For descending order the primary key is `end`, ranges ordered by end DESC:
    //   Group 1 (end=100): two empty ranges, [70,100) and [50,100)
    //   Group 2 (end=80):  data [75, 76, 77, 78]
    //   Group 3 (end=70):  data [65, 66, 67]
    let input_ranged_data = vec![
        (range(70, 100, 0, 0), vec![batch(vec![])]),
        (range(50, 100, 0, 1), vec![batch(vec![])]),
        (range(60, 80, 4, 2), vec![batch(vec![75, 76, 77, 78])]),
        (range(40, 70, 3, 3), vec![batch(vec![65, 66, 67])]),
    ];

    // With limit=2 descending, the top 2 values of group 2 are [78, 77].
    // The threshold (77) is >= the next group's primary end (70), so the
    // stream should stop early right after group 2.
    let expected_output = Some(batch(vec![78, 77]));

    run_test(
        4000,
        input_ranged_data,
        schema.clone(),
        SortOptions {
            descending: true,
            ..Default::default()
        },
        Some(2),
        expected_output,
        Some(7), // Must read until finding actual data and detecting early stop
    )
    .await;
}
/// Test early stop with empty ranges affecting threshold calculation.
/// Empty ranges should be skipped and not affect the threshold-based early termination.
#[tokio::test]
async fn test_early_stop_empty_ranges_threshold_calculation() {
    let unit = TimeUnit::Millisecond;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(unit, None),
        false,
    )]));

    // Shorthand builders for a `PartitionRange` and a single-column ts batch.
    let range = |start, end, num_rows, identifier| PartitionRange {
        start: Timestamp::new(start, unit.into()),
        end: Timestamp::new(end, unit.into()),
        num_rows,
        identifier,
    };
    let batch =
        |vals| DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vals)]).unwrap();

    // Mix of empty and non-empty ranges:
    //   Group 1 (end=100): [70,100) with data, [50,100) empty
    //   Group 2 (end=90):  [60,90) with data,  [40,90) empty
    //   Group 3 (end=80):  [30,80) with data
    let input_ranged_data = vec![
        (range(70, 100, 5, 0), vec![batch(vec![94, 95, 96, 97, 98])]),
        (range(50, 100, 0, 1), vec![batch(vec![])]),
        (range(60, 90, 4, 2), vec![batch(vec![84, 85, 86, 87])]),
        (range(40, 90, 0, 3), vec![batch(vec![])]),
        (range(30, 80, 3, 4), vec![batch(vec![75, 76, 77])]),
    ];

    // Combined data from groups 1+2: [94..=98] ++ [84..=87], i.e. sorted
    // descending [98,97,96,95,94,87,86,85,84]. With limit=3 the result is
    // [98,97,96] and the threshold is 96. The next group's primary end is
    // 80, and 96 >= 80, so the stream stops early.
    let expected_output = Some(batch(vec![98, 97, 96]));

    run_test(
        4001,
        input_ranged_data,
        schema.clone(),
        SortOptions {
            descending: true,
            ..Default::default()
        },
        Some(3),
        expected_output,
        Some(12), // Read through all groups to detect boundaries correctly
    )
    .await;
}
/// Test consecutive empty ranges with desc ordering that should trigger early stop.
/// This tests the scenario where multiple empty ranges are processed before finding
/// data, and then early stop kicks in correctly.
#[tokio::test]
async fn test_consecutive_empty_ranges_early_stop_desc() {
    let unit = TimeUnit::Millisecond;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(unit, None),
        false,
    )]));

    // Shorthand builders for a `PartitionRange` and a single-column ts batch.
    let range = |start, end, num_rows, identifier| PartitionRange {
        start: Timestamp::new(start, unit.into()),
        end: Timestamp::new(end, unit.into()),
        num_rows,
        identifier,
    };
    let batch =
        |vals| DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vals)]).unwrap();

    // Multiple consecutive empty ranges, then data:
    //   Group 1 (end=120): empty
    //   Group 2 (end=110): empty
    //   Group 3 (end=100): data [95..=99]
    //   Group 4 (end=90):  data [85..=89]
    //   Group 5 (end=80):  data [75..=77]
    let input_ranged_data = vec![
        (range(90, 120, 0, 0), vec![batch(vec![])]),
        (range(80, 110, 0, 1), vec![batch(vec![])]),
        (range(70, 100, 5, 2), vec![batch(vec![95, 96, 97, 98, 99])]),
        (range(60, 90, 5, 3), vec![batch(vec![85, 86, 87, 88, 89])]),
        (range(40, 80, 3, 4), vec![batch(vec![75, 76, 77])]),
    ];

    // With limit=4 descending, the top 4 values of group 3 are [99,98,97,96].
    // The threshold (96) is >= the next group's primary end (80), so the
    // stream should stop early after group 3; the empty groups before it
    // must be skipped without disturbing that logic.
    let expected_output = Some(batch(vec![99, 98, 97, 96]));

    run_test(
        4002,
        input_ranged_data,
        schema.clone(),
        SortOptions {
            descending: true,
            ..Default::default()
        },
        Some(4),
        expected_output,
        Some(13), // Read through all groups to detect boundaries correctly
    )
    .await;
}
/// Test empty ranges with exact boundary equality for early stop.
/// This verifies that empty ranges don't interfere with exact boundary conditions.
#[tokio::test]
async fn test_empty_ranges_exact_boundary_early_stop() {
    let unit = TimeUnit::Millisecond;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(unit, None),
        false,
    )]));

    // Shorthand builders for a `PartitionRange` and a single-column ts batch.
    let range = |start, end, num_rows, identifier| PartitionRange {
        start: Timestamp::new(start, unit.into()),
        end: Timestamp::new(end, unit.into()),
        num_rows,
        identifier,
    };
    let batch =
        |vals| DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vals)]).unwrap();

    // Group 1 (end=100): empty range [80,100) plus data range [70,100)
    // holding [96..=99]; Group 2 (end=90): data [85..=89].
    let input_ranged_data = vec![
        (range(80, 100, 0, 0), vec![batch(vec![])]),
        (range(70, 100, 4, 1), vec![batch(vec![96, 97, 98, 99])]),
        (range(60, 90, 5, 2), vec![batch(vec![85, 86, 87, 88, 89])]),
    ];

    // With limit=3 descending, the top 3 values of group 1 are [99,98,97].
    // The threshold — the smallest retained value, 97 (the original comment
    // said 98, which contradicts the top-3 shown right next to it) — is >=
    // the next group's primary end (90), so the stream stops early.
    let expected_output = Some(batch(vec![99, 98, 97]));

    run_test(
        4003,
        input_ranged_data,
        schema.clone(),
        SortOptions {
            descending: true,
            ..Default::default()
        },
        Some(3),
        expected_output,
        Some(9), // Read through both groups to detect boundaries correctly
    )
    .await;
}
/// Test that empty ranges in the middle of processing don't break early stop logic.
/// This ensures that empty ranges are properly skipped and don't affect threshold calculation.
#[tokio::test]
async fn test_empty_ranges_middle_processing_early_stop() {
    let unit = TimeUnit::Millisecond;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(unit, None),
        false,
    )]));

    // Shorthand builders for a `PartitionRange` and a single-column ts batch.
    let range = |start, end, num_rows, identifier| PartitionRange {
        start: Timestamp::new(start, unit.into()),
        end: Timestamp::new(end, unit.into()),
        num_rows,
        identifier,
    };
    let batch =
        |vals| DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vals)]).unwrap();

    // Data, then empty ranges in the middle, then more data:
    //   Group 1 (end=100): data [94..=98]
    //   Group 2 (end=90):  empty ranges [60,90) and [50,90)
    //   Group 3 (end=80):  data [75..=79]
    let input_ranged_data = vec![
        (range(70, 100, 5, 0), vec![batch(vec![94, 95, 96, 97, 98])]),
        (range(60, 90, 0, 1), vec![batch(vec![])]),
        (range(50, 90, 0, 2), vec![batch(vec![])]),
        (range(40, 80, 5, 3), vec![batch(vec![75, 76, 77, 78, 79])]),
    ];

    // With limit=4 descending, the top 4 values of group 1 are [98,97,96,95].
    // The threshold (95) is >= the first non-empty following group's primary
    // end (80), so the stream should stop early after group 1.
    let expected_output = Some(batch(vec![98, 97, 96, 95]));

    run_test(
        4004,
        input_ranged_data,
        schema.clone(),
        SortOptions {
            descending: true,
            ..Default::default()
        },
        Some(4),
        expected_output,
        Some(10), // Read through all groups because it can't detect boundaries due to data distribution
    )
    .await;
}
}