Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-08 06:12:55 +00:00)
feat: enhance windowed-sort optimizer rule (#4910)
* add RegionScanner::metadata
* skip PartSort when there is no tag column
* add more sqlness test
* handle desc
* fix: should keep part sort on DESC
* fix clippy

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
@@ -91,8 +91,9 @@ impl RegionEngine for FileRegionEngine {
         request: ScanRequest,
     ) -> Result<RegionScannerRef, BoxedError> {
         let stream = self.handle_query(region_id, request).await?;
+        let metadata = self.get_metadata(region_id).await?;
         // We don't support enabling append mode for file engine.
-        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
         Ok(scanner)
     }
@@ -27,6 +27,7 @@ use common_telemetry::tracing;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;
@@ -321,6 +322,10 @@ impl RegionScanner for SeqScan {
         let predicate = self.stream_ctx.input.predicate();
         predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.stream_ctx.input.mapper.metadata().clone()
+    }
 }
 
 impl DisplayAs for SeqScan {
@@ -26,6 +26,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
 use snafu::ResultExt;
+use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
 
 use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -229,6 +230,10 @@ impl RegionScanner for UnorderedScan {
         let predicate = self.stream_ctx.input.predicate();
         predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.stream_ctx.input.mapper.metadata().clone()
+    }
 }
 
 impl DisplayAs for UnorderedScan {
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use common_telemetry::debug;
 use datafusion::config::ConfigOptions;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{DataFusionError, Result};
@@ -48,9 +49,16 @@ impl ParallelizeScan {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut first_order_expr = None;
+
         let result = plan
             .transform_down(|plan| {
-                if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+                if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
+                    // save the first order expr
+                    first_order_expr = sort_exec.expr().first().cloned();
+                } else if let Some(region_scan_exec) =
+                    plan.as_any().downcast_ref::<RegionScanExec>()
+                {
                    if region_scan_exec.is_partition_set() {
                        return Ok(Transformed::no(plan));
                    }
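The capture here relies on visit order: transform_down reaches the SortExec before the RegionScanExec beneath it, so the closure can stash the sort's first expression in a local and consult it once the walk arrives at the scan node. A minimal standalone sketch of that pattern, with a toy plan tree standing in for DataFusion's ExecutionPlan (all names below are illustrative, not from the codebase):

// Toy plan tree; a top-down walk visits ancestors before descendants,
// so state recorded at the Sort node is available at the Scan leaf.
enum Plan {
    Sort { descending: bool, input: Box<Plan> },
    Scan { name: &'static str },
}

fn walk(plan: &Plan, first_sort_desc: &mut Option<bool>) {
    match plan {
        Plan::Sort { descending, input } => {
            // Mirrors `first_order_expr = sort_exec.expr().first().cloned()`.
            *first_sort_desc = Some(*descending);
            walk(input, first_sort_desc);
        }
        Plan::Scan { name } => {
            // By the time we reach the scan, the direction is known.
            println!("{name}: sort direction seen above = {first_sort_desc:?}");
        }
    }
}

fn main() {
    let plan = Plan::Sort {
        descending: true,
        input: Box::new(Plan::Scan { name: "region_scan" }),
    };
    walk(&plan, &mut None);
}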
@@ -66,10 +74,21 @@ impl ParallelizeScan {
                         "Assign {total_range_num} ranges to {expected_partition_num} partitions"
                     );
 
-                    // sort the ranges in each partition
-                    // TODO(ruihang): smart sort!
-                    for ranges in partition_ranges.iter_mut() {
-                        ranges.sort_by(|a, b| a.start.cmp(&b.start));
+                    // Sort the ranges in each partition based on the order expr.
+                    //
+                    // This optimistically assumes the first order expr is on the time index
+                    // column, so the rule can skip validating the order expr; nothing breaks
+                    // if this condition is not met.
+                    if let Some(order_expr) = &first_order_expr
+                        && order_expr.options.descending
+                    {
+                        for ranges in partition_ranges.iter_mut() {
+                            ranges.sort_by(|a, b| b.end.cmp(&a.end));
+                        }
+                    } else {
+                        for ranges in partition_ranges.iter_mut() {
+                            ranges.sort_by(|a, b| a.start.cmp(&b.start));
+                        }
                     }
 
                     // update the partition ranges
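A minimal sketch of this direction-aware ordering, with plain integer bounds standing in for the real PartitionRange timestamps (Range and order_ranges are simplified stand-ins, not the actual types):

// Ascending queries want the earliest-starting ranges first; descending
// queries want the latest-ending ranges first, so each partition can
// stream its output in the requested time order.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Range {
    start: i64,
    end: i64,
}

fn order_ranges(ranges: &mut [Range], descending: bool) {
    if descending {
        ranges.sort_by(|a, b| b.end.cmp(&a.end));
    } else {
        ranges.sort_by(|a, b| a.start.cmp(&b.start));
    }
}

fn main() {
    let mut ranges = [
        Range { start: 10, end: 20 },
        Range { start: 0, end: 10 },
        Range { start: 20, end: 30 },
    ];
    order_ranges(&mut ranges, true);
    assert_eq!(ranges[0], Range { start: 20, end: 30 }); // latest end first
    order_ranges(&mut ranges, false);
    assert_eq!(ranges[0], Range { start: 0, end: 10 }); // earliest start first
}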
@@ -77,7 +77,6 @@ impl WindowedSortPhysicalRule {
             };
 
             if let Some(first_sort_expr) = sort_exec.expr().first()
-                && !first_sort_expr.options.descending
                 && let Some(column_expr) = first_sort_expr
                     .expr
                     .as_any()
@@ -87,18 +86,28 @@ impl WindowedSortPhysicalRule {
             } else {
                 return Ok(Transformed::no(plan));
             }
 
             let first_sort_expr = sort_exec.expr().first().unwrap().clone();
-            let part_sort_exec = Arc::new(PartSortExec::new(
-                first_sort_expr.clone(),
-                scanner_info.partition_ranges.clone(),
-                sort_exec.input().clone(),
-            ));
+
+            // PartSortExec is unnecessary if:
+            // - there is no tag column, and
+            // - the sort is ascending on the time index column
+            let new_input = if scanner_info.tag_columns.is_empty()
+                && !first_sort_expr.options.descending
+            {
+                sort_exec.input().clone()
+            } else {
+                Arc::new(PartSortExec::new(
+                    first_sort_expr.clone(),
+                    scanner_info.partition_ranges.clone(),
+                    sort_exec.input().clone(),
+                ))
+            };
 
             let windowed_sort_exec = WindowedSortExec::try_new(
                 first_sort_expr,
                 sort_exec.fetch(),
                 scanner_info.partition_ranges,
-                part_sort_exec,
+                new_input,
             )?;
 
             return Ok(Transformed {
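The condition above reduces to two booleans: with no tag column, each partition range already yields rows in ascending time order, so an ascending sort can feed the scan output straight into WindowedSortExec, while a descending sort, or any tag column interleaving series, still needs the per-range PartSortExec pass. A hedged sketch of just that decision (the helper name is illustrative, not from the codebase):

/// Whether rows must be re-sorted within each partition range before
/// the windowed merge. Equivalent to negating "no tag column AND
/// ascending on the time index" from the diff above.
fn needs_part_sort(has_tag_columns: bool, descending: bool) -> bool {
    has_tag_columns || descending
}

fn main() {
    assert!(!needs_part_sort(false, false)); // no tags, ASC: skip PartSort
    assert!(needs_part_sort(false, true)); // DESC: keep PartSort (per the commit log)
    assert!(needs_part_sort(true, false)); // tags interleave rows: keep PartSort
}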
@@ -119,11 +128,13 @@ impl WindowedSortPhysicalRule {
 struct ScannerInfo {
     partition_ranges: Vec<Vec<PartitionRange>>,
     time_index: String,
+    tag_columns: Vec<String>,
 }
 
 fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
     let mut partition_ranges = None;
     let mut time_index = None;
+    let mut tag_columns = None;
 
     input.transform_up(|plan| {
         // Unappliable case, reset the state.
@@ -139,6 +150,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
         if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
             partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
             time_index = region_scan_exec.time_index();
+            tag_columns = Some(region_scan_exec.tag_columns());
 
             // set distinguish_partition_ranges to true, this is an incorrect workaround
             region_scan_exec.with_distinguish_partition_range(true);
@@ -151,6 +163,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
         ScannerInfo {
             partition_ranges: partition_ranges?,
             time_index: time_index?,
+            tag_columns: tag_columns?,
         }
     };
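The `?` operators in this struct literal work because the enclosing block evaluates to an Option<ScannerInfo>: if any piece was never populated during the plan walk, the whole result collapses to None and the rule bails out. A small standalone sketch of that accumulate-then-collect shape (toy inputs and names, not the real plan walk):

#[derive(Debug)]
struct ScannerInfo {
    time_index: String,
    tag_columns: Vec<String>,
}

fn collect(nodes: &[&str]) -> Option<ScannerInfo> {
    let mut time_index = None;
    let mut tag_columns = None;

    // Walk the "plan", filling in pieces as matching nodes are seen.
    for node in nodes {
        if let Some(ts) = node.strip_prefix("ts:") {
            time_index = Some(ts.to_string());
        } else if let Some(tags) = node.strip_prefix("tags:") {
            tag_columns = Some(tags.split(',').map(str::to_string).collect());
        }
    }

    // `?` inside a closure returning Option collapses any missing piece
    // into an overall None, mirroring fetch_partition_range.
    let build = || {
        Some(ScannerInfo {
            time_index: time_index?,
            tag_columns: tag_columns?,
        })
    };
    build()
}

fn main() {
    assert!(collect(&["ts:b"]).is_none()); // tags never found => None
    println!("{:?}", collect(&["ts:b", "tags:a"]));
}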
@@ -175,6 +175,7 @@ struct PartSortStream {
     input_complete: bool,
     schema: SchemaRef,
     partition_ranges: Vec<PartitionRange>,
+    #[allow(dead_code)] // this is used under #[debug_assertions]
     partition: usize,
     cur_part_idx: usize,
     metrics: BaselineMetrics,
@@ -270,6 +270,7 @@ pub struct WindowedSortStream {
     /// working ranges promise once input stream get a value out of current range, future values will never be in this range
     all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
     /// The input partition ranges
+    #[allow(dead_code)] // this is used under #[debug_assertions]
     ranges: Vec<PartitionRange>,
     /// Execution metrics
     metrics: BaselineMetrics,
@@ -265,6 +265,9 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     /// Returns the schema of the record batches.
     fn schema(&self) -> SchemaRef;
 
+    /// Returns the metadata of the region.
+    fn metadata(&self) -> RegionMetadataRef;
+
     /// Prepares the scanner with the given partition ranges.
     ///
     /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
@@ -414,11 +417,16 @@ pub struct SinglePartitionScanner {
     stream: Mutex<Option<SendableRecordBatchStream>>,
     schema: SchemaRef,
     properties: ScannerProperties,
+    metadata: RegionMetadataRef,
 }
 
 impl SinglePartitionScanner {
-    /// Creates a new [SinglePartitionScanner] with the given stream.
-    pub fn new(stream: SendableRecordBatchStream, append_mode: bool) -> Self {
+    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
+    pub fn new(
+        stream: SendableRecordBatchStream,
+        append_mode: bool,
+        metadata: RegionMetadataRef,
+    ) -> Self {
         let schema = stream.schema();
         Self {
             stream: Mutex::new(Some(stream)),
@@ -426,6 +434,7 @@ impl SinglePartitionScanner {
             properties: ScannerProperties::default()
                 .with_parallelism(1)
                 .with_append_mode(append_mode),
+            metadata,
         }
     }
 }
@@ -468,6 +477,10 @@ impl RegionScanner for SinglePartitionScanner {
     fn has_predicate(&self) -> bool {
         false
     }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.metadata.clone()
+    }
 }
 
 impl DisplayAs for SinglePartitionScanner {
@@ -154,6 +154,16 @@ impl RegionScanExec {
             .timestamp_column()
             .map(|x| x.name.clone())
     }
+
+    pub fn tag_columns(&self) -> Vec<String> {
+        self.scanner
+            .lock()
+            .unwrap()
+            .metadata()
+            .primary_key_columns()
+            .map(|col| col.column_schema.name.clone())
+            .collect()
+    }
 }
 
 impl ExecutionPlan for RegionScanExec {
@@ -301,33 +311,45 @@ impl DfRecordBatchStream for StreamWithMetricWrapper {
 mod test {
     use std::sync::Arc;
 
+    use api::v1::SemanticType;
     use common_recordbatch::{RecordBatch, RecordBatches};
     use datafusion::prelude::SessionContext;
     use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
-    use datatypes::vectors::Int32Vector;
+    use datatypes::vectors::{Int32Vector, TimestampMillisecondVector};
     use futures::TryStreamExt;
+    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
     use store_api::region_engine::SinglePartitionScanner;
+    use store_api::storage::RegionId;
 
     use super::*;
 
     #[tokio::test]
     async fn test_simple_table_scan() {
         let ctx = SessionContext::new();
-        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
-            "a",
-            ConcreteDataType::int32_datatype(),
-            false,
-        )]));
+        let schema = Arc::new(Schema::new(vec![
+            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
+            ColumnSchema::new(
+                "b",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+        ]));
 
         let batch1 = RecordBatch::new(
             schema.clone(),
-            vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
+            vec![
+                Arc::new(Int32Vector::from_slice([1, 2])) as _,
+                Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _,
+            ],
         )
         .unwrap();
         let batch2 = RecordBatch::new(
             schema.clone(),
-            vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
+            vec![
+                Arc::new(Int32Vector::from_slice([3, 4, 5])) as _,
+                Arc::new(TimestampMillisecondVector::from_slice([3000, 4000, 5000])) as _,
+            ],
         )
         .unwrap();
@@ -335,7 +357,26 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();
 
-        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .primary_key(vec![1]);
+        let region_metadata = Arc::new(builder.build().unwrap());
+
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
         let plan = RegionScanExec::new(scanner);
         let actual: SchemaRef = Arc::new(
             plan.properties