fix: close issue #7457 guard against empty buffer (#7458)

* fix: close issue #7457 guard against empty buffer

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: add unittests for it

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
This commit is contained in:
yihong
2025-12-23 11:11:00 +08:00
committed by GitHub
parent 6a6b34c709
commit 342eb47e19

View File

@@ -559,6 +559,15 @@ impl PartSortStream {
}
// else check if last value in topk is not in next group range
let topk_buffer = self.sort_top_buffer()?;
// Guard against empty buffer - this can happen if TopK's internal filtering
// removed all rows, or if the buffer was cleared. In this case, we cannot
// determine if we can stop early, so continue processing.
// Fixes: https://github.com/orgs/GreptimeTeam/discussions/7457
if topk_buffer.num_rows() == 0 {
return Ok(false);
}
let min_batch = topk_buffer.slice(topk_buffer.num_rows() - 1, 1);
let min_sort_column = self.expression.evaluate_to_sort_column(&min_batch)?.values;
let last_val = downcast_ts_array!(
@@ -1067,6 +1076,60 @@ mod test {
use super::*;
use crate::test_util::{MockInputExec, new_ts_array};
/// Regression test: `can_stop_early` must return `Ok(false)` instead of
/// panicking when the sorted TopK buffer contains no rows.
///
/// The stream is built with a limit of 3 and a dynamic filter whose predicate
/// is the literal `false`; three rows are then pushed before the check runs.
#[tokio::test]
async fn test_can_stop_early_with_empty_topk_buffer() {
    // Single non-nullable millisecond timestamp column named "ts".
    let time_unit = TimeUnit::Millisecond;
    let ts_field = Field::new("ts", DataType::Timestamp(time_unit, None), false);
    let schema = Arc::new(Schema::new(vec![ts_field]));

    // Minimal plan: a mock input feeding a PartSortExec that sorts on "ts"
    // in descending order with a limit of 3.
    let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
    let sort_expr = PhysicalSortExpr {
        expr: Arc::new(Column::new("ts", 0)),
        options: SortOptions {
            descending: true,
            ..Default::default()
        },
    };
    let exec =
        PartSortExec::try_new(sort_expr, Some(3), vec![vec![]], mock_input.clone()).unwrap();

    // Dynamic filter with a constant `false` predicate; the intent is that
    // TopK rejects every incoming row and its result buffer stays empty.
    let always_false = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(false)));
    let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(always_false)));

    let input_stream = mock_input
        .execute(0, Arc::new(TaskContext::default()))
        .unwrap();
    let mut stream = PartSortStream::new(
        Arc::new(TaskContext::default()),
        &exec,
        Some(3),
        input_stream,
        vec![],
        0,
        Some(filter),
    )
    .unwrap();

    // Feed as many rows as the limit so the early-stop check gets past its
    // row-count precondition even though TopK is expected to retain nothing.
    let ts_values = new_ts_array(time_unit, vec![1, 2, 3]);
    let batch = DfRecordBatch::try_new(schema.clone(), vec![ts_values]).unwrap();
    stream.push_buffer(batch).unwrap();

    // With nothing in the TopK buffer there is no "last value" to compare
    // against the next group, so the answer must be "cannot stop" (no panic).
    let can_stop = stream.can_stop_early().unwrap();
    assert!(!can_stop);
}
#[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
#[tokio::test]
async fn fuzzy_test() {