fix: skip placeholder when partition columns (#7020)

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-09-25 15:01:49 +08:00
committed by GitHub
parent cff9cb6327
commit 9c8ff1d8a0
4 changed files with 51 additions and 2 deletions

View File

@@ -52,6 +52,9 @@ mod utils;
pub(crate) use utils::AliasMapping;
/// Placeholder for other physical partition columns that are not in logical table
const OTHER_PHY_PART_COL_PLACEHOLDER: &str = "__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__";
#[derive(Debug, Clone)]
pub struct DistPlannerOptions {
pub allow_query_fallback: bool,
@@ -530,12 +533,16 @@ impl PlanRewriter {
// as subset of phy part cols can still be used for certain optimization, and it works as if
// those columns are always null
// This helps with distinguishing between non-partitioned table and partitioned table with all phy part cols not in logical table
partition_cols.push("__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__".to_string());
partition_cols.push(OTHER_PHY_PART_COL_PLACEHOLDER.to_string());
}
self.partition_cols = Some(
partition_cols
.into_iter()
.map(|c| {
if c == OTHER_PHY_PART_COL_PLACEHOLDER {
// for placeholder, just return a empty alias
return Ok((c.clone(), BTreeSet::new()));
}
let index =
plan.schema().index_of_column_by_name(None, &c).ok_or_else(|| {
datafusion_common::DataFusionError::Internal(

View File

@@ -28,7 +28,7 @@ use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::dist_plan::analyzer::AliasMapping;
use crate::dist_plan::analyzer::{AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER};
/// FallbackPlanRewriter is a plan rewriter that will only push down table scan node
/// This is used when `PlanRewriter` produce errors when trying to rewrite the plan
@@ -70,6 +70,10 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
Some(partition_cols
.into_iter()
.map(|c| {
if c == OTHER_PHY_PART_COL_PLACEHOLDER {
// for placeholder, just return a empty alias
return Ok((c.clone(), BTreeSet::new()));
}
let index =
plan.schema().index_of_column_by_name(None, &c).ok_or_else(|| {
datafusion_common::DataFusionError::Internal(

View File

@@ -386,6 +386,35 @@ select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
|_|_|
+-+-+
select * from logical_table_4;
+-----------------------+-----+------+-------------------+---------------------+
| another_partition_key | cpu | host | one_partition_key | ts |
+-----------------------+-----+------+-------------------+---------------------+
| | 1.0 | | | 2023-01-01T00:00:00 |
| | 2.0 | | | 2023-01-01T00:00:01 |
| | 3.0 | | | 2023-01-01T00:00:02 |
+-----------------------+-----+------+-------------------+---------------------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN select * from logical_table_4;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|_|_TableScan: logical_table_4_|
|_| ]]_|
| physical_plan | CooperativeExec_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
drop table logical_table_2;
Affected Rows: 0

View File

@@ -140,6 +140,15 @@ select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
EXPLAIN
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
select * from logical_table_4;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN select * from logical_table_4;
drop table logical_table_2;
drop table logical_table_3;