From d4aa4159d4f451100df90bf2cfa82403f691c54a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 4 Nov 2024 19:33:07 +0800 Subject: [PATCH] feat: support windowed sort with where condition Signed-off-by: Ruihang Xia --- src/query/src/optimizer/windowed_sort.rs | 10 ++- src/query/src/part_sort.rs | 88 ++++++++++++++++++++++-- src/query/src/window_sort.rs | 11 ++- 3 files changed, 101 insertions(+), 8 deletions(-) diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index eb6c6c2f66..638c3d9d39 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -152,11 +152,11 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() - || plan.as_any().is::() || plan.as_any().is::() || plan.as_any().is::() || plan.as_any().is::() @@ -164,13 +164,19 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { + is_batch_coalesced = true; + } + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); time_index = Some(region_scan_exec.time_index()); tag_columns = Some(region_scan_exec.tag_columns()); // set distinguish_partition_ranges to true, this is an incorrect workaround - region_scan_exec.with_distinguish_partition_range(true); + if !is_batch_coalesced { + region_scan_exec.with_distinguish_partition_range(true); + } } Ok(Transformed::no(plan)) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 2828db202d..3df31a64e5 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -12,6 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Module for sorting input data within each [`PartitionRange`]. +//! +//! This module defines the [`PartSortExec`] execution plan, which sorts each +//! partition ([`PartitionRange`]) independently based on the provided physical +//! sort expressions. + use std::any::Any; use std::pin::Pin; use std::sync::Arc; @@ -36,7 +42,7 @@ use itertools::Itertools; use snafu::location; use store_api::region_engine::PartitionRange; -use crate::downcast_ts_array; +use crate::{array_iter_helper, downcast_ts_array}; /// Sort input within given PartitionRange /// @@ -288,9 +294,51 @@ impl PartSortStream { Ok(()) } + /// Try find data whose value exceeds the current partition range. + /// + /// Returns `None` if no such data is found, and `Some(idx)` where idx points to + /// the first data that exceeds the current partition range. + fn try_find_next_range( + &self, + sort_column: &ArrayRef, + ) -> datafusion_common::Result> { + if sort_column.len() == 0 { + return Ok(Some(0)); + } + + // check if the current partition index is out of range + if self.cur_part_idx >= self.partition_ranges.len() { + internal_err!( + "Partition index out of range: {} >= {}", + self.cur_part_idx, + self.partition_ranges.len() + )?; + } + let cur_range = self.partition_ranges[self.cur_part_idx]; + + let sort_column_iter = downcast_ts_array!( + sort_column.data_type() => (array_iter_helper, sort_column), + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_column.data_type() + )?, + ); + + for (idx, val) in sort_column_iter { + // ignore vacant time index data + if let Some(val) = val { + if val >= cur_range.end.value() || val < cur_range.start.value() { + return Ok(Some(idx)); + } + } + } + + Ok(None) + } + /// Sort and clear the buffer and return the sorted record batch /// - /// this function should return a empty record batch if the buffer is empty + /// this function will return a empty record batch if the buffer is empty fn sort_buffer(&mut self) -> datafusion_common::Result { if self.buffer.is_empty() { return Ok(DfRecordBatch::new_empty(self.schema.clone())); @@ -317,6 +365,9 @@ impl PartSortStream { Some(format!("Fail to sort to indices at {}", location!())), ) })?; + if indices.is_empty() { + return Ok(DfRecordBatch::new_empty(self.schema.clone())); + } self.check_in_range( &sort_column, @@ -379,6 +430,7 @@ impl PartSortStream { cx: &mut Context<'_>, ) -> Poll>> { loop { + // no more input, sort the buffer and return if self.input_complete { if self.buffer.is_empty() { return Poll::Ready(None); @@ -386,19 +438,47 @@ impl PartSortStream { return Poll::Ready(Some(self.sort_buffer())); } } + + // fetch next batch from input let res = self.input.as_mut().poll_next(cx); match res { Poll::Ready(Some(Ok(batch))) => { - if batch.num_rows() == 0 { + let sort_column = self + .expression + .expr + .evaluate(&batch)? + .into_array(batch.num_rows())?; + let next_range_idx = self.try_find_next_range(&sort_column)?; + // `Some` means the current range is finished, split the batch into two parts and sort + if let Some(idx) = next_range_idx { + let this_range = batch.slice(0, idx); + let next_range = batch.slice(idx, batch.num_rows() - idx); + if this_range.num_rows() != 0 { + self.buffer.push(this_range); + } // mark end of current PartitionRange let sorted_batch = self.sort_buffer()?; - self.cur_part_idx += 1; + let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx); + // step to next proper PartitionRange + loop { + self.cur_part_idx += 1; + if next_sort_column.is_empty() + || self.try_find_next_range(&next_sort_column)?.is_none() + { + break; + } + } + // push the next range to the buffer + if next_range.num_rows() != 0 { + self.buffer.push(next_range); + } if sorted_batch.num_rows() == 0 { // Current part is empty, continue polling next part. continue; } return Poll::Ready(Some(Ok(sorted_batch))); } + self.buffer.push(batch); // keep polling until boundary(a empty RecordBatch) is reached continue; diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 435a255beb..1d03eedc13 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -21,7 +21,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::array::{Array, ArrayRef, PrimitiveArray}; +use arrow::array::{Array, ArrayRef}; use arrow::compute::SortColumn; use arrow_schema::{DataType, SchemaRef, SortOptions}; use common_error::ext::{BoxedError, PlainError}; @@ -812,9 +812,16 @@ fn find_slice_from_range( Ok((start, end - start)) } +/// Get an iterator from a primitive array. +/// +/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`. +#[macro_export] macro_rules! array_iter_helper { ($t:ty, $unit:expr, $arr:expr) => {{ - let typed = $arr.as_any().downcast_ref::>().unwrap(); + let typed = $arr + .as_any() + .downcast_ref::>() + .unwrap(); let iter = typed.iter().enumerate(); Box::new(iter) as Box)>> }};