From 8ac57368af98d2fc4e3d89999b1b3420cb584376 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 17 Dec 2025 20:36:54 +0800 Subject: [PATCH] fix: handle remaining buf Signed-off-by: Ruihang Xia --- src/query/src/part_sort.rs | 68 ++++++++++++++++++++++++++++++++++---- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 36e4cc8463..7203130230 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -869,8 +869,16 @@ impl PartSortStream { // If we've processed all partitions, mark completion. if self.cur_part_idx >= self.partition_ranges.len() { - debug_assert!(remaining_range.num_rows() == 0); + // If there is remaining data here, it means the input data doesn't match the + // provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges). + // In release builds the previous `debug_assert!` was compiled out, so this data was + // silently dropped and could yield incorrect empty results. To keep queries correct, + // fall back to consuming the remaining data as part of the last range. + if remaining_range.num_rows() != 0 { + self.push_buffer(remaining_range)?; + } self.input_complete = true; + self.evaluating_batch = None; return Ok(()); } @@ -937,7 +945,12 @@ impl PartSortStream { // If we've processed all partitions, sort and output if self.cur_part_idx >= self.partition_ranges.len() { // assert there is no data beyond the last partition range (remaining is empty). - debug_assert!(remaining_range.num_rows() == 0); + // Similar to the TopK path, do not silently drop remaining data in release builds. + // If this happens, the input stream doesn't match `PartitionRange`s; include the + // remaining data for correctness.
+ if remaining_range.num_rows() != 0 { + self.push_buffer(remaining_range)?; + } // Sort and output the final group return self.sorted_buffer_if_non_empty(); @@ -999,11 +1012,11 @@ impl PartSortStream { { // Check if we've already processed all partitions if self.cur_part_idx >= self.partition_ranges.len() { - // All partitions processed, discard remaining data - if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? { - return Poll::Ready(Some(Ok(sorted_batch))); - } - return Poll::Ready(None); + // All partitions processed but we still have remaining data in-flight. + // Don't discard it, otherwise we may incorrectly return an empty result. + self.push_buffer(evaluating_batch)?; + self.input_complete = true; + continue; } if let Some(sorted_batch) = self.split_batch(evaluating_batch)? { @@ -1431,6 +1444,47 @@ mod test { } } + #[tokio::test] + async fn topk_does_not_silently_drop_out_of_range_data() { + let unit = TimeUnit::Millisecond; + let schema = Arc::new(Schema::new(vec![Field::new( + "ts", + DataType::Timestamp(unit, None), + false, + )])); + + // The input data is outside the provided PartitionRange. + // Historically this could lead to an empty result in release builds: the only guard was a + // `debug_assert!`, which is compiled out there, so the remaining batch was dropped. + let input_ranged_data = vec![( + PartitionRange { + start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)), + end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)), + num_rows: 1, + identifier: 0, + }, + vec![ + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]) + .unwrap(), + ], + )]; + + let expected_output = Some( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(), + ); + + run_test( + 0, + input_ranged_data, + schema, + SortOptions::default(), + Some(10), + expected_output, + None, + ) + .await; + } + #[allow(clippy::print_stdout)] async fn run_test( case_id: usize,