From 4cfc878067bb674b5cbe52f19900b08fca2dd3f3 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 29 Jul 2025 11:59:59 +0800 Subject: [PATCH] feat: poll result stream more often (#6599) * feat: poll result stream more often Signed-off-by: discord9 * refactor: cleanup match Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/query/src/window_sort.rs | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index c8b9ece3a1..dab90c1da8 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -445,31 +445,24 @@ impl WindowedSortStream { // consume input stream while !self.is_terminated { // then we get a new RecordBatch from input stream - let new_input_rbs = match self.input.as_mut().poll_next(cx) { - Poll::Ready(Some(Ok(batch))) => { - Some(split_batch_to_sorted_run(batch, &self.expression)?) - } + let SortedRunSet { + runs_with_batch, + sort_column, + } = match self.input.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?, Poll::Ready(Some(Err(e))) => { return Poll::Ready(Some(Err(e))); } Poll::Ready(None) => { // input stream is done, we need to merge sort the remaining working set self.is_terminated = true; - None + self.build_sorted_stream()?; + self.start_new_merge_sort()?; + break; } Poll::Pending => return Poll::Pending, }; - let Some(SortedRunSet { - runs_with_batch, - sort_column, - }) = new_input_rbs - else { - // input stream is done, we need to merge sort the remaining working set - self.build_sorted_stream()?; - self.start_new_merge_sort()?; - continue; - }; // The core logic to eargerly merge sort the working set // compare with last_value to find boundary, then merge runs if needed @@ -564,8 +557,18 @@ impl WindowedSortStream { last_remaining = Some((sorted_rb, run_info)); } } + + // poll result stream again to see if we can emit more results + match self.poll_result_stream(cx) { + Poll::Ready(None) => { + if self.is_terminated { + return Poll::Ready(None); + } + } + x => return x, + }; } - // emit the merge result + // emit the merge result after terminated(all input stream is done) self.poll_result_stream(cx) }