diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs
index 36e4cc8463..7a6fa18836 100644
--- a/src/query/src/part_sort.rs
+++ b/src/query/src/part_sort.rs
@@ -559,6 +559,15 @@ impl PartSortStream {
         }
         // else check if last value in topk is not in next group range
         let topk_buffer = self.sort_top_buffer()?;
+
+        // Guard against an empty buffer: this can happen if TopK's internal filtering
+        // removed all rows, or if the buffer was cleared. With no last row to inspect
+        // below, we cannot determine if we can stop early, so continue processing.
+        // Fixes: https://github.com/orgs/GreptimeTeam/discussions/7457
+        if topk_buffer.num_rows() == 0 {
+            return Ok(false);
+        }
+
         let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
         let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
         let last_val = downcast_ts_array!(
@@ -1067,6 +1076,60 @@ mod test {
     use super::*;
     use crate::test_util::{MockInputExec, new_ts_array};
 
+    #[tokio::test]
+    async fn test_can_stop_early_with_empty_topk_buffer() {
+        let unit = TimeUnit::Millisecond;
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(unit, None),
+            false,
+        )]));
+
+        // Build a minimal PartSortExec and stream, but inject a dynamic filter that
+        // always evaluates to false so that TopK filters out all rows internally.
+        let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
+        let exec = PartSortExec::try_new(
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("ts", 0)),
+                options: SortOptions {
+                    descending: true,
+                    ..Default::default()
+                },
+            },
+            Some(3),
+            vec![vec![]],
+            mock_input.clone(),
+        )
+        .unwrap();
+
+        let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+            DynamicFilterPhysicalExpr::new(vec![], lit(false)),
+        ))));
+
+        let input_stream = mock_input
+            .execute(0, Arc::new(TaskContext::default()))
+            .unwrap();
+        let mut stream = PartSortStream::new(
+            Arc::new(TaskContext::default()),
+            &exec,
+            Some(3),
+            input_stream,
+            vec![],
+            0,
+            Some(filter),
+        )
+        .unwrap();
+
+        // Push 3 rows so the external counter reaches `limit`, while TopK keeps no rows.
+        let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
+            .unwrap();
+        stream.push_buffer(batch).unwrap();
+
+        // The TopK result buffer is empty, so early-stop cannot be determined.
+        // Ensure this path returns `Ok(false)` (and, importantly, does not panic).
+        assert!(!stream.can_stop_early().unwrap());
+    }
+
     #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
     #[tokio::test]
     async fn fuzzy_test() {