From 03f2fa219da7aee3aef8ff13316f39d0b054b3b7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 29 Oct 2024 15:46:05 +0800 Subject: [PATCH] feat: optimizer rule for windowed sort (#4874) * basic impl Signed-off-by: Ruihang Xia * implement physical rule Signed-off-by: Ruihang Xia * feat: install windowed sort physical rule and optimize partition ranges Signed-off-by: Ruihang Xia * add logs and sqlness test Signed-off-by: Ruihang Xia * feat: introduce PartSortExec for partitioned sorting Signed-off-by: Ruihang Xia * tune exec nodes' properties and metrics Signed-off-by: Ruihang Xia * clean up Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia * debug: add more info on very wrong * debug: also print overlap ranges * feat: add check when emit PartSort Stream * dbg: info on overlap working range * feat: check batch range is inside part range * set distinguish partition range param Signed-off-by: Ruihang Xia * chore: more logs * update sqlness Signed-off-by: Ruihang Xia * tune optimizer Signed-off-by: Ruihang Xia * clean up Signed-off-by: Ruihang Xia * fix lints Signed-off-by: Ruihang Xia * fix windowed sort rule Signed-off-by: Ruihang Xia * fix: early terminate sort stream Signed-off-by: Ruihang Xia * chore: remove min/max check * chore: remove unused windowed_sort module, uuid feature and refactor region_scanner to synchronous Signed-off-by: Ruihang Xia * chore: print more fuzz log * chore: more log * fix: part sort should skip empty part * chore: remove insert logs * tests: empty PartitionRange * refactor: testcase * docs: update comment&tests: all empty * ci: enlarge etcd cpu limit --------- Signed-off-by: Ruihang Xia Co-authored-by: discord9 Co-authored-by: evenyag --- .github/actions/setup-etcd-cluster/action.yml | 2 +- Cargo.lock | 1 + src/mito2/src/engine.rs | 5 +- src/mito2/src/read.rs | 25 + src/mito2/src/read/scan_region.rs | 2 +- src/mito2/src/read/seq_scan.rs | 12 +- src/mito2/src/read/unordered_scan.rs | 11 +- src/query/Cargo.toml | 1 + src/query/src/lib.rs | 1 + src/query/src/optimizer.rs | 1 + src/query/src/optimizer/parallelize_scan.rs | 13 +- src/query/src/optimizer/windowed_sort.rs | 158 ++++++ src/query/src/part_sort.rs | 267 ++++++++-- src/query/src/query_engine/state.rs | 5 + src/query/src/window_sort.rs | 481 ++++++++++++------ src/store-api/src/region_engine.rs | 21 +- src/table/src/table/scan.rs | 43 +- .../targets/fuzz_insert_logical_table.rs | 3 + .../common/order/windowed_sort.result | 81 +++ .../standalone/common/order/windowed_sort.sql | 26 + .../common/tql-explain-analyze/explain.result | 1 + 21 files changed, 930 insertions(+), 230 deletions(-) create mode 100644 src/query/src/optimizer/windowed_sort.rs create mode 100644 tests/cases/standalone/common/order/windowed_sort.result create mode 100644 tests/cases/standalone/common/order/windowed_sort.sql diff --git a/.github/actions/setup-etcd-cluster/action.yml b/.github/actions/setup-etcd-cluster/action.yml index 7d0d8d01ab..e6abb96bc2 100644 --- a/.github/actions/setup-etcd-cluster/action.yml +++ b/.github/actions/setup-etcd-cluster/action.yml @@ -18,7 +18,7 @@ runs: --set replicaCount=${{ inputs.etcd-replicas }} \ --set resources.requests.cpu=50m \ --set resources.requests.memory=128Mi \ - --set resources.limits.cpu=1000m \ + --set resources.limits.cpu=1500m \ --set resources.limits.memory=2Gi \ --set auth.rbac.create=false \ --set auth.rbac.token.enabled=false \ diff --git a/Cargo.lock b/Cargo.lock index d4db1185f3..c5af069e7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9030,6 +9030,7 @@ 
dependencies = [ "table", "tokio", "tokio-stream", + "uuid", ] [[package]] diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index ed8cc92909..8e62877270 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -163,13 +163,13 @@ impl MitoEngine { } /// Returns a region scanner to scan the region for `request`. - async fn region_scanner( + fn region_scanner( &self, region_id: RegionId, request: ScanRequest, ) -> Result { let scanner = self.scanner(region_id, request)?; - scanner.region_scanner().await + scanner.region_scanner() } /// Scans a region. @@ -527,7 +527,6 @@ impl RegionEngine for MitoEngine { request: ScanRequest, ) -> Result { self.region_scanner(region_id, request) - .await .map_err(BoxedError::new) } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index f7ed4ca958..e1ce9572c4 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -532,10 +532,24 @@ impl Batch { #[derive(Default)] pub(crate) struct BatchChecker { last_batch: Option, + start: Option, + end: Option, } #[cfg(debug_assertions)] impl BatchChecker { + /// Attaches the given start timestamp to the checker. + pub(crate) fn with_start(mut self, start: Option) -> Self { + self.start = start; + self + } + + /// Attaches the given end timestamp to the checker. + pub(crate) fn with_end(mut self, end: Option) -> Self { + self.end = end; + self + } + /// Returns true if the given batch is monotonic and behind /// the last batch. pub(crate) fn check_monotonic(&mut self, batch: &Batch) -> bool { @@ -543,6 +557,17 @@ impl BatchChecker { return false; } + if let (Some(start), Some(first)) = (self.start, batch.first_timestamp()) { + if start > first { + return false; + } + } + if let (Some(end), Some(last)) = (self.end, batch.last_timestamp()) { + if end <= last { + return false; + } + } + // Checks the batch is behind the last batch. // Then Updates the last batch. let is_behind = self diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1c0dd50d0b..2c61124180 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -74,7 +74,7 @@ impl Scanner { /// Returns a [RegionScanner] to scan the region. 
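// (Editor's note, not part of the patch) An illustrative sketch of the
// BatchChecker bounds added in `read.rs` above. A PartitionRange is
// left-inclusive and right-exclusive, so with a hypothetical range
// [100, 200) in milliseconds:
//
//     let checker = BatchChecker::default()
//         .with_start(Some(Timestamp::new_millisecond(100)))
//         .with_end(Some(Timestamp::new_millisecond(200)));
//
// a batch whose timestamps span [100, 199] passes `check_monotonic`, while
// one containing 200 fails, because the check rejects `end <= last`.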
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] - pub(crate) async fn region_scanner(self) -> Result { + pub(crate) fn region_scanner(self) -> Result { match self { Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)), Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)), diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 8d867d7d92..5049ea8a47 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -232,7 +232,10 @@ impl SeqScan { let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); #[cfg(debug_assertions)] - let mut checker = crate::read::BatchChecker::default(); + let mut checker = crate::read::BatchChecker::default() + .with_start(Some(part_range.start)) + .with_end(Some(part_range.end)); + while let Some(batch) = reader .next_batch() .await @@ -304,8 +307,13 @@ impl RegionScanner for SeqScan { self.scan_partition_impl(partition) } - fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + fn prepare( + &mut self, + ranges: Vec>, + distinguish_partition_range: bool, + ) -> Result<(), BoxedError> { self.properties.partitions = ranges; + self.properties.distinguish_partition_range = distinguish_partition_range; Ok(()) } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index c51f349cbd..72bf424020 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -140,7 +140,9 @@ impl UnorderedScan { let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); #[cfg(debug_assertions)] - let mut checker = crate::read::BatchChecker::default(); + let mut checker = crate::read::BatchChecker::default() + .with_start(Some(part_range.start)) + .with_end(Some(part_range.end)); let stream = Self::scan_partition_range( stream_ctx.clone(), @@ -209,8 +211,13 @@ impl RegionScanner for UnorderedScan { self.stream_ctx.input.mapper.output_schema() } - fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + fn prepare( + &mut self, + ranges: Vec>, + distinguish_partition_range: bool, + ) -> Result<(), BoxedError> { self.properties.partitions = ranges; + self.properties.distinguish_partition_range = distinguish_partition_range; Ok(()) } diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 3f983bb9e3..863a5a1c33 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -64,6 +64,7 @@ store-api.workspace = true substrait.workspace = true table.workspace = true tokio.workspace = true +uuid.workspace = true [dev-dependencies] approx_eq = "0.1" diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 5b119115e3..18b0b22582 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -15,6 +15,7 @@ #![feature(let_chains)] #![feature(int_roundings)] #![feature(trait_upcasting)] +#![feature(try_blocks)] mod analyze; pub mod dataframe; diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 2f97a9bd32..7dbeae4b98 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -20,6 +20,7 @@ pub mod string_normalization; #[cfg(test)] pub(crate) mod test_util; pub mod type_conversion; +pub mod windowed_sort; use datafusion_common::config::ConfigOptions; use datafusion_common::Result; diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index d679702796..19f5db39d3 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -51,16 +51,27 @@ 
impl ParallelizeScan {
        let result = plan
            .transform_down(|plan| {
                if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+                    if region_scan_exec.is_partition_set() {
+                        return Ok(Transformed::no(plan));
+                    }
+
                    let ranges = region_scan_exec.get_partition_ranges();
                    let total_range_num = ranges.len();
                    let expected_partition_num = config.execution.target_partitions;

                    // assign ranges to each partition
-                    let partition_ranges =
+                    let mut partition_ranges =
                        Self::assign_partition_range(ranges, expected_partition_num);
                    debug!(
                        "Assign {total_range_num} ranges to {expected_partition_num} partitions"
                    );
+
+                    // sort the ranges in each partition
+                    // TODO(ruihang): smart sort!
+                    for ranges in partition_ranges.iter_mut() {
+                        ranges.sort_by(|a, b| a.start.cmp(&b.start));
+                    }
+
                    // update the partition ranges
                    let new_exec = region_scan_exec
                        .with_new_partitions(partition_ranges)
diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs
new file mode 100644
index 0000000000..62d4495cf3
--- /dev/null
+++ b/src/query/src/optimizer/windowed_sort.rs
@@ -0,0 +1,158 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::Result as DataFusionResult;
+use datafusion_physical_expr::expressions::Column as PhysicalColumn;
+use store_api::region_engine::PartitionRange;
+use table::table::scan::RegionScanExec;
+
+use crate::part_sort::PartSortExec;
+use crate::window_sort::WindowedSortExec;
+
+/// Optimize rule for windowed sort.
+///
+/// This is expected to run after [`ScanHint`] and [`ParallelizeScan`].
+/// It changes the original sort into a custom plan. To make sure other
+/// rules are applied correctly, this rule can be run as late as
+/// possible.
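+///
+/// An illustrative before/after sketch (editor's note; assumes the single
+/// sort expression is an ascending sort on the time index):
+///
+/// ```text
+/// before: SortExec(ts ASC, preserve_partitioning) -> ... -> RegionScanExec
+/// after:  WindowedSortExec(ts ASC, fetch) -> PartSortExec(ts ASC, ranges) -> ... -> RegionScanExec
+/// ```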
+///
+/// [`ScanHint`]: crate::optimizer::scan_hint::ScanHintRule
+/// [`ParallelizeScan`]: crate::optimizer::parallelize_scan::ParallelizeScan
+pub struct WindowedSortPhysicalRule;
+
+impl PhysicalOptimizerRule for WindowedSortPhysicalRule {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &datafusion::config::ConfigOptions,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        Self::do_optimize(plan, config)
+    }
+
+    fn name(&self) -> &str {
+        "WindowedSortRule"
+    }
+
+    fn schema_check(&self) -> bool {
+        false
+    }
+}
+
+impl WindowedSortPhysicalRule {
+    fn do_optimize(
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &datafusion::config::ConfigOptions,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        let result = plan
+            .transform_down(|plan| {
+                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                    // TODO: support multiple exprs in windowed sort
+                    if !sort_exec.preserve_partitioning() || sort_exec.expr().len() != 1 {
+                        return Ok(Transformed::no(plan));
+                    }
+
+                    let Some(scanner_info) = fetch_partition_range(sort_exec.input().clone())?
+                    else {
+                        return Ok(Transformed::no(plan));
+                    };
+
+                    if let Some(first_sort_expr) = sort_exec.expr().first()
+                        && !first_sort_expr.options.descending
+                        && let Some(column_expr) = first_sort_expr
+                            .expr
+                            .as_any()
+                            .downcast_ref::<PhysicalColumn>()
+                        && column_expr.name() == scanner_info.time_index
+                    {
+                    } else {
+                        return Ok(Transformed::no(plan));
+                    }
+
+                    let first_sort_expr = sort_exec.expr().first().unwrap().clone();
+                    let part_sort_exec = Arc::new(PartSortExec::new(
+                        first_sort_expr.clone(),
+                        scanner_info.partition_ranges.clone(),
+                        sort_exec.input().clone(),
+                    ));
+                    let windowed_sort_exec = WindowedSortExec::try_new(
+                        first_sort_expr,
+                        sort_exec.fetch(),
+                        scanner_info.partition_ranges,
+                        part_sort_exec,
+                    )?;
+
+                    return Ok(Transformed {
+                        data: Arc::new(windowed_sort_exec),
+                        transformed: true,
+                        tnr: datafusion_common::tree_node::TreeNodeRecursion::Stop,
+                    });
+                }
+
+                Ok(Transformed::no(plan))
+            })?
+            .data;
+
+        Ok(result)
+    }
+}
+
+struct ScannerInfo {
+    partition_ranges: Vec<Vec<PartitionRange>>,
+    time_index: String,
+}
+
+fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
+    let mut partition_ranges = None;
+    let mut time_index = None;
+
+    input.transform_up(|plan| {
+        // Inapplicable case, reset the state.
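+        // (Editor's note) These nodes merge batches or partitions, repartition,
+        // or re-sort the stream, so any partition-range info collected below
+        // them no longer describes the rows this sort would actually see;
+        // hence the reset.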
+ if plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + || plan.as_any().is::() + { + partition_ranges = None; + } + + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); + time_index = region_scan_exec.time_index(); + + // set distinguish_partition_ranges to true, this is an incorrect workaround + region_scan_exec.with_distinguish_partition_range(true); + } + + Ok(Transformed::no(plan)) + })?; + + let result = try { + ScannerInfo { + partition_ranges: partition_ranges?, + time_index: time_index?, + } + }; + + Ok(result) +} diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 965dfa9c5a..c4b9d35b20 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use arrow::array::ArrayRef; use arrow::compute::{concat, take_record_batch}; use arrow_schema::SchemaRef; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream}; @@ -24,7 +25,7 @@ use datafusion::common::arrow::compute::sort_to_indices; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::{RecordBatchStream, TaskContext}; use datafusion::physical_plan::coalesce_batches::concat_batches; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; @@ -33,8 +34,9 @@ use datafusion_physical_expr::PhysicalSortExpr; use futures::Stream; use itertools::Itertools; use snafu::location; +use store_api::region_engine::PartitionRange; -use crate::error::Result; +use crate::downcast_ts_array; /// Sort input within given PartitionRange /// @@ -48,11 +50,16 @@ pub struct PartSortExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, + partition_ranges: Vec>, properties: PlanProperties, } impl PartSortExec { - pub fn try_new(expression: PhysicalSortExpr, input: Arc) -> Result { + pub fn new( + expression: PhysicalSortExpr, + partition_ranges: Vec>, + input: Arc, + ) -> Self { let metrics = ExecutionPlanMetricsSet::new(); let properties = PlanProperties::new( input.equivalence_properties().clone(), @@ -60,12 +67,13 @@ impl PartSortExec { input.execution_mode(), ); - Ok(Self { + Self { expression, input, metrics, + partition_ranges, properties, - }) + } } pub fn to_stream( @@ -76,7 +84,21 @@ impl PartSortExec { let input_stream: DfSendableRecordBatchStream = self.input.execute(partition, context.clone())?; - let df_stream = Box::pin(PartSortStream::new(context, self, input_stream)) as _; + if partition >= self.partition_ranges.len() { + internal_err!( + "Partition index out of range: {} >= {}", + partition, + self.partition_ranges.len() + )?; + } + + let df_stream = Box::pin(PartSortStream::new( + context, + self, + input_stream, + self.partition_ranges[partition].clone(), + partition, + )) as _; Ok(df_stream) } @@ -114,10 +136,11 @@ impl ExecutionPlan for PartSortExec { } else { internal_err!("No children found")? 
}; - Ok(Arc::new(Self::try_new( + Ok(Arc::new(Self::new( self.expression.clone(), + self.partition_ranges.clone(), new_input.clone(), - )?)) + ))) } fn execute( @@ -131,6 +154,15 @@ impl ExecutionPlan for PartSortExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + /// # Explain + /// + /// This plan needs to be executed on each partition independently, + /// and is expected to run directly on storage engine's output + /// distribution / partition. + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } } struct PartSortStream { @@ -142,6 +174,10 @@ struct PartSortStream { input: DfSendableRecordBatchStream, input_complete: bool, schema: SchemaRef, + partition_ranges: Vec, + partition: usize, + cur_part_idx: usize, + metrics: BaselineMetrics, } impl PartSortStream { @@ -149,6 +185,8 @@ impl PartSortStream { context: Arc, sort: &PartSortExec, input: DfSendableRecordBatchStream, + partition_ranges: Vec, + partition: usize, ) -> Self { Self { reservation: MemoryConsumer::new("PartSortStream".to_string()) @@ -159,17 +197,85 @@ impl PartSortStream { input, input_complete: false, schema: sort.input.schema(), + partition_ranges, + partition, + cur_part_idx: 0, + metrics: BaselineMetrics::new(&sort.metrics, partition), } } } +macro_rules! array_check_helper { + ($t:ty, $unit:expr, $arr:expr, $cur_range:expr, $min_max_idx:expr) => {{ + if $cur_range.start.unit().as_arrow_time_unit() != $unit + || $cur_range.end.unit().as_arrow_time_unit() != $unit + { + internal_err!( + "PartitionRange unit mismatch, expect {:?}, found {:?}", + $cur_range.start.unit(), + $unit + )?; + } + let arr = $arr + .as_any() + .downcast_ref::>() + .unwrap(); + + let min = arr.value($min_max_idx.0); + let max = arr.value($min_max_idx.1); + let (min, max) = if min < max{ + (min, max) + } else { + (max, min) + }; + let cur_min = $cur_range.start.value(); + let cur_max = $cur_range.end.value(); + // note that PartitionRange is left inclusive and right exclusive + if !(min >= cur_min && max < cur_max) { + internal_err!( + "Sort column min/max value out of partition range: sort_column.min_max=[{:?}, {:?}] not in PartitionRange=[{:?}, {:?}]", + min, + max, + cur_min, + cur_max + )?; + } + }}; +} + impl PartSortStream { + /// check whether the sort column's min/max value is within the partition range + fn check_in_range( + &self, + sort_column: &ArrayRef, + min_max_idx: (usize, usize), + ) -> datafusion_common::Result<()> { + if self.cur_part_idx >= self.partition_ranges.len() { + internal_err!( + "Partition index out of range: {} >= {}", + self.cur_part_idx, + self.partition_ranges.len() + )?; + } + let cur_range = self.partition_ranges[self.cur_part_idx]; + + downcast_ts_array!( + sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx), + _ => internal_err!( + "Unsupported data type for sort column: {:?}", + sort_column.data_type() + )?, + ); + + Ok(()) + } + /// Sort and clear the buffer and return the sorted record batch /// - /// this function should return None if RecordBatch is empty - fn sort_buffer(&mut self) -> datafusion_common::Result> { - if self.buffer.iter().map(|r| r.num_rows()).sum::() == 0 { - return Ok(None); + /// this function should return a empty record batch if the buffer is empty + fn sort_buffer(&mut self) -> datafusion_common::Result { + if self.buffer.is_empty() { + return Ok(DfRecordBatch::new_empty(self.schema.clone())); } let mut sort_columns = Vec::with_capacity(self.buffer.len()); let mut opt = None; @@ -194,6 +300,24 @@ impl 
PartSortStream { ) })?; + self.check_in_range( + &sort_column, + ( + indices.value(0) as usize, + indices.value(indices.len() - 1) as usize, + ), + ) + .inspect_err(|_e| { + #[cfg(debug_assertions)] + common_telemetry::error!( + "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}", + self.partition, + self.cur_part_idx, + sort_column.len(), + _e + ); + })?; + // reserve memory for the concat input and sorted output let total_mem: usize = self.buffer.iter().map(|r| r.get_array_memory_size()).sum(); self.reservation.try_grow(total_mem * 2)?; @@ -229,7 +353,7 @@ impl PartSortStream { drop(full_input); // here remove both buffer and full_input memory self.reservation.shrink(2 * total_mem); - Ok(Some(sorted)) + Ok(sorted) } pub fn poll_next_inner( @@ -241,7 +365,7 @@ impl PartSortStream { if self.buffer.is_empty() { return Poll::Ready(None); } else { - return Poll::Ready(self.sort_buffer().transpose()); + return Poll::Ready(Some(self.sort_buffer())); } } let res = self.input.as_mut().poll_next(cx); @@ -249,7 +373,13 @@ impl PartSortStream { Poll::Ready(Some(Ok(batch))) => { if batch.num_rows() == 0 { // mark end of current PartitionRange - return Poll::Ready(self.sort_buffer().transpose()); + let sorted_batch = self.sort_buffer()?; + self.cur_part_idx += 1; + if sorted_batch.num_rows() == 0 { + // Current part is empty, continue polling next part. + continue; + } + return Poll::Ready(Some(Ok(sorted_batch))); } self.buffer.push(batch); // keep polling until boundary(a empty RecordBatch) is reached @@ -271,10 +401,11 @@ impl Stream for PartSortStream { type Item = datafusion_common::Result; fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.poll_next_inner(cx) + let result = self.as_mut().poll_next_inner(cx); + self.metrics.record_poll(result) } } @@ -290,7 +421,6 @@ mod test { use arrow::json::ArrayWriter; use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit}; - use common_telemetry::error; use common_time::Timestamp; use datafusion_physical_expr::expressions::Column; use futures::StreamExt; @@ -311,6 +441,8 @@ mod test { let mut rng = fastrand::Rng::new(); rng.seed(1337); + let mut test_cases = Vec::new(); + for case_id in 0..test_cnt { let mut bound_val: Option = None; let descending = rng.bool(); @@ -359,21 +491,23 @@ mod test { }; assert!(start < end); - let mut sort_data = vec![]; + let mut per_part_sort_data = vec![]; let mut batches = vec![]; for _batch_idx in 0..rng.usize(1..batch_cnt_bound) { - let cnt = rng.usize(0..batch_size_bound) + 2; - let iter = 0..rng.usize(1..cnt); + let cnt = rng.usize(0..batch_size_bound) + 1; + let iter = 0..rng.usize(0..cnt); let data_gen = iter .map(|_| rng.i64(start.value()..end.value())) .collect_vec(); - sort_data.extend(data_gen.clone()); + if data_gen.is_empty() { + // current batch is empty, skip + continue; + } + per_part_sort_data.extend(data_gen.clone()); let arr = new_ts_array(unit.clone(), data_gen.clone()); - let batch = DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap(); batches.push(batch); } - assert!(batches.iter().all(|i| i.num_rows() >= 1)); let range = PartitionRange { start, @@ -384,12 +518,14 @@ mod test { input_ranged_data.push((range, batches)); if descending { - sort_data.sort_by(|a, b| b.cmp(a)); + per_part_sort_data.sort_by(|a, b| b.cmp(a)); } else { - sort_data.sort(); + per_part_sort_data.sort(); } - - output_data.push(sort_data); + if per_part_sort_data.is_empty() { + continue; + } + output_data.push(per_part_sort_data); 
} let expected_output = output_data @@ -399,8 +535,17 @@ mod test { .unwrap() }) .collect_vec(); + test_cases.push(( + case_id, + unit, + input_ranged_data, + schema, + opt, + expected_output, + )); + } - assert!(!expected_output.is_empty()); + for (case_id, _unit, input_ranged_data, schema, opt, expected_output) in test_cases { run_test(case_id, input_ranged_data, schema, opt, expected_output).await; } } @@ -412,25 +557,50 @@ mod test { TimeUnit::Millisecond, vec![ ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), - ((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), + ((5, 10), vec![vec![5, 6], vec![7, 8]]), ], false, - vec![ - vec![1, 2, 3, 4, 5, 6, 7, 8, 9], - vec![1, 2, 3, 4, 5, 6, 7, 8], - ], + vec![vec![1, 2, 3, 4, 5, 6, 7, 8, 9], vec![5, 6, 7, 8]], ), ( TimeUnit::Millisecond, vec![ - ((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]), + ((5, 10), vec![vec![5, 6], vec![7, 8, 9]]), ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ], true, + vec![vec![9, 8, 7, 6, 5], vec![8, 7, 6, 5, 4, 3, 2, 1]], + ), + ( + TimeUnit::Millisecond, vec![ - vec![9, 8, 7, 6, 5, 4, 3, 2, 1], - vec![8, 7, 6, 5, 4, 3, 2, 1], + ((5, 10), vec![]), + ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), ], + true, + vec![vec![8, 7, 6, 5, 4, 3, 2, 1]], + ), + ( + TimeUnit::Millisecond, + vec![ + ((15, 20), vec![vec![17, 18, 19]]), + ((10, 15), vec![]), + ((5, 10), vec![]), + ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]), + ], + true, + vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]], + ), + ( + TimeUnit::Millisecond, + vec![ + ((15, 20), vec![]), + ((10, 15), vec![]), + ((5, 10), vec![]), + ((0, 10), vec![]), + ], + true, + vec![], ), ]; @@ -487,7 +657,7 @@ mod test { opt: SortOptions, expected_output: Vec, ) { - let (_ranges, batches): (Vec<_>, Vec<_>) = input_ranged_data.clone().into_iter().unzip(); + let (ranges, batches): (Vec<_>, Vec<_>) = input_ranged_data.clone().into_iter().unzip(); let batches = batches .into_iter() @@ -498,14 +668,14 @@ mod test { .collect_vec(); let mock_input = MockInputExec::new(batches, schema.clone()); - let exec = PartSortExec::try_new( + let exec = PartSortExec::new( PhysicalSortExpr { expr: Arc::new(Column::new("ts", 0)), options: opt, }, + vec![ranges], Arc::new(mock_input), - ) - .unwrap(); + ); let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); @@ -513,6 +683,7 @@ mod test { // a makeshift solution for compare large data if real_output != expected_output { + let mut full_msg = String::new(); { let mut buf = Vec::with_capacity(10 * real_output.len()); for batch in &real_output { @@ -523,8 +694,9 @@ mod test { buf.append(&mut rb_json); buf.push(b','); } - let buf = String::from_utf8_lossy(&buf); - error!("case_id:{case_id}, real_output: [{buf}]"); + // TODO(discord9): better ways to print buf + let _buf = String::from_utf8_lossy(&buf); + full_msg += &format!("case_id:{case_id}, real_output"); } { let mut buf = Vec::with_capacity(10 * real_output.len()); @@ -536,10 +708,13 @@ mod test { buf.append(&mut rb_json); buf.push(b','); } - let buf = String::from_utf8_lossy(&buf); - error!("case_id:{case_id}, expected_output: [{buf}]"); + let _buf = String::from_utf8_lossy(&buf); + full_msg += &format!("case_id:{case_id}, expected_output"); } - panic!("case_{} failed, opt: {:?}", case_id, opt); + panic!( + "case_{} failed, opt: {:?}, full msg: {}", + case_id, opt, full_msg + ); } } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 97386e9b2d..edc10e076a 
100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -49,6 +49,7 @@ use crate::optimizer::remove_duplicate::RemoveDuplicate;
 use crate::optimizer::scan_hint::ScanHintRule;
 use crate::optimizer::string_normalization::StringNormalizationRule;
 use crate::optimizer::type_conversion::TypeConversionRule;
+use crate::optimizer::windowed_sort::WindowedSortPhysicalRule;
 use crate::optimizer::ExtensionAnalyzerRule;
 use crate::query_engine::options::QueryOptions;
 use crate::query_engine::DefaultSerializer;
@@ -120,6 +121,10 @@ impl QueryEngineState {
         physical_optimizer
             .rules
             .insert(0, Arc::new(ParallelizeScan));
+        // Add rule for windowed sort
+        physical_optimizer
+            .rules
+            .push(Arc::new(WindowedSortPhysicalRule));
         // Add rule to remove duplicate nodes generated by other rules. Run this last.
         physical_optimizer.rules.push(Arc::new(RemoveDuplicate));

diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs
index 25dd4b5fb8..305585b267 100644
--- a/src/query/src/window_sort.rs
+++ b/src/query/src/window_sort.rs
@@ -21,10 +21,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};

-use arrow::array::types::{
-    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
-    TimestampSecondType,
-};
 use arrow::array::{Array, ArrayRef, PrimitiveArray};
 use arrow::compute::SortColumn;
 use arrow_schema::{DataType, SchemaRef, SortOptions};
@@ -58,30 +54,57 @@ use crate::error::{QueryExecutionSnafu, Result};
 ///
 /// Internally, it calls [`streaming_merge`] multiple times to merge multiple sorted "working ranges".
 ///
-/// the input stream must be concated in the order of `PartitionRange` in `ranges`(and each `PartitionRange` is sorted within itself), or else the result will be incorrect
+/// # Invariant Promise on Input Stream
+/// 1. The input stream must be sorted by timestamp, and
+/// 2. arrive in the order of the `PartitionRange`s in `ranges`.
+/// 3. Each `PartitionRange` is sorted within itself (ascending or descending), but ranges need not be sorted relative to each other.
+/// 4. No RecordBatch in the input stream may cross multiple `PartitionRange`s.
+///
+/// TODO(discord9): fix item 4; but since only `PartSort` is used as input, this might not be a problem
+
 #[derive(Debug, Clone)]
 pub struct WindowedSortExec {
     /// Physical sort expressions (that is, sort by timestamp)
     expression: PhysicalSortExpr,
     /// Optional number of rows to fetch. Stops producing rows after this fetch
     fetch: Option<usize>,
-    /// The input ranges indicate input stream will be composed of those ranges in given order
-    ranges: Vec<PartitionRange>,
+    /// The input ranges indicate that the input stream will be composed of those ranges in the given order.
+    ///
+    /// Each partition has one vector of `PartitionRange`.
+    ranges: Vec<Vec<PartitionRange>>,
     /// All available working ranges and their corresponding working set
     ///
-    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
-    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
+    /// Working ranges promise that once the input stream yields a value outside the current range,
+    /// future values will never fall back into it. Each partition has one vector of ranges.
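+    ///
+    /// (Editor's illustrative note) With an ascending sort, overlapping ranges such as
+    /// `[0, 10)` and `[5, 15)` fall into one working range `[0, 15)`, because their rows
+    /// may interleave and must be merge-sorted together; a disjoint `[20, 30)` forms its
+    /// own working range and can be emitted as soon as the input moves past `15`.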
+    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
     input: Arc<dyn ExecutionPlan>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     properties: PlanProperties,
 }

-fn check_partition_range_monotonicity(ranges: &[PartitionRange], descending: bool) -> bool {
-    if descending {
-        ranges.windows(2).all(|w| w[0].end >= w[1].end)
+fn check_partition_range_monotonicity(
+    ranges: &[Vec<PartitionRange>],
+    descending: bool,
+) -> Result<()> {
+    let is_valid = ranges.iter().all(|r| {
+        if descending {
+            r.windows(2).all(|w| w[0].end >= w[1].end)
+        } else {
+            r.windows(2).all(|w| w[0].start <= w[1].start)
+        }
+    });
+
+    if !is_valid {
+        let msg = if descending {
+            "Input `PartitionRange`s' upper bounds are not monotonically non-increasing"
+        } else {
+            "Input `PartitionRange`s' lower bounds are not monotonically non-decreasing"
+        };
+        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
+        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
     } else {
-        ranges.windows(2).all(|w| w[0].start <= w[1].start)
+        Ok(())
     }
 }

@@ -89,28 +112,28 @@ impl WindowedSortExec {
     pub fn try_new(
         expression: PhysicalSortExpr,
         fetch: Option<usize>,
-        ranges: Vec<PartitionRange>,
+        ranges: Vec<Vec<PartitionRange>>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
-        if !check_partition_range_monotonicity(&ranges, expression.options.descending) {
-            let msg = if expression.options.descending {
-                "Input `PartitionRange`s's upper bound is not monotonic non-increase"
-            } else {
-                "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
-            };
-            let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
-            return Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {});
-        }
+        check_partition_range_monotonicity(&ranges, expression.options.descending)?;

         let properties = PlanProperties::new(
-            input.equivalence_properties().clone(),
+            input
+                .equivalence_properties()
+                .clone()
+                .with_reorder(vec![expression.clone()]),
             input.output_partitioning().clone(),
             input.execution_mode(),
         );

-        let overlap_counts = split_overlapping_ranges(&ranges);
-        let all_avail_working_range =
-            compute_all_working_ranges(&overlap_counts, expression.options.descending);
+        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
+        for r in &ranges {
+            let overlap_counts = split_overlapping_ranges(r);
+            let working_ranges =
+                compute_all_working_ranges(&overlap_counts, expression.options.descending);
+            all_avail_working_range.push(working_ranges);
+        }
+
         Ok(Self {
             expression,
             fetch,
@@ -123,7 +146,8 @@ impl WindowedSortExec {
     /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
-    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside of working set, we can merge results before whenever possible.
+    /// `PartitionRange` we think those RecordBatches belong to. And when we receive something outside
+    /// of the working set, we can merge earlier results whenever possible.
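+    ///
+    /// (Editor's sketch of the flow implemented below) For each sorted run from the input:
+    /// 1. run entirely inside the current working range: buffer it and keep waiting;
+    /// 2. run overlapping the range boundary: slice off the in-range prefix, flush the
+    ///    working set into a `streaming_merge`, then retry the remainder;
+    /// 3. run entirely outside: flush, advance to the next working range, and retry.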
pub fn to_stream( &self, context: Arc, @@ -132,7 +156,12 @@ impl WindowedSortExec { let input_stream: DfSendableRecordBatchStream = self.input.execute(partition, context.clone())?; - let df_stream = Box::pin(WindowedSortStream::new(context, self, input_stream)) as _; + let df_stream = Box::pin(WindowedSortStream::new( + context, + self, + input_stream, + partition, + )) as _; Ok(df_stream) } @@ -190,6 +219,15 @@ impl ExecutionPlan for WindowedSortExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + /// # Explain + /// + /// This plan needs to be executed on each partition independently, + /// and is expected to run directly on storage engine's output + /// distribution / partition. + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false; self.ranges.len()] + } } /// The core logic of merging sort multiple sorted ranges @@ -215,8 +253,8 @@ pub struct WindowedSortStream { working_idx: usize, /// input stream assumed reading in order of `PartitionRange` input: DfSendableRecordBatchStream, - /// Whether the input stream is terminated, if it did, we should poll input again - is_input_terminated: bool, + /// Whether this stream is terminated. For reasons like limit reached or input stream is done. + is_terminated: bool, /// Output Schema, which is the same as input schema, since this is a sort plan schema: SchemaRef, /// Physical sort expressions(that is, sort by timestamp) @@ -231,8 +269,10 @@ pub struct WindowedSortStream { /// /// working ranges promise once input stream get a value out of current range, future values will never be in this range all_avail_working_range: Vec<(TimeRange, BTreeSet)>, + /// The input partition ranges + ranges: Vec, /// Execution metrics - metrics: ExecutionPlanMetricsSet, + metrics: BaselineMetrics, } impl WindowedSortStream { @@ -240,6 +280,7 @@ impl WindowedSortStream { context: Arc, exec: &WindowedSortExec, input: DfSendableRecordBatchStream, + partition: usize, ) -> Self { Self { memory_pool: context.runtime_env().memory_pool.clone(), @@ -251,18 +292,79 @@ impl WindowedSortStream { working_idx: 0, schema: input.schema(), input, - is_input_terminated: false, + is_terminated: false, expression: exec.expression.clone(), fetch: exec.fetch, produced: 0, batch_size: context.session_config().batch_size(), - all_avail_working_range: exec.all_avail_working_range.clone(), - metrics: exec.metrics.clone(), + all_avail_working_range: exec.all_avail_working_range[partition].clone(), + ranges: exec.ranges[partition].clone(), + metrics: BaselineMetrics::new(&exec.metrics, partition), } } } impl WindowedSortStream { + #[cfg(debug_assertions)] + fn check_subset_ranges(&self, cur_range: &TimeRange) { + let cur_is_subset_to = self + .ranges + .iter() + .filter(|r| cur_range.is_subset(&TimeRange::from(*r))) + .collect_vec(); + if cur_is_subset_to.is_empty() { + error!("Current range is not a subset of any PartitionRange"); + // found in all ranges that are subset of current range + let subset_ranges = self + .ranges + .iter() + .filter(|r| TimeRange::from(*r).is_subset(cur_range)) + .collect_vec(); + let only_overlap = self + .ranges + .iter() + .filter(|r| { + let r = TimeRange::from(*r); + r.is_overlapping(cur_range) && !r.is_subset(cur_range) + }) + .collect_vec(); + error!( + "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}", + subset_ranges.len(), + only_overlap.len(), + subset_ranges, + only_overlap + ); + } else { + let only_overlap = 
self + .ranges + .iter() + .filter(|r| { + let r = TimeRange::from(*r); + r.is_overlapping(cur_range) && !cur_range.is_subset(&r) + }) + .collect_vec(); + error!( + "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}", + cur_is_subset_to.len(), + only_overlap.len(), + cur_is_subset_to, + only_overlap + ); + } + let all_overlap_working_range = self + .all_avail_working_range + .iter() + .filter(|(range, _)| range.is_overlapping(cur_range)) + .map(|(range, _)| range) + .collect_vec(); + error!( + "Found {} working ranges that overlap with current range: {:?}", + all_overlap_working_range.len(), + all_overlap_working_range + ); + } + /// Poll the next RecordBatch from the merge-sort's output stream fn poll_result_stream( &mut self, @@ -273,6 +375,7 @@ impl WindowedSortStream { Poll::Ready(Some(Ok(batch))) => { let ret = if let Some(remaining) = self.remaining_fetch() { if remaining == 0 { + self.is_terminated = true; None } else if remaining < batch.num_rows() { self.produced += remaining; @@ -304,6 +407,7 @@ impl WindowedSortStream { // if no output stream is available Poll::Ready(None) } + /// The core logic of merging sort multiple sorted ranges /// /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible. @@ -314,7 +418,7 @@ impl WindowedSortStream { // first check and send out the merge result match self.poll_result_stream(cx) { Poll::Ready(None) => { - if self.is_input_terminated { + if self.is_terminated { return Poll::Ready(None); } } @@ -322,7 +426,7 @@ impl WindowedSortStream { }; // consume input stream - while !self.is_input_terminated { + while !self.is_terminated { // then we get a new RecordBatch from input stream let new_input_rbs = match self.input.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(batch))) => { @@ -333,128 +437,114 @@ impl WindowedSortStream { } Poll::Ready(None) => { // input stream is done, we need to merge sort the remaining working set - self.is_input_terminated = true; + self.is_terminated = true; None } Poll::Pending => return Poll::Pending, }; + let Some(SortedRunSet { + runs_with_batch, + sort_column, + }) = new_input_rbs + else { + // input stream is done, we need to merge sort the remaining working set + self.build_sorted_stream()?; + self.start_new_merge_sort()?; + continue; + }; // The core logic to eargerly merge sort the working set - match new_input_rbs { - Some(SortedRunSet { - runs_with_batch, - sort_column, - }) => { - // compare with last_value to find boundary, then merge runs if needed - // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs` + // compare with last_value to find boundary, then merge runs if needed - let mut last_remaining = None; - let mut run_iter = runs_with_batch.into_iter(); - loop { - let (sorted_rb, run_info) = if let Some(r) = last_remaining.take() { - r - } else if let Some(r) = run_iter.next() { - r - } else { - break; - }; - if sorted_rb.num_rows() == 0 { - continue; - } - // determine if this batch is in current working range - let cur_range = { - match run_info.get_time_range() { - Some(r) => r, - _ => internal_err!("Found NULL in time index column")?, - } - }; - let working_range = if let Some(r) = self.get_working_range() { - r - } else { - internal_err!("No working range found")? 
- }; - - if sort_column.options.unwrap_or_default().descending { - if cur_range.end > working_range.end { - error!("Invalid range: {:?} > {:?}", cur_range, working_range); - internal_err!("Current batch have data on the right side of working range, something is very wrong")?; - } - } else if cur_range.start < working_range.start { - error!("Invalid range: {:?} < {:?}", cur_range, working_range); - internal_err!("Current batch have data on the left side of working range, something is very wrong")?; - } - - if cur_range.is_subset(&working_range) { - // data still in range, can't merge sort yet - // see if can concat entire sorted rb, merge sort need to wait - self.try_concat_batch( - sorted_rb.clone(), - &run_info, - sort_column.options, - )?; - } else if let Some(intersection) = cur_range.intersection(&working_range) { - // slice rb by intersection and concat it then merge sort - let cur_sort_column = - sort_column.values.slice(run_info.offset, run_info.len); - let (offset, len) = find_slice_from_range( - &SortColumn { - values: cur_sort_column.clone(), - options: sort_column.options, - }, - &intersection, - )?; - - if offset != 0 { - internal_err!("Current batch have data on the left side of working range, something is very wrong")?; - } - - let sliced_rb = sorted_rb.slice(offset, len); - - // try to concat the sliced input batch to the current `in_progress` run - self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?; - // since no more sorted data in this range will come in now, build stream now - self.build_sorted_stream()?; - - // since we are crossing working range, we need to merge sort the working set - self.start_new_merge_sort()?; - - let (r_offset, r_len) = - (offset + len, sorted_rb.num_rows() - offset - len); - if r_len != 0 { - // we have remaining data in this batch, put it back to input queue - let remaining_rb = sorted_rb.slice(r_offset, r_len); - let new_first_val = - get_timestamp_from_idx(&cur_sort_column, r_offset)?; - let new_run_info = SucRun { - offset: run_info.offset + r_offset, - len: r_len, - first_val: new_first_val, - last_val: run_info.last_val, - }; - last_remaining = Some((remaining_rb, new_run_info)); - } - // deal with remaining batch cross working range problem - // i.e: this example require more slice, and we are currently at point A - // |---1---| |---3---| - // |-------A--2------------| - // put the remaining batch back to iter and deal it in next loop - } else { - // no overlap, we can merge sort the working set - - self.build_sorted_stream()?; - self.start_new_merge_sort()?; - - // always put it back to input queue until some batch is in working range - last_remaining = Some((sorted_rb, run_info)); - } - } + // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs` + let mut last_remaining = None; + let mut run_iter = runs_with_batch.into_iter(); + loop { + let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else { + break; + }; + if sorted_rb.num_rows() == 0 { + continue; } - None => { - // input stream is done, we need to merge sort the remaining working set - // and emit the result + // determine if this batch is in current working range + let Some(cur_range) = run_info.get_time_range() else { + internal_err!("Found NULL in time index column")? + }; + let Some(working_range) = self.get_working_range() else { + internal_err!("No working range found")? 
+ }; + + // ensure the current batch is in the working range + if sort_column.options.unwrap_or_default().descending { + if cur_range.end > working_range.end { + error!("Invalid range: {:?} > {:?}", cur_range, working_range); + #[cfg(debug_assertions)] + self.check_subset_ranges(&cur_range); + internal_err!("Current batch have data on the right side of working range, something is very wrong")?; + } + } else if cur_range.start < working_range.start { + error!("Invalid range: {:?} < {:?}", cur_range, working_range); + #[cfg(debug_assertions)] + self.check_subset_ranges(&cur_range); + internal_err!("Current batch have data on the left side of working range, something is very wrong")?; + } + + if cur_range.is_subset(&working_range) { + // data still in range, can't merge sort yet + // see if can concat entire sorted rb, merge sort need to wait + self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?; + } else if let Some(intersection) = cur_range.intersection(&working_range) { + // slice rb by intersection and concat it then merge sort + let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len); + let (offset, len) = find_slice_from_range( + &SortColumn { + values: cur_sort_column.clone(), + options: sort_column.options, + }, + &intersection, + )?; + + if offset != 0 { + internal_err!("Current batch have data on the left side of working range, something is very wrong")?; + } + + let sliced_rb = sorted_rb.slice(offset, len); + + // try to concat the sliced input batch to the current `in_progress` run + self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?; + // since no more sorted data in this range will come in now, build stream now + self.build_sorted_stream()?; + + // since we are crossing working range, we need to merge sort the working set + self.start_new_merge_sort()?; + + let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len); + if r_len != 0 { + // we have remaining data in this batch, put it back to input queue + let remaining_rb = sorted_rb.slice(r_offset, r_len); + let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?; + let new_run_info = SucRun { + offset: run_info.offset + r_offset, + len: r_len, + first_val: new_first_val, + last_val: run_info.last_val, + }; + last_remaining = Some((remaining_rb, new_run_info)); + } + // deal with remaining batch cross working range problem + // i.e: this example require more slice, and we are currently at point A + // |---1---| |---3---| + // |-------A--2------------| + // put the remaining batch back to iter and deal it in next loop + } else { + // no overlap, we can merge sort the working set + self.build_sorted_stream()?; self.start_new_merge_sort()?; + + // always put it back to input queue until some batch is in working range + last_remaining = Some((sorted_rb, run_info)); } } } @@ -540,7 +630,7 @@ impl WindowedSortStream { streams, self.schema(), &[self.expression.clone()], - BaselineMetrics::new(&self.metrics, 0), + self.metrics.clone(), self.batch_size, fetch, reservation, @@ -562,10 +652,11 @@ impl Stream for WindowedSortStream { type Item = datafusion_common::Result; fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.poll_next_inner(cx) + let result = self.as_mut().poll_next_inner(cx); + self.metrics.record_poll(result) } } @@ -624,21 +715,22 @@ fn split_batch_to_sorted_run( /// Downcast a temporal array to a specific type /// /// usage similar to `downcast_primitive!` in `arrow-array` 
crate +#[macro_export] macro_rules! downcast_ts_array { ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) => { match $data_type { arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { - $m!(TimestampSecondType $(, $args)*) + $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*) } arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { - $m!(TimestampMillisecondType $(, $args)*) + $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*) } arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { - $m!(TimestampMicrosecondType $(, $args)*) + $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*) } arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { - $m!(TimestampNanosecondType $(, $args)*) + $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*) } $($p => $fallback,)* } @@ -711,7 +803,7 @@ fn find_slice_from_range( } macro_rules! array_iter_helper { - ($t:ty, $arr:expr) => {{ + ($t:ty, $unit:expr, $arr:expr) => {{ let typed = $arr.as_any().downcast_ref::>().unwrap(); let iter = typed.iter().enumerate(); Box::new(iter) as Box)>> @@ -993,7 +1085,8 @@ fn compute_all_working_ranges( match &mut cur_range_set { None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))), Some((working_range, working_set)) => { - // if next overlap range have Partition that's is not last one in `working_set`(hence need to be read before merge sorting), and `working_set` have >1 count + // if next overlap range have Partition that's is not last one in `working_set`(hence need + // to be read before merge sorting), and `working_set` have >1 count // we have to expand current working range to cover it(and add it's `set` to `working_set`) // so that merge sort is possible let need_expand = { @@ -2410,7 +2503,7 @@ mod test { let exec = WindowedSortExec::try_new( self.expression.clone(), self.fetch, - ranges, + vec![ranges], Arc::new(mock_input), ) .unwrap(); @@ -2441,6 +2534,78 @@ mod test { vec![], vec![], ), + TestStream::new( + Column::new("ts", 0), + SortOptions { + descending: false, + nulls_first: true, + }, + None, + vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )], + vec![ + // test one empty + ( + PartitionRange { + start: Timestamp::new_millisecond(1), + end: Timestamp::new_millisecond(2), + num_rows: 1, + identifier: 0, + }, + vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], + ), + ( + PartitionRange { + start: Timestamp::new_millisecond(1), + end: Timestamp::new_millisecond(3), + num_rows: 1, + identifier: 0, + }, + vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))], + ), + ], + vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values( + [2], + ))]], + ), + TestStream::new( + Column::new("ts", 0), + SortOptions { + descending: false, + nulls_first: true, + }, + None, + vec![Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + )], + vec![ + // test one empty + ( + PartitionRange { + start: Timestamp::new_millisecond(1), + end: Timestamp::new_millisecond(2), + num_rows: 1, + identifier: 0, + }, + vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], + ), + ( + PartitionRange { + start: Timestamp::new_millisecond(1), + end: Timestamp::new_millisecond(3), + num_rows: 1, + identifier: 0, + }, + 
vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))], + ), + ], + vec![], + ), TestStream::new( Column::new("ts", 0), SortOptions { diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 2a8a0a487f..785e66d37f 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -205,7 +205,7 @@ pub struct ScannerProperties { total_rows: usize, /// Whether to yield an empty batch to distinguish partition ranges. - distinguish_partition_range: bool, + pub distinguish_partition_range: bool, } impl ScannerProperties { @@ -227,12 +227,6 @@ impl ScannerProperties { self } - /// Sets distinguish partition range for scanner. - pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self { - self.distinguish_partition_range = distinguish_partition_range; - self - } - /// Creates a new [`ScannerProperties`] with the given partitioning. pub fn new(partitions: Vec>, append_mode: bool, total_rows: usize) -> Self { Self { @@ -274,7 +268,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Prepares the scanner with the given partition ranges. /// /// This method is for the planner to adjust the scanner's behavior based on the partition ranges. - fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError>; + fn prepare( + &mut self, + ranges: Vec>, + distinguish_partition_range: bool, + ) -> Result<(), BoxedError>; /// Scans the partition and returns a stream of record batches. /// @@ -441,8 +439,13 @@ impl RegionScanner for SinglePartitionScanner { self.schema.clone() } - fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + fn prepare( + &mut self, + ranges: Vec>, + distinguish_partition_range: bool, + ) -> Result<(), BoxedError> { self.properties.partitions = ranges; + self.properties.distinguish_partition_range = distinguish_partition_range; Ok(()) } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index e67c6dc032..9207c1e86d 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -22,6 +22,7 @@ use common_error::ext::BoxedError; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; use common_telemetry::tracing_context::TracingContext; +use common_telemetry::warn; use datafusion::error::Result as DfResult; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; @@ -49,6 +50,7 @@ pub struct RegionScanExec { properties: PlanProperties, append_mode: bool, total_rows: usize, + is_partition_set: bool, } impl RegionScanExec { @@ -77,15 +79,10 @@ impl RegionScanExec { properties, append_mode, total_rows, + is_partition_set: false, } } - /// Set the expected output ordering for the plan. - pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { - self.output_ordering = Some(output_ordering); - self - } - /// Get the partition ranges of the scanner. This method will collapse the ranges into /// a single vector. pub fn get_partition_ranges(&self) -> Vec { @@ -101,18 +98,33 @@ impl RegionScanExec { ranges } + /// Similar to [`Self::get_partition_ranges`] but don't collapse the ranges. + pub fn get_uncollapsed_partition_ranges(&self) -> Vec> { + let scanner = self.scanner.lock().unwrap(); + scanner.properties().partitions.clone() + } + + pub fn is_partition_set(&self) -> bool { + self.is_partition_set + } + /// Update the partition ranges of underlying scanner. 
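    /// (Editor's note) `prepare` is now called with both the new ranges and the
    /// scanner's current `distinguish_partition_range` flag, so a scanner that was
    /// asked to emit empty delimiter batches between ranges keeps doing so after
    /// its ranges are replaced.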
pub fn with_new_partitions( &self, partitions: Vec>, ) -> Result { + if self.is_partition_set { + warn!("Setting partition ranges more than once for RegionScanExec"); + } + let num_partitions = partitions.len(); let mut properties = self.properties.clone(); properties.partitioning = Partitioning::UnknownPartitioning(num_partitions); { let mut scanner = self.scanner.lock().unwrap(); - scanner.prepare(partitions)?; + let distinguish_partition_range = scanner.properties().distinguish_partition_range(); + scanner.prepare(partitions, distinguish_partition_range)?; } Ok(Self { @@ -123,8 +135,25 @@ impl RegionScanExec { properties, append_mode: self.append_mode, total_rows: self.total_rows, + is_partition_set: true, }) } + + pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) { + let mut scanner = self.scanner.lock().unwrap(); + let partition_ranges = scanner.properties().partitions.clone(); + // set distinguish_partition_range won't fail + let _ = scanner.prepare(partition_ranges, distinguish_partition_range); + } + + pub fn time_index(&self) -> Option { + self.scanner + .lock() + .unwrap() + .schema() + .timestamp_column() + .map(|x| x.name.clone()) + } } impl ExecutionPlan for RegionScanExec { diff --git a/tests-fuzz/targets/fuzz_insert_logical_table.rs b/tests-fuzz/targets/fuzz_insert_logical_table.rs index abafdef9a8..ee2035acd6 100644 --- a/tests-fuzz/targets/fuzz_insert_logical_table.rs +++ b/tests-fuzz/targets/fuzz_insert_logical_table.rs @@ -189,6 +189,7 @@ async fn validate_values( "SELECT {} FROM {} ORDER BY {}", column_list, logical_table_ctx.name, primary_keys_column_list ); + info!("Select SQL: {select_sql}"); let fetched_rows = validator::row::fetch_values(&ctx.greptime, select_sql.as_str()).await?; let mut expected_rows = replace_default(&insert_expr.values_list, &logical_table_ctx, insert_expr); @@ -214,6 +215,8 @@ async fn insert_values( .await .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Insert values, result: {result:?}"); + ensure!( result.rows_affected() == rows as u64, error::AssertSnafu { diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result new file mode 100644 index 0000000000..1cc0ab7720 --- /dev/null +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -0,0 +1,81 @@ +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test'); + ++---------------------------+ +| ADMIN FLUSH_TABLE('test') | ++---------------------------+ +| 0 | ++---------------------------+ + +INSERT INTO test VALUES (2, 4), (2, 5), (NULL, 6); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test'); + ++---------------------------+ +| ADMIN FLUSH_TABLE('test') | ++---------------------------+ +| 0 | ++---------------------------+ + +INSERT INTO test VALUES (3, 7), (3, 8), (3, 9); + +Affected Rows: 3 + +ADMIN FLUSH_TABLE('test'); + ++---------------------------+ +| ADMIN FLUSH_TABLE('test') | ++---------------------------+ +| 0 | ++---------------------------+ + +INSERT INTO test VALUES (4, 10), (4, 11), (4, 12); + +Affected Rows: 3 + +SELECT * FROM test ORDER BY t LIMIT 5; + ++---+-------------------------+ +| i | t | ++---+-------------------------+ +| 1 | 1970-01-01T00:00:00.001 | +| | 1970-01-01T00:00:00.002 | +| 1 | 1970-01-01T00:00:00.003 | +| 2 | 1970-01-01T00:00:00.004 | +| 2 | 1970-01-01T00:00:00.005 | ++---+-------------------------+ + +-- SQLNESS REPLACE 
(-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED +|_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED +|_|_|_WindowedSortExec REDACTED +|_|_|_PartSortExec t@1 ASC NULLS LAST REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 5_| ++-+-+-+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/order/windowed_sort.sql b/tests/cases/standalone/common/order/windowed_sort.sql new file mode 100644 index 0000000000..7767825e3d --- /dev/null +++ b/tests/cases/standalone/common/order/windowed_sort.sql @@ -0,0 +1,26 @@ +CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX); + +INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3); + +ADMIN FLUSH_TABLE('test'); + +INSERT INTO test VALUES (2, 4), (2, 5), (NULL, 6); + +ADMIN FLUSH_TABLE('test'); + +INSERT INTO test VALUES (3, 7), (3, 8), (3, 9); + +ADMIN FLUSH_TABLE('test'); + +INSERT INTO test VALUES (4, 10), (4, 11), (4, 12); + +SELECT * FROM test ORDER BY t LIMIT 5; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; + +DROP TABLE test; diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index bcaf39803f..4cdb5a8790 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -183,6 +183,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_| | physical_plan after ProjectionPushdown_| SAME TEXT AS ABOVE_| | physical_plan after PipelineChecker_| SAME TEXT AS ABOVE_| +| physical_plan after WindowedSortRule_| SAME TEXT AS ABOVE_| | physical_plan after RemoveDuplicateRule_| SAME TEXT AS ABOVE_| | physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|