Compare commits


1 Commit

Author        SHA1        Message                      Date
Ruihang Xia   8ac57368af  fix: handle remaining buf    2025-12-17 20:36:54 +08:00
                          Signed-off-by: Ruihang Xia <waynestxia@gmail.com>


@@ -869,8 +869,16 @@ impl PartSortStream {
// If we've processed all partitions, mark completion.
if self.cur_part_idx >= self.partition_ranges.len() {
debug_assert!(remaining_range.num_rows() == 0);
// If there is remaining data here, it means the input data doesn't match the
// provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges).
// In release builds, the previous `debug_assert!`-only check is compiled out, so the
// remaining data would be silently dropped and could lead to incorrect empty results.
// To keep query correctness, fall back to consuming the remaining data as part of the
// last range.
if remaining_range.num_rows() != 0 {
self.push_buffer(remaining_range)?;
}
self.input_complete = true;
self.evaluating_batch = None;
return Ok(());
}
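
The rationale above hinges on how `debug_assert!` behaves across build profiles: it panics only when `debug_assertions` is enabled and compiles to nothing in release builds. A minimal standalone sketch of the same defensive pattern (illustrative only, not GreptimeDB code; `drain_remaining` and its types are hypothetical):

// Sketch: correctness must not depend on `debug_assert!`, which is a no-op
// when `debug_assertions` is disabled (the default for release builds).
fn drain_remaining(remaining_rows: usize, buffer: &mut Vec<usize>) {
    // Surfaces mismatched inputs loudly in debug/test builds only.
    debug_assert!(
        remaining_rows == 0,
        "input does not match the provided partition ranges"
    );
    // Release-build fallback: keep the data instead of silently dropping it.
    if remaining_rows != 0 {
        buffer.push(remaining_rows);
    }
}

fn main() {
    let mut buffer = Vec::new();
    if cfg!(debug_assertions) {
        // Debug/test builds: a mismatched input would panic at the assert above.
        drain_remaining(0, &mut buffer);
        assert!(buffer.is_empty());
    } else {
        // Release builds: the assert vanishes and the fallback keeps the data.
        drain_remaining(3, &mut buffer);
        assert_eq!(buffer, vec![3]);
    }
}
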
@@ -937,7 +945,12 @@ impl PartSortStream {
// If we've processed all partitions, sort and output
if self.cur_part_idx >= self.partition_ranges.len() {
// assert there is no data beyond the last partition range (remaining is empty).
debug_assert!(remaining_range.num_rows() == 0);
// Similar to the TopK path, do not silently drop remaining data in release builds.
// If any data remains here, the input stream doesn't match the provided `PartitionRange`s;
// include the remaining data in the output for correctness.
if remaining_range.num_rows() != 0 {
self.push_buffer(remaining_range)?;
}
// Sort and output the final group
return self.sorted_buffer_if_non_empty();
@@ -999,11 +1012,11 @@ impl PartSortStream {
{
// Check if we've already processed all partitions
if self.cur_part_idx >= self.partition_ranges.len() {
// All partitions processed, discard remaining data
if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
return Poll::Ready(Some(Ok(sorted_batch)));
}
return Poll::Ready(None);
// All partitions processed but we still have remaining data in-flight.
// Don't discard it, otherwise we may incorrectly return an empty result.
self.push_buffer(evaluating_batch)?;
self.input_complete = true;
continue;
}
if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
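
A standalone sketch of the control-flow change above, assuming a simplified drain loop (the names and types here are illustrative, not the actual `PartSortStream` API):

// Simplified model: once all ranges are consumed, any batch still in flight is
// folded into the sort buffer and the loop continues, so the single
// "flush sorted buffer" exit path is the only place data leaves the operator.
fn drain(batches: Vec<Vec<i64>>, mut ranges_left: usize) -> Vec<i64> {
    let mut buffer: Vec<i64> = Vec::new();
    let mut input = batches.into_iter();
    loop {
        let Some(batch) = input.next() else { break };
        if ranges_left == 0 {
            // All partitions processed but data is still in flight:
            // keep it instead of returning early with an empty result.
            buffer.extend(batch);
            continue;
        }
        // Normal path: consume one declared range per batch in this toy model.
        ranges_left -= 1;
        buffer.extend(batch);
    }
    buffer.sort_unstable();
    buffer
}

fn main() {
    // The second batch arrives after the declared ranges are exhausted,
    // yet it still appears in the sorted output.
    assert_eq!(drain(vec![vec![3, 1], vec![100]], 1), vec![1, 3, 100]);
}
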
@@ -1431,6 +1444,47 @@ mod test {
}
}
#[tokio::test]
async fn topk_does_not_silently_drop_out_of_range_data() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(unit, None),
false,
)]));
// The input data is outside the provided PartitionRange.
// Historically this could produce an empty result in release builds, where the
// `debug_assert!`-only checks are compiled out and the remaining batch would be dropped.
let input_ranged_data = vec![(
PartitionRange {
start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)),
end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)),
num_rows: 1,
identifier: 0,
},
vec![
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])])
.unwrap(),
],
)];
let expected_output = Some(
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(),
);
run_test(
0,
input_ranged_data,
schema,
SortOptions::default(),
Some(10),
expected_output,
None,
)
.await;
}
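
A note on exercising this regression test: `cargo test` compiles with `debug_assertions` enabled by default, while the empty-result behavior described in the comments is specific to builds where they are disabled. Running the suite under a release profile as well (for example `cargo test --release`) covers that configuration.
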
#[allow(clippy::print_stdout)]
async fn run_test(
case_id: usize,