mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00

Compare commits (1 commit): 8ac57368af
@@ -869,8 +869,16 @@ impl PartSortStream {
         // If we've processed all partitions, mark completion.
         if self.cur_part_idx >= self.partition_ranges.len() {
-            debug_assert!(remaining_range.num_rows() == 0);
+            // If there is remaining data here, it means the input data doesn't match the
+            // provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges).
+            // In release builds, the previous `debug_assert!` would silently drop data and
+            // could lead to incorrect empty results. To keep query correctness, fall back
+            // to consuming the remaining data as part of the last range.
+            if remaining_range.num_rows() != 0 {
+                self.push_buffer(remaining_range)?;
+            }
             self.input_complete = true;
+            self.evaluating_batch = None;
             return Ok(());
         }

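Note: the reason this mattered in release builds is that Rust compiles `debug_assert!` to a no-op whenever debug assertions are disabled, which is the default for release profiles. A minimal standalone sketch (not GreptimeDB code) contrasting the old check-only pattern with the new keep-the-data pattern:

    // Hypothetical stand-ins: `remaining` models `remaining_range`,
    // `out` models the sort buffer fed by `push_buffer`.
    fn drain(remaining: Vec<i64>, out: &mut Vec<i64>) {
        // Old pattern (shown disabled): `debug_assert!(remaining.is_empty());`
        // compiles to nothing when debug assertions are off, so leftover
        // rows were dropped without a trace in release builds.
        //
        // New pattern: keep the data instead of asserting it away.
        if !remaining.is_empty() {
            out.extend(remaining);
        }
    }

    fn main() {
        let mut out = Vec::new();
        drain(vec![100], &mut out);
        assert_eq!(out, vec![100]); // holds in debug and release alike
    }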
@@ -937,7 +945,12 @@ impl PartSortStream {
         // If we've processed all partitions, sort and output
         if self.cur_part_idx >= self.partition_ranges.len() {
             // assert there is no data beyond the last partition range (remaining is empty).
-            debug_assert!(remaining_range.num_rows() == 0);
+            // Similar to the TopK path, do not silently drop remaining data in release builds.
+            // If this happens, the input stream doesn't match `PartitionRange`s; include the
+            // remaining data for correctness.
+            if remaining_range.num_rows() != 0 {
+                self.push_buffer(remaining_range)?;
+            }

             // Sort and output the final group
             return self.sorted_buffer_if_non_empty();
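Note: this hunk applies the same fallback on the full-sort path: leftover rows are folded into the final sort group before it is flushed, so the last emitted batch still covers every input row. A toy model of that flow, using hypothetical `Vec<i64>` stand-ins for the Arrow record batches the real `push_buffer` / `sorted_buffer_if_non_empty` operate on:

    struct PartSort {
        buffer: Vec<i64>,
    }

    impl PartSort {
        fn push_buffer(&mut self, rows: Vec<i64>) {
            self.buffer.extend(rows);
        }

        // Returns the sorted final group, or None when nothing was buffered.
        fn sorted_buffer_if_non_empty(&mut self) -> Option<Vec<i64>> {
            if self.buffer.is_empty() {
                return None;
            }
            let mut out = std::mem::take(&mut self.buffer);
            out.sort_unstable();
            Some(out)
        }
    }

    fn main() {
        let mut sorter = PartSort { buffer: vec![5, 3] };
        // Rows that fell outside every declared PartitionRange are folded
        // into the final group instead of being asserted away.
        sorter.push_buffer(vec![100]);
        assert_eq!(sorter.sorted_buffer_if_non_empty(), Some(vec![3, 5, 100]));
    }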
@@ -999,11 +1012,11 @@ impl PartSortStream {
         {
             // Check if we've already processed all partitions
             if self.cur_part_idx >= self.partition_ranges.len() {
-                // All partitions processed, discard remaining data
-                if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
-                    return Poll::Ready(Some(Ok(sorted_batch)));
-                }
-                return Poll::Ready(None);
+                // All partitions processed but we still have remaining data in-flight.
+                // Don't discard it, otherwise we may incorrectly return an empty result.
+                self.push_buffer(evaluating_batch)?;
+                self.input_complete = true;
+                continue;
             }

             if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
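Note: on this path the fix changes control flow rather than just the guard: instead of returning early (and dropping `evaluating_batch`), the stream buffers the batch, marks the input complete, and loops so the normal completion branch flushes the sorted buffer. A simplified, hypothetical model of that loop shape:

    fn drive(batches: Vec<Vec<i64>>, num_partitions: usize) -> Option<Vec<i64>> {
        let mut buffer: Vec<i64> = Vec::new();
        let mut cur_part_idx = 0;
        let mut pending = batches.into_iter();
        let mut input_complete = false;

        loop {
            if input_complete {
                // Completion branch: flush whatever was buffered, sorted.
                if buffer.is_empty() {
                    return None;
                }
                buffer.sort_unstable();
                return Some(buffer);
            }
            let Some(batch) = pending.next() else {
                input_complete = true;
                continue;
            };
            if cur_part_idx >= num_partitions {
                // Past the last PartitionRange: keep the data and loop back
                // to the flush branch above instead of returning None.
                buffer.extend(batch);
                input_complete = true;
                continue;
            }
            // Normal path (partition bookkeeping elided): buffer and advance.
            buffer.extend(batch);
            cur_part_idx += 1;
        }
    }

    fn main() {
        // One declared partition, two batches: the second must not vanish.
        assert_eq!(drive(vec![vec![3], vec![100]], 1), Some(vec![3, 100]));
    }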
@@ -1431,6 +1444,47 @@ mod test {
         }
     }

+    #[tokio::test]
+    async fn topk_does_not_silently_drop_out_of_range_data() {
+        let unit = TimeUnit::Millisecond;
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(unit, None),
+            false,
+        )]));
+
+        // The input data is outside the provided PartitionRange.
+        // Historically this could lead to an empty result in release builds due to
+        // `debug_assert!`-only checks dropping the remaining batch.
+        let input_ranged_data = vec![(
+            PartitionRange {
+                start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)),
+                end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)),
+                num_rows: 1,
+                identifier: 0,
+            },
+            vec![
+                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])])
+                    .unwrap(),
+            ],
+        )];
+
+        let expected_output = Some(
+            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(),
+        );
+
+        run_test(
+            0,
+            input_ranged_data,
+            schema,
+            SortOptions::default(),
+            Some(10),
+            expected_output,
+            None,
+        )
+        .await;
+    }
+
     #[allow(clippy::print_stdout)]
     async fn run_test(
         case_id: usize,
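Note: since the silent-drop behavior only exists when debug assertions are compiled out (with the old code, a debug build would have panicked on the `debug_assert!` instead), the new regression test is most telling when the suite is also run under a profile that disables them, e.g. `cargo test --release` (Cargo's release profile sets `debug-assertions = false` by default).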