mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
Compare commits
2 Commits
feat/inges
...
ec77a5d53a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec77a5d53a | ||
|
|
dbad96eb80 |
@@ -535,6 +535,7 @@ impl PartSortStream {
|
||||
}
|
||||
|
||||
fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
|
||||
rb_sanity_check(&batch)?;
|
||||
match &mut self.buffer {
|
||||
PartSortBuffer::All(v) => v.push(batch),
|
||||
PartSortBuffer::Top(top, cnt) => {
|
||||
@@ -559,7 +560,11 @@ impl PartSortStream {
|
||||
}
|
||||
// else check if last value in topk is not in next group range
|
||||
let topk_buffer = self.sort_top_buffer()?;
|
||||
assert_eq!(topk_buffer.num_rows(), self.limit.unwrap());
|
||||
assert!(topk_buffer.num_rows() >= 1);
|
||||
rb_sanity_check(&topk_buffer)?;
|
||||
let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
|
||||
rb_sanity_check(&min_batch)?;
|
||||
let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
|
||||
let last_val = downcast_ts_array!(
|
||||
min_sort_column.data_type() => (ts_to_timestamp, min_sort_column),
|
||||
@@ -1049,6 +1054,22 @@ impl RecordBatchStream for PartSortStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn rb_sanity_check(batch: &DfRecordBatch) -> datafusion_common::Result<()> {
|
||||
let row_cnt = batch.num_rows();
|
||||
for column in batch.columns() {
|
||||
if column.len() != row_cnt {
|
||||
internal_err!(
|
||||
"RecordBatch column length mismatch: expected {}, found {} at {} for column {:?}",
|
||||
row_cnt,
|
||||
column.len(),
|
||||
snafu::location!(),
|
||||
column
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
@@ -2467,7 +2488,7 @@ mod test {
|
||||
/// Test early stop behavior when there's only one group (no next group).
|
||||
/// In this case, can_stop_early should return false and we should process all data.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_single_group() {
|
||||
async fn test_can_early_stop_single_group() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
@@ -2825,4 +2846,511 @@ mod test {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test early stop behavior with empty partition ranges at the beginning.
|
||||
/// This specifically tests the desc ordering case where empty ranges
|
||||
/// should not interfere with early termination logic.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_empty_ranges_beginning_desc() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Empty ranges at the beginning, then data ranges
|
||||
// For descending: primary_end is end, ranges should be ordered by end DESC
|
||||
// Group 1 (end=100): empty ranges [70,100) and [50,100) - both empty
|
||||
// Group 2 (end=80): has data [75,76,77,78]
|
||||
// With limit=2, we should get [78,77] and stop early since threshold=77 >= next_group_end
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77, 78])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(70, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![65, 66, 67])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=2, descending: top 2 from group 2 are [78, 77]
|
||||
// Threshold is 77, next group's primary_end is 70
|
||||
// Since 77 >= 70, we should stop early after group 2
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![78, 77])]).unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4000,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(2),
|
||||
expected_output,
|
||||
Some(7), // Must read until finding actual data and detecting early stop
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test early stop with empty ranges affecting threshold calculation.
|
||||
/// Empty ranges should be skipped and not affect the threshold-based early termination.
|
||||
#[tokio::test]
|
||||
async fn test_early_stop_empty_ranges_threshold_calculation() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Mix of empty and non-empty ranges
|
||||
// Group 1 (end=100): [70,100) has data, [50,100) is empty
|
||||
// Group 2 (end=90): [60,90) has data, [40,90) is empty
|
||||
// Group 3 (end=80): [30,80) has data
|
||||
// With limit=3 from group 1+2, threshold should be calculated correctly
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![84, 85, 86, 87])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(30, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 4,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// Combined data from groups 1+2: [94,95,96,97,98, 84,85,86,87]
|
||||
// Sorted descending: [98,97,96,95,94, 87,86,85,84]
|
||||
// With limit=3: [98,97,96], threshold=96
|
||||
// Next group's primary_end is 80, since 96 >= 80, stop early
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![98, 97, 96])])
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4001,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(3),
|
||||
expected_output,
|
||||
Some(12), // Read through all groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test consecutive empty ranges with desc ordering that should trigger early stop.
|
||||
/// This tests the scenario where multiple empty ranges are processed before finding
|
||||
/// data, and then early stop kicks in correctly.
|
||||
#[tokio::test]
|
||||
async fn test_consecutive_empty_ranges_early_stop_desc() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Multiple consecutive empty ranges, then data, then early stop
|
||||
// Groups with empty ranges should be processed quickly
|
||||
// Group 1 (end=120): empty
|
||||
// Group 2 (end=110): empty
|
||||
// Group 3 (end=100): has data [95,96,97,98,99]
|
||||
// Group 4 (end=90): has data [85,86,87,88,89]
|
||||
// With limit=4, we should get [99,98,97,96] from group 3 and stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(90, unit.into()),
|
||||
end: Timestamp::new(120, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(80, unit.into()),
|
||||
end: Timestamp::new(110, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![95, 96, 97, 98, 99])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 3,
|
||||
identifier: 4,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=4, descending: top 4 from group 3 are [99,98,97,96]
|
||||
// Threshold is 96, next group's primary_end is 80
|
||||
// Since 96 >= 80, we should stop early after group 3
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![99, 98, 97, 96])],
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4002,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(4),
|
||||
expected_output,
|
||||
Some(13), // Read through all groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test empty ranges with exact boundary equality for early stop.
|
||||
/// This verifies that empty ranges don't interfere with exact boundary conditions.
|
||||
#[tokio::test]
|
||||
async fn test_empty_ranges_exact_boundary_early_stop() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Empty ranges with exact boundary equality
|
||||
// Group 1 (end=100): empty range [80,100) and data range [70,100) with values up to 90
|
||||
// Group 2 (end=90): has data [85,86,87,88,89]
|
||||
// With limit=3, threshold=98, next_group_end=90, since 98 >= 90, stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(80, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 4,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![96, 97, 98, 99])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![85, 86, 87, 88, 89])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=3, descending: top 3 from group 1 are [99,98,97]
|
||||
// Threshold is 98, next group's primary_end is 90
|
||||
// Since 98 >= 90, we should stop early
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98, 97])])
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4003,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(3),
|
||||
expected_output,
|
||||
Some(9), // Read through both groups to detect boundaries correctly
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Test that empty ranges in the middle of processing don't break early stop logic.
|
||||
/// This ensures that empty ranges are properly skipped and don't affect threshold calculation.
|
||||
#[tokio::test]
|
||||
async fn test_empty_ranges_middle_processing_early_stop() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// Test case: Data, then empty ranges, then more data, then early stop
|
||||
// Group 1 (end=100): has data [94,95,96,97,98]
|
||||
// Group 2 (end=90): empty ranges [60,90) and [50,90)
|
||||
// Group 3 (end=80): has data [75,76,77,78,79]
|
||||
// With limit=4, we should get [98,97,96,95] from group 1 and stop early
|
||||
let input_ranged_data = vec![
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(70, unit.into()),
|
||||
end: Timestamp::new(100, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![94, 95, 96, 97, 98])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(60, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 1,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(50, unit.into()),
|
||||
end: Timestamp::new(90, unit.into()),
|
||||
num_rows: 0,
|
||||
identifier: 2,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(40, unit.into()),
|
||||
end: Timestamp::new(80, unit.into()),
|
||||
num_rows: 5,
|
||||
identifier: 3,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![75, 76, 77, 78, 79])],
|
||||
)
|
||||
.unwrap(),
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
// With limit=4, descending: top 4 from group 1 are [98,97,96,95]
|
||||
// Threshold is 95, next group's primary_end after empty groups is 80
|
||||
// Since 95 >= 80, we should stop early after group 1
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![new_ts_array(unit, vec![98, 97, 96, 95])],
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
4004,
|
||||
input_ranged_data,
|
||||
schema.clone(),
|
||||
SortOptions {
|
||||
descending: true,
|
||||
..Default::default()
|
||||
},
|
||||
Some(4),
|
||||
expected_output,
|
||||
Some(10), // Read through all groups because it can't detect boundaries due to data distribution
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user