feat: optimizer rule for windowed sort (#4874)

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement physical rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: install windowed sort physical rule and optimize partition ranges

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add logs and sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: introduce PartSortExec for partitioned sorting

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune exec nodes' properties and metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* debug: add more info on very wrong

* debug: also print overlap ranges

* feat: add check when emit PartSort Stream

* dbg: info on overlap working range

* feat: check batch range is inside part range

* set distinguish partition range param

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: more logs

* update sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune optimizer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix lints

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix windowed sort rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: early terminate sort stream

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: remove min/max check

* chore: remove unused windowed_sort module, uuid feature and refactor region_scanner to synchronous

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: print more fuzz log

* chore: more log

* fix: part sort should skip empty part

* chore: remove insert logs

* tests: empty PartitionRange

* refactor: testcase

* docs: update comment&tests: all empty

* ci: enlarge etcd cpu limit

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2024-10-29 15:46:05 +08:00
committed by GitHub
parent 0ee455a980
commit 03f2fa219d
21 changed files with 930 additions and 230 deletions

View File

@@ -17,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::array::ArrayRef;
use arrow::compute::{concat, take_record_batch};
use arrow_schema::SchemaRef;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
@@ -24,7 +25,7 @@ use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::coalesce_batches::concat_batches;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
@@ -33,8 +34,9 @@ use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
use itertools::Itertools;
use snafu::location;
use store_api::region_engine::PartitionRange;
use crate::error::Result;
use crate::downcast_ts_array;
/// Sort input within given PartitionRange
///
@@ -48,11 +50,16 @@ pub struct PartSortExec {
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
partition_ranges: Vec<Vec<PartitionRange>>,
properties: PlanProperties,
}
impl PartSortExec {
pub fn try_new(expression: PhysicalSortExpr, input: Arc<dyn ExecutionPlan>) -> Result<Self> {
pub fn new(
expression: PhysicalSortExpr,
partition_ranges: Vec<Vec<PartitionRange>>,
input: Arc<dyn ExecutionPlan>,
) -> Self {
let metrics = ExecutionPlanMetricsSet::new();
let properties = PlanProperties::new(
input.equivalence_properties().clone(),
@@ -60,12 +67,13 @@ impl PartSortExec {
input.execution_mode(),
);
Ok(Self {
Self {
expression,
input,
metrics,
partition_ranges,
properties,
})
}
}
pub fn to_stream(
@@ -76,7 +84,21 @@ impl PartSortExec {
let input_stream: DfSendableRecordBatchStream =
self.input.execute(partition, context.clone())?;
let df_stream = Box::pin(PartSortStream::new(context, self, input_stream)) as _;
if partition >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
partition,
self.partition_ranges.len()
)?;
}
let df_stream = Box::pin(PartSortStream::new(
context,
self,
input_stream,
self.partition_ranges[partition].clone(),
partition,
)) as _;
Ok(df_stream)
}
@@ -114,10 +136,11 @@ impl ExecutionPlan for PartSortExec {
} else {
internal_err!("No children found")?
};
Ok(Arc::new(Self::try_new(
Ok(Arc::new(Self::new(
self.expression.clone(),
self.partition_ranges.clone(),
new_input.clone(),
)?))
)))
}
fn execute(
@@ -131,6 +154,15 @@ impl ExecutionPlan for PartSortExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
/// # Explain
///
/// This plan needs to be executed on each partition independently,
/// and is expected to run directly on storage engine's output
/// distribution / partition.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
}
struct PartSortStream {
@@ -142,6 +174,10 @@ struct PartSortStream {
input: DfSendableRecordBatchStream,
input_complete: bool,
schema: SchemaRef,
partition_ranges: Vec<PartitionRange>,
partition: usize,
cur_part_idx: usize,
metrics: BaselineMetrics,
}
impl PartSortStream {
@@ -149,6 +185,8 @@ impl PartSortStream {
context: Arc<TaskContext>,
sort: &PartSortExec,
input: DfSendableRecordBatchStream,
partition_ranges: Vec<PartitionRange>,
partition: usize,
) -> Self {
Self {
reservation: MemoryConsumer::new("PartSortStream".to_string())
@@ -159,17 +197,85 @@ impl PartSortStream {
input,
input_complete: false,
schema: sort.input.schema(),
partition_ranges,
partition,
cur_part_idx: 0,
metrics: BaselineMetrics::new(&sort.metrics, partition),
}
}
}
macro_rules! array_check_helper {
($t:ty, $unit:expr, $arr:expr, $cur_range:expr, $min_max_idx:expr) => {{
if $cur_range.start.unit().as_arrow_time_unit() != $unit
|| $cur_range.end.unit().as_arrow_time_unit() != $unit
{
internal_err!(
"PartitionRange unit mismatch, expect {:?}, found {:?}",
$cur_range.start.unit(),
$unit
)?;
}
let arr = $arr
.as_any()
.downcast_ref::<arrow::array::PrimitiveArray<$t>>()
.unwrap();
let min = arr.value($min_max_idx.0);
let max = arr.value($min_max_idx.1);
let (min, max) = if min < max{
(min, max)
} else {
(max, min)
};
let cur_min = $cur_range.start.value();
let cur_max = $cur_range.end.value();
// note that PartitionRange is left inclusive and right exclusive
if !(min >= cur_min && max < cur_max) {
internal_err!(
"Sort column min/max value out of partition range: sort_column.min_max=[{:?}, {:?}] not in PartitionRange=[{:?}, {:?}]",
min,
max,
cur_min,
cur_max
)?;
}
}};
}
impl PartSortStream {
/// check whether the sort column's min/max value is within the partition range
fn check_in_range(
&self,
sort_column: &ArrayRef,
min_max_idx: (usize, usize),
) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
self.cur_part_idx,
self.partition_ranges.len()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
downcast_ts_array!(
sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx),
_ => internal_err!(
"Unsupported data type for sort column: {:?}",
sort_column.data_type()
)?,
);
Ok(())
}
/// Sort and clear the buffer and return the sorted record batch
///
/// this function should return None if RecordBatch is empty
fn sort_buffer(&mut self) -> datafusion_common::Result<Option<DfRecordBatch>> {
if self.buffer.iter().map(|r| r.num_rows()).sum::<usize>() == 0 {
return Ok(None);
/// this function should return a empty record batch if the buffer is empty
fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
if self.buffer.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
}
let mut sort_columns = Vec::with_capacity(self.buffer.len());
let mut opt = None;
@@ -194,6 +300,24 @@ impl PartSortStream {
)
})?;
self.check_in_range(
&sort_column,
(
indices.value(0) as usize,
indices.value(indices.len() - 1) as usize,
),
)
.inspect_err(|_e| {
#[cfg(debug_assertions)]
common_telemetry::error!(
"Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}",
self.partition,
self.cur_part_idx,
sort_column.len(),
_e
);
})?;
// reserve memory for the concat input and sorted output
let total_mem: usize = self.buffer.iter().map(|r| r.get_array_memory_size()).sum();
self.reservation.try_grow(total_mem * 2)?;
@@ -229,7 +353,7 @@ impl PartSortStream {
drop(full_input);
// here remove both buffer and full_input memory
self.reservation.shrink(2 * total_mem);
Ok(Some(sorted))
Ok(sorted)
}
pub fn poll_next_inner(
@@ -241,7 +365,7 @@ impl PartSortStream {
if self.buffer.is_empty() {
return Poll::Ready(None);
} else {
return Poll::Ready(self.sort_buffer().transpose());
return Poll::Ready(Some(self.sort_buffer()));
}
}
let res = self.input.as_mut().poll_next(cx);
@@ -249,7 +373,13 @@ impl PartSortStream {
Poll::Ready(Some(Ok(batch))) => {
if batch.num_rows() == 0 {
// mark end of current PartitionRange
return Poll::Ready(self.sort_buffer().transpose());
let sorted_batch = self.sort_buffer()?;
self.cur_part_idx += 1;
if sorted_batch.num_rows() == 0 {
// Current part is empty, continue polling next part.
continue;
}
return Poll::Ready(Some(Ok(sorted_batch)));
}
self.buffer.push(batch);
// keep polling until boundary(a empty RecordBatch) is reached
@@ -271,10 +401,11 @@ impl Stream for PartSortStream {
type Item = datafusion_common::Result<DfRecordBatch>;
fn poll_next(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
self.poll_next_inner(cx)
let result = self.as_mut().poll_next_inner(cx);
self.metrics.record_poll(result)
}
}
@@ -290,7 +421,6 @@ mod test {
use arrow::json::ArrayWriter;
use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit};
use common_telemetry::error;
use common_time::Timestamp;
use datafusion_physical_expr::expressions::Column;
use futures::StreamExt;
@@ -311,6 +441,8 @@ mod test {
let mut rng = fastrand::Rng::new();
rng.seed(1337);
let mut test_cases = Vec::new();
for case_id in 0..test_cnt {
let mut bound_val: Option<i64> = None;
let descending = rng.bool();
@@ -359,21 +491,23 @@ mod test {
};
assert!(start < end);
let mut sort_data = vec![];
let mut per_part_sort_data = vec![];
let mut batches = vec![];
for _batch_idx in 0..rng.usize(1..batch_cnt_bound) {
let cnt = rng.usize(0..batch_size_bound) + 2;
let iter = 0..rng.usize(1..cnt);
let cnt = rng.usize(0..batch_size_bound) + 1;
let iter = 0..rng.usize(0..cnt);
let data_gen = iter
.map(|_| rng.i64(start.value()..end.value()))
.collect_vec();
sort_data.extend(data_gen.clone());
if data_gen.is_empty() {
// current batch is empty, skip
continue;
}
per_part_sort_data.extend(data_gen.clone());
let arr = new_ts_array(unit.clone(), data_gen.clone());
let batch = DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap();
batches.push(batch);
}
assert!(batches.iter().all(|i| i.num_rows() >= 1));
let range = PartitionRange {
start,
@@ -384,12 +518,14 @@ mod test {
input_ranged_data.push((range, batches));
if descending {
sort_data.sort_by(|a, b| b.cmp(a));
per_part_sort_data.sort_by(|a, b| b.cmp(a));
} else {
sort_data.sort();
per_part_sort_data.sort();
}
output_data.push(sort_data);
if per_part_sort_data.is_empty() {
continue;
}
output_data.push(per_part_sort_data);
}
let expected_output = output_data
@@ -399,8 +535,17 @@ mod test {
.unwrap()
})
.collect_vec();
test_cases.push((
case_id,
unit,
input_ranged_data,
schema,
opt,
expected_output,
));
}
assert!(!expected_output.is_empty());
for (case_id, _unit, input_ranged_data, schema, opt, expected_output) in test_cases {
run_test(case_id, input_ranged_data, schema, opt, expected_output).await;
}
}
@@ -412,25 +557,50 @@ mod test {
TimeUnit::Millisecond,
vec![
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
((5, 10), vec![vec![5, 6], vec![7, 8]]),
],
false,
vec![
vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
vec![1, 2, 3, 4, 5, 6, 7, 8],
],
vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9], vec![5, 6, 7, 8]],
),
(
TimeUnit::Millisecond,
vec![
((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
((5, 10), vec![vec![5, 6], vec![7, 8, 9]]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
],
true,
vec![vec![9, 8, 7, 6, 5], vec![8, 7, 6, 5, 4, 3, 2, 1]],
),
(
TimeUnit::Millisecond,
vec![
vec![9, 8, 7, 6, 5, 4, 3, 2, 1],
vec![8, 7, 6, 5, 4, 3, 2, 1],
((5, 10), vec![]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
],
true,
vec![vec![8, 7, 6, 5, 4, 3, 2, 1]],
),
(
TimeUnit::Millisecond,
vec![
((15, 20), vec![vec![17, 18, 19]]),
((10, 15), vec![]),
((5, 10), vec![]),
((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
],
true,
vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]],
),
(
TimeUnit::Millisecond,
vec![
((15, 20), vec![]),
((10, 15), vec![]),
((5, 10), vec![]),
((0, 10), vec![]),
],
true,
vec![],
),
];
@@ -487,7 +657,7 @@ mod test {
opt: SortOptions,
expected_output: Vec<DfRecordBatch>,
) {
let (_ranges, batches): (Vec<_>, Vec<_>) = input_ranged_data.clone().into_iter().unzip();
let (ranges, batches): (Vec<_>, Vec<_>) = input_ranged_data.clone().into_iter().unzip();
let batches = batches
.into_iter()
@@ -498,14 +668,14 @@ mod test {
.collect_vec();
let mock_input = MockInputExec::new(batches, schema.clone());
let exec = PartSortExec::try_new(
let exec = PartSortExec::new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: opt,
},
vec![ranges],
Arc::new(mock_input),
)
.unwrap();
);
let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
@@ -513,6 +683,7 @@ mod test {
// a makeshift solution for compare large data
if real_output != expected_output {
let mut full_msg = String::new();
{
let mut buf = Vec::with_capacity(10 * real_output.len());
for batch in &real_output {
@@ -523,8 +694,9 @@ mod test {
buf.append(&mut rb_json);
buf.push(b',');
}
let buf = String::from_utf8_lossy(&buf);
error!("case_id:{case_id}, real_output: [{buf}]");
// TODO(discord9): better ways to print buf
let _buf = String::from_utf8_lossy(&buf);
full_msg += &format!("case_id:{case_id}, real_output");
}
{
let mut buf = Vec::with_capacity(10 * real_output.len());
@@ -536,10 +708,13 @@ mod test {
buf.append(&mut rb_json);
buf.push(b',');
}
let buf = String::from_utf8_lossy(&buf);
error!("case_id:{case_id}, expected_output: [{buf}]");
let _buf = String::from_utf8_lossy(&buf);
full_msg += &format!("case_id:{case_id}, expected_output");
}
panic!("case_{} failed, opt: {:?}", case_id, opt);
panic!(
"case_{} failed, opt: {:?}, full msg: {}",
case_id, opt, full_msg
);
}
}
}