feat(query): better alias tracker (#6909)

* better resolve

Signed-off-by: discord9 <discord9@163.com>

feat: layered alias tracker

Signed-off-by: discord9 <discord9@163.com>

refactor

Signed-off-by: discord9 <discord9@163.com>

docs: explain why there is no offset by one

Signed-off-by: discord9 <discord9@163.com>

test: more

Signed-off-by: discord9 <discord9@163.com>

simplify api

Signed-off-by: discord9 <discord9@163.com>

wip

Signed-off-by: discord9 <discord9@163.com>

fix: filter non-exist columns

Signed-off-by: discord9 <discord9@163.com>

feat: stuff

Signed-off-by: discord9 <discord9@163.com>

feat: cache partition columns

Signed-off-by: discord9 <discord9@163.com>

refactor: rm unused fn

Signed-off-by: discord9 <discord9@163.com>

no need res

Signed-off-by: discord9 <discord9@163.com>

chore: rm unwrap & docs update

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: unsupported part

Signed-off-by: discord9 <discord9@163.com>

* err msg

Signed-off-by: discord9 <discord9@163.com>

* fix: pass correct partition cols

Signed-off-by: discord9 <discord9@163.com>

* fix? use column name only

Signed-off-by: discord9 <discord9@163.com>

* fix: merge scan has partition columns with no alias/no partition diff

Signed-off-by: discord9 <discord9@163.com>

* refactor: loop instead of recursive

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* feat: overlaps

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
discord9
2025-09-11 10:30:51 +08:00
committed by GitHub
parent ea8125aafb
commit 2bddbe8c47
7 changed files with 883 additions and 343 deletions
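
The gist of the change, as the diffs below show: `PlanRewriter` drops the separate `AliasTracker` and stops storing partition columns as a plain `Vec<String>`; a single `AliasMapping` now records, for every original partition column, all the columns it is currently visible as in the plan above the scan. The sketch below is only an assumed shape, inferred from how the diff builds the value (`BTreeSet::from([column])`) and consumes it (`.values().flat_map(..)`); the real definition lives in `dist_plan::analyzer::utils` and may differ.

```rust
// Assumed shape of AliasMapping, inferred from its usage in this diff.
// The actual type is defined in dist_plan/analyzer/utils.rs.
use std::collections::{BTreeSet, HashMap};

use datafusion_common::Column;

/// original partition column name -> every column it is currently
/// known as (after projections/aliases) at the node being inspected
type AliasMapping = HashMap<String, BTreeSet<Column>>;

fn example() -> AliasMapping {
    let mut mapping = AliasMapping::new();
    // hypothetical table: partition column "host" is re-exposed as "h"
    // by a projection, so both names map back to the same partition column
    mapping.insert(
        "host".to_string(),
        BTreeSet::from([
            Column::new_unqualified("host"),
            Column::new_unqualified("h"),
        ]),
    );
    mapping
}
```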


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;
use common_telemetry::debug;
@@ -32,6 +32,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::analyzer::utils::aliased_columns_for;
use crate::dist_plan::commutativity::{
Categorizer, Commutativity, partial_commutative_transformer,
};
@@ -46,7 +47,7 @@ mod test;
mod fallback;
mod utils;
pub(crate) use utils::{AliasMapping, AliasTracker};
pub(crate) use utils::AliasMapping;
#[derive(Debug, Clone)]
pub struct DistPlannerOptions {
@@ -229,8 +230,7 @@ struct PlanRewriter {
stage: Vec<LogicalPlan>,
status: RewriterStatus,
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
alias_tracker: Option<AliasTracker>,
partition_cols: Option<AliasMapping>,
/// use stack count as scope to determine column requirements is needed or not
/// i.e for a logical plan like:
/// ```ignore
@@ -311,7 +311,7 @@ impl PlanRewriter {
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.get_aliased_partition_columns());
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
@@ -333,7 +333,7 @@ impl PlanRewriter {
}
}
match Categorizer::check_plan(plan, self.get_aliased_partition_columns()) {
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
@@ -427,49 +427,31 @@ impl PlanRewriter {
self.status = RewriterStatus::Unexpanded;
}
/// Maybe update alias for original table columns in the plan
fn maybe_update_alias(&mut self, node: &LogicalPlan) {
if let Some(alias_tracker) = &mut self.alias_tracker {
alias_tracker.update_alias(node);
debug!(
"Current partition columns are: {:?}",
self.get_aliased_partition_columns()
);
} else if let LogicalPlan::TableScan(table_scan) = node {
self.alias_tracker = AliasTracker::new(table_scan);
debug!(
"Initialize partition columns: {:?} with table={}",
self.get_aliased_partition_columns(),
table_scan.table_name
);
}
}
fn maybe_set_partitions(&mut self, plan: &LogicalPlan) -> DfResult<()> {
if let Some(part_cols) = &mut self.partition_cols {
// update partition alias
let child = plan.inputs().first().cloned().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(format!(
"PlanRewriter: maybe_set_partitions: plan has no child: {plan}"
))
})?;
fn get_aliased_partition_columns(&self) -> Option<AliasMapping> {
if let Some(part_cols) = self.partition_cols.as_ref() {
let Some(alias_tracker) = &self.alias_tracker else {
// no alias tracker meaning no table scan encountered
return None;
};
let mut aliased = HashMap::new();
for part_col in part_cols {
let all_alias = alias_tracker
.get_all_alias_for_col(part_col)
.cloned()
.unwrap_or_default();
aliased.insert(part_col.clone(), all_alias);
for (_col_name, alias_set) in part_cols.iter_mut() {
let aliased_cols = aliased_columns_for(
&alias_set.clone().into_iter().collect(),
plan,
Some(child),
)?;
*alias_set = aliased_cols.into_values().flatten().collect();
}
Some(aliased)
} else {
None
}
}
fn maybe_set_partitions(&mut self, plan: &LogicalPlan) {
if self.partition_cols.is_some() {
// only need to set once
return;
debug!(
"PlanRewriter: maybe_set_partitions: updated partition columns: {:?} at plan: {}",
part_cols,
plan.display()
);
return Ok(());
}
if let LogicalPlan::TableScan(table_scan) = plan
@@ -507,9 +489,31 @@ impl PlanRewriter {
// This helps with distinguishing between non-partitioned table and partitioned table with all phy part cols not in logical table
partition_cols.push("__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__".to_string());
}
self.partition_cols = Some(partition_cols);
self.partition_cols = Some(
partition_cols
.into_iter()
.map(|c| {
let index =
plan.schema().index_of_column_by_name(None, &c).ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
format!(
"PlanRewriter: maybe_set_partitions: column {c} not found in schema of plan: {plan}"
),
)
})?;
let column = plan.schema().columns().get(index).cloned().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(format!(
"PlanRewriter: maybe_set_partitions: column index {index} out of bounds in schema of plan: {plan}"
))
})?;
Ok((c.clone(), BTreeSet::from([column])))
})
.collect::<DfResult<AliasMapping>>()?,
);
}
}
Ok(())
}
/// pop one stack item and reduce the level by 1
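
With partition columns seeded at the `TableScan`, `maybe_set_partitions` now re-runs on every node and rewrites each alias set through `utils::aliased_columns_for`, using the node's single child as the reference frame. The sketch below only illustrates what that propagation does for one `Projection` node; it does not reproduce `aliased_columns_for`'s real signature or the other node types it handles, and all names in it are hypothetical.

```rust
// Illustration only: how an alias set for one partition column could be
// carried through a single Projection. The real logic is in
// utils::aliased_columns_for and covers more plan nodes.
use std::collections::{BTreeSet, HashMap};

use datafusion_common::Column;
use datafusion_expr::{Expr, LogicalPlan};

fn propagate_through_projection(
    tracked: &HashMap<String, BTreeSet<Column>>, // assumed AliasMapping shape
    plan: &LogicalPlan,
) -> HashMap<String, BTreeSet<Column>> {
    let mut out = HashMap::new();
    if let LogicalPlan::Projection(proj) = plan {
        for (orig, aliases) in tracked {
            let mut next = BTreeSet::new();
            for expr in &proj.expr {
                match expr {
                    // `col AS alias`: keep tracking the partition column
                    // under its new output name
                    Expr::Alias(a) => {
                        if let Expr::Column(c) = a.expr.as_ref() {
                            if aliases.contains(c) {
                                next.insert(Column::new_unqualified(a.name.clone()));
                            }
                        }
                    }
                    // a bare column reference keeps its own name
                    Expr::Column(c) if aliases.contains(c) => {
                        next.insert(c.clone());
                    }
                    _ => {}
                }
            }
            // columns projected away end up with an empty alias set
            out.insert(orig.clone(), next);
        }
    }
    out
}
```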
@@ -537,6 +541,11 @@ impl PlanRewriter {
"PlanRewriter: after enforced column requirements with rewriter: {rewriter:?} for node:\n{on_node}"
);
debug!(
"PlanRewriter: expand on node: {on_node} with partition col alias mapping: {:?}",
self.partition_cols
);
// add merge scan as the new root
let mut node = MergeScanLogicalPlan::new(
on_node,
@@ -677,7 +686,6 @@ impl TreeNodeRewriter for PlanRewriter {
self.stage.clear();
self.set_unexpanded();
self.partition_cols = None;
self.alias_tracker = None;
Ok(Transformed::no(node))
}
@@ -698,9 +706,7 @@ impl TreeNodeRewriter for PlanRewriter {
return Ok(Transformed::no(node));
}
self.maybe_set_partitions(&node);
self.maybe_update_alias(&node);
self.maybe_set_partitions(&node)?;
let Some(parent) = self.get_parent() else {
debug!("Plan Rewriter: expand now for no parent found for node: {node}");


@@ -17,14 +17,18 @@
//! This is a temporary solution, and will be removed once we have a more robust plan rewriter
//!
use std::collections::BTreeSet;
use common_telemetry::debug;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_expr::LogicalPlan;
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::dist_plan::analyzer::AliasMapping;
/// FallbackPlanRewriter is a plan rewriter that will only push down table scan node
/// This is used when `PlanRewriter` produce errors when trying to rewrite the plan
@@ -38,9 +42,9 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
fn f_down(
&mut self,
node: Self::Node,
) -> datafusion_common::Result<datafusion_common::tree_node::Transformed<Self::Node>> {
if let LogicalPlan::TableScan(table_scan) = &node {
plan: Self::Node,
) -> DfResult<datafusion_common::tree_node::Transformed<Self::Node>> {
if let LogicalPlan::TableScan(table_scan) = &plan {
let partition_cols = if let Some(source) = table_scan
.source
.as_any()
@@ -63,7 +67,25 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
"FallbackPlanRewriter: table {} has partition columns: {:?}",
info.name, partition_cols
);
Some(partition_cols)
Some(partition_cols
.into_iter()
.map(|c| {
let index =
plan.schema().index_of_column_by_name(None, &c).ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
format!(
"PlanRewriter: maybe_set_partitions: column {c} not found in schema of plan: {plan}"
),
)
})?;
let column = plan.schema().columns().get(index).cloned().ok_or_else(|| {
datafusion_common::DataFusionError::Internal(format!(
"PlanRewriter: maybe_set_partitions: column index {index} out of bounds in schema of plan: {plan}"
))
})?;
Ok((c.clone(), BTreeSet::from([column])))
})
.collect::<DfResult<AliasMapping>>()?)
} else {
None
}
@@ -74,7 +96,7 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
None
};
let node = MergeScanLogicalPlan::new(
node,
plan,
false,
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
@@ -83,7 +105,7 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
.into_logical_plan();
Ok(Transformed::yes(node))
} else {
Ok(Transformed::no(node))
Ok(Transformed::no(plan))
}
}
}
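
Both rewriters now seed the mapping the same way at the `TableScan`: each physical partition column name is looked up in the scan's schema and starts out mapping to exactly itself. A minimal sketch of that seeding step follows; the helper name `seed_alias_mapping` is hypothetical (the PR inlines this logic), and the `AliasMapping` alias is the assumed shape, not the real definition.

```rust
// Hypothetical helper mirroring the inlined seeding logic above.
use std::collections::{BTreeSet, HashMap};

use datafusion_common::{Column, DataFusionError, Result as DfResult};
use datafusion_expr::LogicalPlan;

type AliasMapping = HashMap<String, BTreeSet<Column>>; // assumed shape

fn seed_alias_mapping(plan: &LogicalPlan, partition_cols: Vec<String>) -> DfResult<AliasMapping> {
    partition_cols
        .into_iter()
        .map(|c| {
            // resolve the partition column name against the scan's schema
            let index = plan
                .schema()
                .index_of_column_by_name(None, &c)
                .ok_or_else(|| {
                    DataFusionError::Internal(format!("column {c} not found in plan schema"))
                })?;
            let column = plan.schema().columns().get(index).cloned().ok_or_else(|| {
                DataFusionError::Internal(format!("column index {index} out of bounds"))
            })?;
            // every partition column initially aliases only itself
            Ok((c, BTreeSet::from([column])))
        })
        .collect()
}
```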


@@ -156,7 +156,7 @@ impl Stream for EmptyStream {
fn expand_proj_sort_proj() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -203,7 +203,7 @@ fn expand_proj_sort_proj() {
fn expand_sort_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -237,7 +237,7 @@ fn expand_sort_limit() {
fn expand_sort_alias_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -276,7 +276,7 @@ fn expand_sort_alias_limit() {
fn expand_sort_alias_conflict_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -318,7 +318,7 @@ fn expand_sort_alias_conflict_limit() {
fn expand_sort_alias_conflict_but_not_really_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -358,7 +358,7 @@ fn expand_sort_alias_conflict_but_not_really_limit() {
fn expand_limit_sort() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -391,7 +391,7 @@ fn expand_limit_sort() {
fn expand_sort_limit_sort() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -438,7 +438,7 @@ fn expand_sort_limit_sort() {
fn expand_proj_step_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -473,7 +473,7 @@ fn expand_proj_step_aggr() {
fn expand_proj_alias_fake_part_col_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -517,7 +517,7 @@ fn expand_proj_alias_fake_part_col_aggr() {
fn expand_proj_alias_aliased_part_col_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -563,7 +563,7 @@ fn expand_proj_alias_aliased_part_col_aggr() {
fn expand_part_col_aggr_step_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -596,7 +596,7 @@ fn expand_part_col_aggr_step_aggr() {
fn expand_step_aggr_step_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -629,7 +629,7 @@ fn expand_step_aggr_step_aggr() {
fn expand_part_col_aggr_part_col_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -673,7 +673,7 @@ fn expand_part_col_aggr_part_col_aggr() {
fn expand_step_aggr_proj() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -709,7 +709,7 @@ fn expand_step_aggr_proj() {
fn expand_proj_sort_step_aggr_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -750,7 +750,7 @@ fn expand_proj_sort_step_aggr_limit() {
fn expand_proj_sort_limit_step_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -792,7 +792,7 @@ fn expand_proj_sort_limit_step_aggr() {
fn expand_proj_limit_step_aggr_sort() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -833,7 +833,7 @@ fn expand_proj_limit_step_aggr_sort() {
fn expand_proj_sort_part_col_aggr_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -875,7 +875,7 @@ fn expand_proj_sort_part_col_aggr_limit() {
fn expand_proj_sort_limit_part_col_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -917,7 +917,7 @@ fn expand_proj_sort_limit_part_col_aggr() {
fn expand_proj_part_col_aggr_limit_sort() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -959,7 +959,7 @@ fn expand_proj_part_col_aggr_limit_sort() {
fn expand_proj_part_col_aggr_sort_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -1002,7 +1002,7 @@ fn expand_proj_part_col_aggr_sort_limit() {
fn expand_proj_limit_part_col_aggr_sort() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -1044,7 +1044,7 @@ fn expand_proj_limit_part_col_aggr_sort() {
fn expand_proj_limit_sort_part_col_aggr() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -1087,7 +1087,7 @@ fn expand_proj_limit_sort_part_col_aggr() {
fn expand_step_aggr_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -1120,7 +1120,7 @@ fn expand_step_aggr_limit() {
fn expand_step_aggr_avg_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
@@ -1153,7 +1153,7 @@ fn expand_step_aggr_avg_limit() {
fn expand_part_col_aggr_limit() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "numbers".to_string());
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));

File diff suppressed because it is too large


@@ -302,6 +302,10 @@ impl Categorizer {
/// Return true if the given exprs and partition cols satisfy the rule.
/// In this case the plan can be treated as fully commutative.
///
/// So only if all partition columns show up in `exprs`, return true.
/// Otherwise return false.
///
fn check_partition(exprs: &[Expr], partition_cols: &AliasMapping) -> bool {
let mut ref_cols = HashSet::new();
for expr in exprs {
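
The documented rule in words: a plan is treated as fully commutative only when every partition column is referenced by `exprs` through at least one of its aliases. A hedged sketch of that check is below; the shown hunk is truncated, so the real `check_partition` in commutativity.rs may gather referenced columns differently, and the `AliasMapping` alias here is again the assumed shape.

```rust
// Sketch of the rule only; not the actual check_partition implementation.
use std::collections::{BTreeSet, HashMap, HashSet};

use datafusion_common::Column;
use datafusion_expr::Expr;

type AliasMapping = HashMap<String, BTreeSet<Column>>; // assumed shape

fn check_partition_sketch(exprs: &[Expr], partition_cols: &AliasMapping) -> bool {
    // every column referenced anywhere in the expressions
    let mut ref_cols: HashSet<Column> = HashSet::new();
    for expr in exprs {
        ref_cols.extend(expr.column_refs().into_iter().cloned());
    }
    // fully commutative only if each partition column shows up via some alias
    partition_cols
        .values()
        .all(|aliases| aliases.iter().any(|alias| ref_cols.contains(alias)))
}
```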


@@ -52,6 +52,7 @@ use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
use crate::dist_plan::analyzer::AliasMapping;
use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;
@@ -62,7 +63,7 @@ pub struct MergeScanLogicalPlan {
input: LogicalPlan,
/// If this plan is a placeholder
is_placeholder: bool,
partition_cols: Vec<String>,
partition_cols: AliasMapping,
}
impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
@@ -103,7 +104,7 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
}
impl MergeScanLogicalPlan {
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
Self {
input,
is_placeholder,
@@ -130,7 +131,7 @@ impl MergeScanLogicalPlan {
&self.input
}
pub fn partition_cols(&self) -> &[String] {
pub fn partition_cols(&self) -> &AliasMapping {
&self.partition_cols
}
}
@@ -150,7 +151,7 @@ pub struct MergeScanExec {
partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
partition_cols: AliasMapping,
}
impl std::fmt::Debug for MergeScanExec {
@@ -175,7 +176,7 @@ impl MergeScanExec {
region_query_handler: RegionQueryHandlerRef,
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: Vec<String>,
partition_cols: AliasMapping,
) -> Result<Self> {
// TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
// keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
@@ -215,12 +216,18 @@ impl MergeScanExec {
let partition_exprs = partition_cols
.iter()
.filter_map(|col| {
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(col)),
plan.schema(),
)
.ok()
if let Some(first_alias) = col.1.first() {
session_state
.create_physical_expr(
Expr::Column(ColumnExpr::new_unqualified(
first_alias.name().to_string(),
)),
plan.schema(),
)
.ok()
} else {
None
}
})
.collect();
let partitioning = Partitioning::Hash(partition_exprs, target_partition);
@@ -424,20 +431,20 @@ impl MergeScanExec {
return None;
}
let partition_cols = self
let all_partition_col_aliases: HashSet<_> = self
.partition_cols
.iter()
.map(|x| x.as_str())
.collect::<HashSet<_>>();
.values()
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
.collect();
let mut overlaps = vec![];
for expr in &hash_exprs {
// TODO(ruihang): tracking aliases
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
&& partition_cols.contains(col_expr.name())
&& all_partition_col_aliases.contains(col_expr.name())
{
overlaps.push(expr.clone());
}
}
if overlaps.is_empty() {
return None;
}
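
The alias-aware overlap check above, restated as a standalone sketch: collect every name any partition column is known by, then keep the hash expressions that are plain column references to one of those names. The function name and the `AliasMapping` alias are hypothetical; only the logic mirrors the diff.

```rust
// Standalone restatement of the overlap check; not the actual method.
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::Column as PhysicalColumn;
use datafusion_common::Column as LogicalColumn;

type AliasMapping = HashMap<String, BTreeSet<LogicalColumn>>; // assumed shape

fn partition_overlaps(
    partition_cols: &AliasMapping,
    hash_exprs: &[Arc<dyn PhysicalExpr>],
) -> Vec<Arc<dyn PhysicalExpr>> {
    // every name a partition column is currently known by, aliases included
    let all_aliases: HashSet<&str> = partition_cols
        .values()
        .flat_map(|aliases| aliases.iter().map(|c| c.name.as_str()))
        .collect();
    // keep only hash expressions that are column refs to one of those names
    hash_exprs
        .iter()
        .filter(|expr| {
            expr.as_any()
                .downcast_ref::<PhysicalColumn>()
                .is_some_and(|col| all_aliases.contains(col.name()))
        })
        .cloned()
        .collect()
}
```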


@@ -177,7 +177,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
self.region_query_handler.clone(),
query_ctx,
session_state.config().target_partitions(),
merge_scan.partition_cols().to_vec(),
merge_scan.partition_cols().clone(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}