feat: poll result stream more often (#6599)

* feat: poll result stream more often

Signed-off-by: discord9 <discord9@163.com>

* refactor: cleanup match

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-07-29 11:59:59 +08:00
committed by GitHub
parent 086777d938
commit 4cfc878067

View File

@@ -445,31 +445,24 @@ impl WindowedSortStream {
// consume input stream
while !self.is_terminated {
// then we get a new RecordBatch from input stream
let new_input_rbs = match self.input.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
Some(split_batch_to_sorted_run(batch, &self.expression)?)
}
let SortedRunSet {
runs_with_batch,
sort_column,
} = match self.input.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// input stream is done, we need to merge sort the remaining working set
self.is_terminated = true;
None
self.build_sorted_stream()?;
self.start_new_merge_sort()?;
break;
}
Poll::Pending => return Poll::Pending,
};
let Some(SortedRunSet {
runs_with_batch,
sort_column,
}) = new_input_rbs
else {
// input stream is done, we need to merge sort the remaining working set
self.build_sorted_stream()?;
self.start_new_merge_sort()?;
continue;
};
// The core logic to eargerly merge sort the working set
// compare with last_value to find boundary, then merge runs if needed
@@ -564,8 +557,18 @@ impl WindowedSortStream {
last_remaining = Some((sorted_rb, run_info));
}
}
// poll result stream again to see if we can emit more results
match self.poll_result_stream(cx) {
Poll::Ready(None) => {
if self.is_terminated {
return Poll::Ready(None);
}
}
x => return x,
};
}
// emit the merge result
// emit the merge result after terminated(all input stream is done)
self.poll_result_stream(cx)
}