mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
1 Commits
create-tab
...
fix-topk
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ac57368af |
@@ -869,8 +869,16 @@ impl PartSortStream {
|
||||
|
||||
// If we've processed all partitions, mark completion.
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
debug_assert!(remaining_range.num_rows() == 0);
|
||||
// If there is remaining data here, it means the input data doesn't match the
|
||||
// provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges).
|
||||
// In release builds, the previous `debug_assert!` would silently drop data and
|
||||
// could lead to incorrect empty results. To keep query correctness, fall back
|
||||
// to consuming the remaining data as part of the last range.
|
||||
if remaining_range.num_rows() != 0 {
|
||||
self.push_buffer(remaining_range)?;
|
||||
}
|
||||
self.input_complete = true;
|
||||
self.evaluating_batch = None;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -937,7 +945,12 @@ impl PartSortStream {
|
||||
// If we've processed all partitions, sort and output
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
// assert there is no data beyond the last partition range (remaining is empty).
|
||||
debug_assert!(remaining_range.num_rows() == 0);
|
||||
// Similar to the TopK path, do not silently drop remaining data in release builds.
|
||||
// If this happens, the input stream doesn't match `PartitionRange`s; include the
|
||||
// remaining data for correctness.
|
||||
if remaining_range.num_rows() != 0 {
|
||||
self.push_buffer(remaining_range)?;
|
||||
}
|
||||
|
||||
// Sort and output the final group
|
||||
return self.sorted_buffer_if_non_empty();
|
||||
@@ -999,11 +1012,11 @@ impl PartSortStream {
|
||||
{
|
||||
// Check if we've already processed all partitions
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
// All partitions processed, discard remaining data
|
||||
if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
|
||||
return Poll::Ready(Some(Ok(sorted_batch)));
|
||||
}
|
||||
return Poll::Ready(None);
|
||||
// All partitions processed but we still have remaining data in-flight.
|
||||
// Don't discard it, otherwise we may incorrectly return an empty result.
|
||||
self.push_buffer(evaluating_batch)?;
|
||||
self.input_complete = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
|
||||
@@ -1431,6 +1444,47 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn topk_does_not_silently_drop_out_of_range_data() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// The input data is outside the provided PartitionRange.
|
||||
// Historically this could lead to an empty result in release builds due to
|
||||
// `debug_assert!`-only checks dropping the remaining batch.
|
||||
let input_ranged_data = vec![(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)),
|
||||
end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)),
|
||||
num_rows: 1,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])])
|
||||
.unwrap(),
|
||||
],
|
||||
)];
|
||||
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
0,
|
||||
input_ranged_data,
|
||||
schema,
|
||||
SortOptions::default(),
|
||||
Some(10),
|
||||
expected_output,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[allow(clippy::print_stdout)]
|
||||
async fn run_test(
|
||||
case_id: usize,
|
||||
|
||||
Reference in New Issue
Block a user