feat: support windowed sort with where condition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-11-04 19:33:07 +08:00
parent 960f6d821b
commit d4aa4159d4
3 changed files with 101 additions and 8 deletions

View File

@@ -152,11 +152,11 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
let mut partition_ranges = None;
let mut time_index = None;
let mut tag_columns = None;
let mut is_batch_coalesced = false;
input.transform_up(|plan| {
// Unappliable case, reset the state.
if plan.as_any().is::<RepartitionExec>()
|| plan.as_any().is::<CoalesceBatchesExec>()
|| plan.as_any().is::<CoalescePartitionsExec>()
|| plan.as_any().is::<SortExec>()
|| plan.as_any().is::<WindowedSortExec>()
@@ -164,13 +164,19 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
partition_ranges = None;
}
if plan.as_any().is::<CoalesceBatchesExec>() {
is_batch_coalesced = true;
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = Some(region_scan_exec.time_index());
tag_columns = Some(region_scan_exec.tag_columns());
// set distinguish_partition_ranges to true, this is an incorrect workaround
region_scan_exec.with_distinguish_partition_range(true);
if !is_batch_coalesced {
region_scan_exec.with_distinguish_partition_range(true);
}
}
Ok(Transformed::no(plan))

View File

@@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Module for sorting input data within each [`PartitionRange`].
//!
//! This module defines the [`PartSortExec`] execution plan, which sorts each
//! partition ([`PartitionRange`]) independently based on the provided physical
//! sort expressions.
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
@@ -36,7 +42,7 @@ use itertools::Itertools;
use snafu::location;
use store_api::region_engine::PartitionRange;
use crate::downcast_ts_array;
use crate::{array_iter_helper, downcast_ts_array};
/// Sort input within given PartitionRange
///
@@ -288,9 +294,51 @@ impl PartSortStream {
Ok(())
}
/// Try find data whose value exceeds the current partition range.
///
/// Returns `None` if no such data is found, and `Some(idx)` where idx points to
/// the first data that exceeds the current partition range.
fn try_find_next_range(
&self,
sort_column: &ArrayRef,
) -> datafusion_common::Result<Option<usize>> {
if sort_column.len() == 0 {
return Ok(Some(0));
}
// check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
self.cur_part_idx,
self.partition_ranges.len()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
let sort_column_iter = downcast_ts_array!(
sort_column.data_type() => (array_iter_helper, sort_column),
_ => internal_err!(
"Unsupported data type for sort column: {:?}",
sort_column.data_type()
)?,
);
for (idx, val) in sort_column_iter {
// ignore vacant time index data
if let Some(val) = val {
if val >= cur_range.end.value() || val < cur_range.start.value() {
return Ok(Some(idx));
}
}
}
Ok(None)
}
/// Sort and clear the buffer and return the sorted record batch
///
/// this function should return a empty record batch if the buffer is empty
/// this function will return a empty record batch if the buffer is empty
fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
if self.buffer.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
@@ -317,6 +365,9 @@ impl PartSortStream {
Some(format!("Fail to sort to indices at {}", location!())),
)
})?;
if indices.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
}
self.check_in_range(
&sort_column,
@@ -379,6 +430,7 @@ impl PartSortStream {
cx: &mut Context<'_>,
) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
loop {
// no more input, sort the buffer and return
if self.input_complete {
if self.buffer.is_empty() {
return Poll::Ready(None);
@@ -386,19 +438,47 @@ impl PartSortStream {
return Poll::Ready(Some(self.sort_buffer()));
}
}
// fetch next batch from input
let res = self.input.as_mut().poll_next(cx);
match res {
Poll::Ready(Some(Ok(batch))) => {
if batch.num_rows() == 0 {
let sort_column = self
.expression
.expr
.evaluate(&batch)?
.into_array(batch.num_rows())?;
let next_range_idx = self.try_find_next_range(&sort_column)?;
// `Some` means the current range is finished, split the batch into two parts and sort
if let Some(idx) = next_range_idx {
let this_range = batch.slice(0, idx);
let next_range = batch.slice(idx, batch.num_rows() - idx);
if this_range.num_rows() != 0 {
self.buffer.push(this_range);
}
// mark end of current PartitionRange
let sorted_batch = self.sort_buffer()?;
self.cur_part_idx += 1;
let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
// step to next proper PartitionRange
loop {
self.cur_part_idx += 1;
if next_sort_column.is_empty()
|| self.try_find_next_range(&next_sort_column)?.is_none()
{
break;
}
}
// push the next range to the buffer
if next_range.num_rows() != 0 {
self.buffer.push(next_range);
}
if sorted_batch.num_rows() == 0 {
// Current part is empty, continue polling next part.
continue;
}
return Poll::Ready(Some(Ok(sorted_batch)));
}
self.buffer.push(batch);
// keep polling until boundary(a empty RecordBatch) is reached
continue;

View File

@@ -21,7 +21,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::array::{Array, ArrayRef, PrimitiveArray};
use arrow::array::{Array, ArrayRef};
use arrow::compute::SortColumn;
use arrow_schema::{DataType, SchemaRef, SortOptions};
use common_error::ext::{BoxedError, PlainError};
@@ -812,9 +812,16 @@ fn find_slice_from_range(
Ok((start, end - start))
}
/// Get an iterator from a primitive array.
///
/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
#[macro_export]
macro_rules! array_iter_helper {
($t:ty, $unit:expr, $arr:expr) => {{
let typed = $arr.as_any().downcast_ref::<PrimitiveArray<$t>>().unwrap();
let typed = $arr
.as_any()
.downcast_ref::<arrow::array::PrimitiveArray<$t>>()
.unwrap();
let iter = typed.iter().enumerate();
Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
}};