diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 08fb755ae5..cebfd877f1 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -37,7 +37,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Par use datafusion_common::{DataFusionError, Result, Statistics}; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::PhysicalSortExpr; -use datatypes::schema::Schema; +use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; use snafu::ResultExt; @@ -113,6 +113,7 @@ pub struct MergeScanExec { table: TableName, peers: Vec, substrait_plan: Bytes, + schema: SchemaRef, arrow_schema: ArrowSchemaRef, clients: Arc, metric: ExecutionPlanMetricsSet, @@ -123,19 +124,21 @@ impl MergeScanExec { table: TableName, peers: Vec, substrait_plan: Bytes, - arrow_schema: ArrowSchemaRef, + arrow_schema: &ArrowSchema, clients: Arc, - ) -> Self { - // remove all metadata - let arrow_schema = Arc::new(ArrowSchema::new(arrow_schema.fields().to_vec())); - Self { + ) -> Result { + let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); + let schema_without_metadata = + Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?; + Ok(Self { table, peers, substrait_plan, - arrow_schema, + schema: schema_without_metadata, + arrow_schema: arrow_schema_without_metadata, clients, metric: ExecutionPlanMetricsSet::new(), - } + }) } pub fn to_stream(&self, context: Arc) -> Result { @@ -187,24 +190,37 @@ impl MergeScanExec { }; Ok(Box::pin(RecordBatchStreamAdaptor { - schema: Arc::new( - self.arrow_schema - .clone() - .try_into() - .context(ConvertSchemaSnafu)?, - ), + schema: self.schema.clone(), stream: Box::pin(stream), output_ordering: None, })) } fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch { - let schema = ArrowSchema::new(batch.schema.arrow_schema().fields().to_vec()); - RecordBatch::new( - Arc::new(Schema::try_from(schema).unwrap()), - batch.columns().iter().cloned(), - ) - .unwrap() + let arrow_schema = batch.schema.arrow_schema().as_ref(); + let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); + let schema_without_metadata = + Self::arrow_schema_to_schema(arrow_schema_without_metadata).unwrap(); + RecordBatch::new(schema_without_metadata, batch.columns().iter().cloned()).unwrap() + } + + fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef { + Arc::new(ArrowSchema::new( + arrow_schema + .fields() + .iter() + .map(|field| { + let field = field.as_ref().clone(); + let field_without_metadata = field.with_metadata(Default::default()); + Arc::new(field_without_metadata) + }) + .collect::>(), + )) + } + + fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result { + let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?; + Ok(Arc::new(schema)) } } diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index f24e2cfe61..a5261e1757 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; -use common_base::bytes::Bytes; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::peer::Peer; use common_meta::table_name::TableName; @@ -72,66 +71,63 @@ impl ExtensionPlanner for DistExtensionPlanner { _physical_inputs: &[Arc], session_state: &SessionState, ) -> Result>> { - let maybe_merge_scan = { node.as_any().downcast_ref::() }; - if let Some(merge_scan) = maybe_merge_scan { - if merge_scan.is_placeholder() { - // ignore placeholder - planner - .create_physical_plan(merge_scan.input(), session_state) - .await - .map(Some) - } else { - // TODO(ruihang): generate different execution plans for different variant merge operation - let input_plan = merge_scan.input(); - let optimized_input = - self.optimize_input_logical_plan(session_state, input_plan)?; - let input_physical_plan = planner - .create_physical_plan(&optimized_input, session_state) - .await?; - let Some(table_name) = self.get_table_name(input_plan)? else { - // no relation found in input plan, going to execute them locally - return Ok(Some(input_physical_plan)); - }; + let Some(merge_scan) = node.as_any().downcast_ref::() else { + return Ok(None); + }; - let input_schema = input_physical_plan.schema().clone(); - let input_plan = self.set_table_name(&table_name, input_plan.clone())?; - let substrait_plan: Bytes = DFLogicalSubstraitConvertor - .encode(&input_plan) - .context(error::EncodeSubstraitLogicalPlanSnafu)? - .into(); - let peers = self.get_peers(&table_name).await; - match peers { - Ok(peers) => { - let exec = MergeScanExec::new( - table_name, - peers, - substrait_plan, - input_schema, - self.clients.clone(), - ); + let input_plan = merge_scan.input(); + let fallback = |logical_plan| async move { + planner + .create_physical_plan(logical_plan, session_state) + .await + .map(Some) + }; - Ok(Some(Arc::new(exec) as _)) - } - Err(_) => Ok(Some(input_physical_plan)), - } - } - } else { - Ok(None) + if merge_scan.is_placeholder() { + // ignore placeholder + return fallback(input_plan).await; } + + let optimized_plan = self.optimize_input_logical_plan(session_state, input_plan)?; + let Some(table_name) = Self::extract_full_table_name(input_plan)? else { + // no relation found in input plan, going to execute them locally + return fallback(&optimized_plan).await; + }; + + let Ok(peers) = self.get_peers(&table_name).await else { + // no peers found, going to execute them locally + return fallback(&optimized_plan).await; + }; + + // TODO(ruihang): generate different execution plans for different variant merge operation + let schema = optimized_plan.schema().as_ref().into(); + // Pass down the original plan, allow execution nodes to do their optimization + let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?; + let substrait_plan = DFLogicalSubstraitConvertor + .encode(&amended_plan) + .context(error::EncodeSubstraitLogicalPlanSnafu)? + .into(); + let merge_scan_plan = MergeScanExec::new( + table_name, + peers, + substrait_plan, + &schema, + self.clients.clone(), + )?; + Ok(Some(Arc::new(merge_scan_plan) as _)) } } impl DistExtensionPlanner { - /// Extract table name from logical plan - fn get_table_name(&self, plan: &LogicalPlan) -> Result> { + /// Extract fully resolved table name from logical plan + fn extract_full_table_name(plan: &LogicalPlan) -> Result> { let mut extractor = TableNameExtractor::default(); let _ = plan.visit(&mut extractor)?; Ok(extractor.table_name) } - /// Set the fully resolved table name to TableScan plan - fn set_table_name(&self, name: &TableName, plan: LogicalPlan) -> Result { - // let mut rewriter + /// Apply the fully resolved table name to the TableScan plan + fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result { plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name)) }