Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-03-08 03:44:41 +08:00
parent c425be2785
commit db58884236
3 changed files with 59 additions and 7 deletions

View File

@@ -276,7 +276,14 @@ impl PlanRewriter {
on_node = on_node.rewrite(&mut rewriter)?.data;
// add merge scan as the new root
let mut node = MergeScanLogicalPlan::new(on_node, false).into_logical_plan();
let mut node = MergeScanLogicalPlan::new(
on_node,
false,
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
self.partition_cols.clone().unwrap_or_default(),
)
.into_logical_plan();
// expand stages
for new_stage in self.stage.drain(..) {

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
@@ -28,16 +28,17 @@ use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::TaskContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion::physical_planner::DefaultPhysicalPlanner;
use datafusion_common::Result;
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
@@ -59,6 +60,7 @@ pub struct MergeScanLogicalPlan {
input: LogicalPlan,
/// If this plan is a placeholder
is_placeholder: bool,
partition_cols: Vec<String>,
}
impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
@@ -95,10 +97,11 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
}
impl MergeScanLogicalPlan {
pub fn new(input: LogicalPlan, is_placeholder: bool) -> Self {
/// Creates a `MergeScanLogicalPlan` that wraps `input`.
///
/// * `input` - the logical plan stored in this node.
/// * `is_placeholder` - whether this plan is a placeholder.
/// * `partition_cols` - names of the partition columns, stored as given.
///   NOTE(review): an empty vector presumably means "not partitioned" (the
///   plan rewriter passes the default empty vector when it has no partition
///   columns) — confirm against the code that reads this field.
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
Self {
input,
is_placeholder,
partition_cols,
}
}
@@ -149,6 +152,7 @@ impl std::fmt::Debug for MergeScanExec {
impl MergeScanExec {
pub fn new(
session_state: &SessionState,
table: TableName,
regions: Vec<RegionId>,
plan: LogicalPlan,
@@ -157,13 +161,52 @@ impl MergeScanExec {
query_ctx: QueryContextRef,
target_partition: usize,
) -> Result<Self> {
common_telemetry::info!("[DEBUG] input plan: {:?}", plan);
// TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
// keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
// Reconsider if it's possible to remove it.
let arrow_schema = Arc::new(arrow_schema.clone());
// todo: fetch nearest output requirement, and partition columns (rule 8)
let mut sort_columns = vec![];
let eq_properties = if let LogicalPlan::Sort(sort) = &plan
&& target_partition >= regions.len()
{
let lex_ordering = sort
.expr
.iter()
.map(|sort_expr| {
let physical_expr = session_state
.create_physical_expr(sort_expr.expr.clone(), plan.schema())
.unwrap();
sort_columns.push(physical_expr.clone());
PhysicalSortExpr::new(
physical_expr,
SortOptions {
descending: !sort_expr.asc,
nulls_first: sort_expr.nulls_first,
},
)
})
.collect();
common_telemetry::info!("[DEBUG] lex_ordering: {:?}", lex_ordering);
EquivalenceProperties::new_with_orderings(
arrow_schema.clone(),
&[LexOrdering::new(lex_ordering)],
)
} else {
EquivalenceProperties::new(arrow_schema.clone())
};
sort_columns.pop();
let partitioning = Partitioning::Hash(sort_columns, target_partition);
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema.clone()),
Partitioning::UnknownPartitioning(target_partition),
// EquivalenceProperties::new(arrow_schema.clone()),
eq_properties,
// Partitioning::UnknownPartitioning(target_partition),
partitioning,
ExecutionMode::Bounded,
);
let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;

View File

@@ -22,6 +22,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
@@ -162,6 +163,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
.get_extension()
.unwrap_or_else(QueryContext::arc);
let merge_scan_plan = MergeScanExec::new(
session_state,
table_name,
regions,
input_plan.clone(),