diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 7b07870dcb..b3d207a8de 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -276,7 +276,14 @@ impl PlanRewriter { on_node = on_node.rewrite(&mut rewriter)?.data; // add merge scan as the new root - let mut node = MergeScanLogicalPlan::new(on_node, false).into_logical_plan(); + let mut node = MergeScanLogicalPlan::new( + on_node, + false, + // at this stage, the partition cols should be set + // treat it as non-partitioned if None + self.partition_cols.clone().unwrap_or_default(), + ) + .into_logical_plan(); // expand stages for new_stage in self.stage.drain(..) { diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index dd42406b5f..edbbedc8b7 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions}; use async_stream::stream; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; @@ -28,16 +28,17 @@ use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, }; use common_telemetry::tracing_context::TracingContext; -use datafusion::execution::TaskContext; +use datafusion::execution::{SessionState, TaskContext}; use datafusion::physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time, }; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; +use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion_common::Result; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; @@ -59,6 +60,7 @@ pub struct MergeScanLogicalPlan { input: LogicalPlan, /// If this plan is a placeholder is_placeholder: bool, + partition_cols: Vec, } impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan { @@ -95,10 +97,11 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan { } impl MergeScanLogicalPlan { - pub fn new(input: LogicalPlan, is_placeholder: bool) -> Self { + pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec) -> Self { Self { input, is_placeholder, + partition_cols, } } @@ -149,6 +152,7 @@ impl std::fmt::Debug for MergeScanExec { impl MergeScanExec { pub fn new( + session_state: &SessionState, table: TableName, regions: Vec, plan: LogicalPlan, @@ -157,13 +161,52 @@ impl MergeScanExec { query_ctx: QueryContextRef, target_partition: usize, ) -> Result { + common_telemetry::info!("[DEBUG] input plan: {:?}", plan); // TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to // keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs. // Reconsider if it's possible to remove it. let arrow_schema = Arc::new(arrow_schema.clone()); + + // todo: fetch nearest output requirement, and partition columns (rule 8) + let mut sort_columns = vec![]; + + let eq_properties = if let LogicalPlan::Sort(sort) = &plan + && target_partition >= regions.len() + { + let lex_ordering = sort + .expr + .iter() + .map(|sort_expr| { + let physical_expr = session_state + .create_physical_expr(sort_expr.expr.clone(), plan.schema()) + .unwrap(); + sort_columns.push(physical_expr.clone()); + PhysicalSortExpr::new( + physical_expr, + SortOptions { + descending: !sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }, + ) + }) + .collect(); + common_telemetry::info!("[DEBUG] lex_ordering: {:?}", lex_ordering); + EquivalenceProperties::new_with_orderings( + arrow_schema.clone(), + &[LexOrdering::new(lex_ordering)], + ) + } else { + EquivalenceProperties::new(arrow_schema.clone()) + }; + + sort_columns.pop(); + let partitioning = Partitioning::Hash(sort_columns, target_partition); + let properties = PlanProperties::new( - EquivalenceProperties::new(arrow_schema.clone()), - Partitioning::UnknownPartitioning(target_partition), + // EquivalenceProperties::new(arrow_schema.clone()), + eq_properties, + // Partitioning::UnknownPartitioning(target_partition), + partitioning, ExecutionMode::Bounded, ); let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?; diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 6f4f7831a9..f430bca7d1 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -22,6 +22,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datafusion::common::Result; use datafusion::datasource::DefaultTableSource; use datafusion::execution::context::SessionState; +use datafusion::execution::session_state; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; @@ -162,6 +163,7 @@ impl ExtensionPlanner for DistExtensionPlanner { .get_extension() .unwrap_or_else(QueryContext::arc); let merge_scan_plan = MergeScanExec::new( + session_state, table_name, regions, input_plan.clone(),