mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 17:30:41 +00:00
refactor: prevent dist table from invoking scan (#2179)
* refactor: prevent dist table from invoking `scan` * refactor: reorg code Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: add comment Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -37,7 +37,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Par
|
||||
use datafusion_common::{DataFusionError, Result, Statistics};
|
||||
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
|
||||
use datafusion_physical_expr::PhysicalSortExpr;
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use futures_util::StreamExt;
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -113,6 +113,7 @@ pub struct MergeScanExec {
|
||||
table: TableName,
|
||||
peers: Vec<Peer>,
|
||||
substrait_plan: Bytes,
|
||||
schema: SchemaRef,
|
||||
arrow_schema: ArrowSchemaRef,
|
||||
clients: Arc<DatanodeClients>,
|
||||
metric: ExecutionPlanMetricsSet,
|
||||
@@ -123,19 +124,21 @@ impl MergeScanExec {
|
||||
table: TableName,
|
||||
peers: Vec<Peer>,
|
||||
substrait_plan: Bytes,
|
||||
arrow_schema: ArrowSchemaRef,
|
||||
arrow_schema: &ArrowSchema,
|
||||
clients: Arc<DatanodeClients>,
|
||||
) -> Self {
|
||||
// remove all metadata
|
||||
let arrow_schema = Arc::new(ArrowSchema::new(arrow_schema.fields().to_vec()));
|
||||
Self {
|
||||
) -> Result<Self> {
|
||||
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
|
||||
let schema_without_metadata =
|
||||
Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?;
|
||||
Ok(Self {
|
||||
table,
|
||||
peers,
|
||||
substrait_plan,
|
||||
arrow_schema,
|
||||
schema: schema_without_metadata,
|
||||
arrow_schema: arrow_schema_without_metadata,
|
||||
clients,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
|
||||
@@ -187,24 +190,37 @@ impl MergeScanExec {
|
||||
};
|
||||
|
||||
Ok(Box::pin(RecordBatchStreamAdaptor {
|
||||
schema: Arc::new(
|
||||
self.arrow_schema
|
||||
.clone()
|
||||
.try_into()
|
||||
.context(ConvertSchemaSnafu)?,
|
||||
),
|
||||
schema: self.schema.clone(),
|
||||
stream: Box::pin(stream),
|
||||
output_ordering: None,
|
||||
}))
|
||||
}
|
||||
|
||||
fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch {
|
||||
let schema = ArrowSchema::new(batch.schema.arrow_schema().fields().to_vec());
|
||||
RecordBatch::new(
|
||||
Arc::new(Schema::try_from(schema).unwrap()),
|
||||
batch.columns().iter().cloned(),
|
||||
)
|
||||
.unwrap()
|
||||
let arrow_schema = batch.schema.arrow_schema().as_ref();
|
||||
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
|
||||
let schema_without_metadata =
|
||||
Self::arrow_schema_to_schema(arrow_schema_without_metadata).unwrap();
|
||||
RecordBatch::new(schema_without_metadata, batch.columns().iter().cloned()).unwrap()
|
||||
}
|
||||
|
||||
fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef {
|
||||
Arc::new(ArrowSchema::new(
|
||||
arrow_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.map(|field| {
|
||||
let field = field.as_ref().clone();
|
||||
let field_without_metadata = field.with_metadata(Default::default());
|
||||
Arc::new(field_without_metadata)
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
))
|
||||
}
|
||||
|
||||
fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
|
||||
let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
|
||||
Ok(Arc::new(schema))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_base::bytes::Bytes;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::table_name::TableName;
|
||||
@@ -72,66 +71,63 @@ impl ExtensionPlanner for DistExtensionPlanner {
|
||||
_physical_inputs: &[Arc<dyn ExecutionPlan>],
|
||||
session_state: &SessionState,
|
||||
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
|
||||
let maybe_merge_scan = { node.as_any().downcast_ref::<MergeScanLogicalPlan>() };
|
||||
if let Some(merge_scan) = maybe_merge_scan {
|
||||
if merge_scan.is_placeholder() {
|
||||
// ignore placeholder
|
||||
planner
|
||||
.create_physical_plan(merge_scan.input(), session_state)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
// TODO(ruihang): generate different execution plans for different variant merge operation
|
||||
let input_plan = merge_scan.input();
|
||||
let optimized_input =
|
||||
self.optimize_input_logical_plan(session_state, input_plan)?;
|
||||
let input_physical_plan = planner
|
||||
.create_physical_plan(&optimized_input, session_state)
|
||||
.await?;
|
||||
let Some(table_name) = self.get_table_name(input_plan)? else {
|
||||
// no relation found in input plan, going to execute them locally
|
||||
return Ok(Some(input_physical_plan));
|
||||
};
|
||||
let Some(merge_scan) = node.as_any().downcast_ref::<MergeScanLogicalPlan>() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let input_schema = input_physical_plan.schema().clone();
|
||||
let input_plan = self.set_table_name(&table_name, input_plan.clone())?;
|
||||
let substrait_plan: Bytes = DFLogicalSubstraitConvertor
|
||||
.encode(&input_plan)
|
||||
.context(error::EncodeSubstraitLogicalPlanSnafu)?
|
||||
.into();
|
||||
let peers = self.get_peers(&table_name).await;
|
||||
match peers {
|
||||
Ok(peers) => {
|
||||
let exec = MergeScanExec::new(
|
||||
table_name,
|
||||
peers,
|
||||
substrait_plan,
|
||||
input_schema,
|
||||
self.clients.clone(),
|
||||
);
|
||||
let input_plan = merge_scan.input();
|
||||
let fallback = |logical_plan| async move {
|
||||
planner
|
||||
.create_physical_plan(logical_plan, session_state)
|
||||
.await
|
||||
.map(Some)
|
||||
};
|
||||
|
||||
Ok(Some(Arc::new(exec) as _))
|
||||
}
|
||||
Err(_) => Ok(Some(input_physical_plan)),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
if merge_scan.is_placeholder() {
|
||||
// ignore placeholder
|
||||
return fallback(input_plan).await;
|
||||
}
|
||||
|
||||
let optimized_plan = self.optimize_input_logical_plan(session_state, input_plan)?;
|
||||
let Some(table_name) = Self::extract_full_table_name(input_plan)? else {
|
||||
// no relation found in input plan, going to execute them locally
|
||||
return fallback(&optimized_plan).await;
|
||||
};
|
||||
|
||||
let Ok(peers) = self.get_peers(&table_name).await else {
|
||||
// no peers found, going to execute them locally
|
||||
return fallback(&optimized_plan).await;
|
||||
};
|
||||
|
||||
// TODO(ruihang): generate different execution plans for different variant merge operation
|
||||
let schema = optimized_plan.schema().as_ref().into();
|
||||
// Pass down the original plan, allow execution nodes to do their optimization
|
||||
let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?;
|
||||
let substrait_plan = DFLogicalSubstraitConvertor
|
||||
.encode(&amended_plan)
|
||||
.context(error::EncodeSubstraitLogicalPlanSnafu)?
|
||||
.into();
|
||||
let merge_scan_plan = MergeScanExec::new(
|
||||
table_name,
|
||||
peers,
|
||||
substrait_plan,
|
||||
&schema,
|
||||
self.clients.clone(),
|
||||
)?;
|
||||
Ok(Some(Arc::new(merge_scan_plan) as _))
|
||||
}
|
||||
}
|
||||
|
||||
impl DistExtensionPlanner {
|
||||
/// Extract table name from logical plan
|
||||
fn get_table_name(&self, plan: &LogicalPlan) -> Result<Option<TableName>> {
|
||||
/// Extract fully resolved table name from logical plan
|
||||
fn extract_full_table_name(plan: &LogicalPlan) -> Result<Option<TableName>> {
|
||||
let mut extractor = TableNameExtractor::default();
|
||||
let _ = plan.visit(&mut extractor)?;
|
||||
Ok(extractor.table_name)
|
||||
}
|
||||
|
||||
/// Set the fully resolved table name to TableScan plan
|
||||
fn set_table_name(&self, name: &TableName, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
// let mut rewriter
|
||||
/// Apply the fully resolved table name to the TableScan plan
|
||||
fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result<LogicalPlan> {
|
||||
plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user