From 85c1a91baeddcdfb8ee2226bd7205183fde3f78f Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 16 Sep 2025 21:27:35 +0800 Subject: [PATCH] feat: support `SubqueryAlias` pushdown (#6963) * wip enforce dist requirement rewriter Signed-off-by: discord9 * feat: enforce dist req Signed-off-by: discord9 * test: sqlness result Signed-off-by: discord9 * fix: double projection Signed-off-by: discord9 * test: fix sqlness Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * docs: use btree map Signed-off-by: discord9 * test: sqlness explain&comment Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/query/src/dist_plan/analyzer.rs | 123 ++++- src/query/src/dist_plan/analyzer/test.rs | 124 ++++- src/query/src/dist_plan/analyzer/utils.rs | 352 ++++++------ src/query/src/dist_plan/commutativity.rs | 2 +- .../distributed/explain/subqueries.result | 521 +++++++++++++++++- .../cases/distributed/explain/subqueries.sql | 165 ++++++ 6 files changed, 1101 insertions(+), 186 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index dd929f1427..424500f3a1 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::sync::Arc; use common_telemetry::debug; @@ -32,11 +32,12 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::metadata::TableType; use table::table::adapter::DfTableProviderAdapter; -use crate::dist_plan::analyzer::utils::aliased_columns_for; +use crate::dist_plan::analyzer::utils::{aliased_columns_for, rewrite_merge_sort_exprs}; use crate::dist_plan::commutativity::{ Categorizer, Commutativity, partial_commutative_transformer, }; use crate::dist_plan::merge_scan::MergeScanLogicalPlan; +use crate::dist_plan::merge_sort::MergeSortLogicalPlan; use crate::metrics::PUSH_DOWN_FALLBACK_ERRORS_TOTAL; use crate::plan::ExtractExpr; use crate::query_engine::DefaultSerializer; @@ -548,7 +549,7 @@ impl PlanRewriter { // add merge scan as the new root let mut node = MergeScanLogicalPlan::new( - on_node, + on_node.clone(), false, // at this stage, the partition cols should be set // treat it as non-partitioned if None @@ -558,6 +559,15 @@ impl PlanRewriter { // expand stages for new_stage in self.stage.drain(..) { + // tracking alias for merge sort's sort exprs + let new_stage = if let LogicalPlan::Extension(ext) = &new_stage + && let Some(merge_sort) = ext.node.as_any().downcast_ref::() + { + // TODO(discord9): change `on_node` to `node` once alias tracking is supported for merge scan + rewrite_merge_sort_exprs(merge_sort, &on_node)? + } else { + new_stage + }; node = new_stage .with_new_exprs(new_stage.expressions_consider_join(), vec![node.clone()])?; } @@ -599,6 +609,7 @@ struct EnforceDistRequirementRewriter { /// when on `Projection` node, we don't need to apply the column requirements of `Aggregate` node /// because the `Projection` node is not in the scope of the `Aggregate` node cur_level: usize, + plan_per_level: BTreeMap, } impl EnforceDistRequirementRewriter { @@ -606,8 +617,67 @@ impl EnforceDistRequirementRewriter { Self { column_requirements, cur_level, + plan_per_level: BTreeMap::new(), } } + + /// Return a mapping from (original column, level) to aliased columns in current node of all + /// applicable column requirements + /// i.e. only column requirements with level >= `cur_level` will be considered + fn get_current_applicable_column_requirements( + &self, + node: &LogicalPlan, + ) -> DfResult>> { + let col_req_per_level = self + .column_requirements + .iter() + .filter(|(_, level)| *level >= self.cur_level) + .collect::>(); + + // track alias for columns and use aliased columns instead + // aliased col reqs at current level + let mut result_alias_mapping = BTreeMap::new(); + let Some(child) = node.inputs().first().cloned() else { + return Ok(Default::default()); + }; + for (col_req, level) in col_req_per_level { + if let Some(original) = self.plan_per_level.get(level) { + // query for alias in current plan + let aliased_cols = + aliased_columns_for(&col_req.iter().cloned().collect(), node, Some(original))?; + for original_col in col_req { + let aliased_cols = aliased_cols.get(original_col).cloned(); + if let Some(cols) = aliased_cols + && !cols.is_empty() + { + result_alias_mapping.insert((original_col.clone(), *level), cols); + } else { + // if no aliased column found in current node, there should be alias in child node as promised by enforce col reqs + // because it should insert required columns in child node + // so we can find the alias in child node + // if not found, it's an internal error + let aliases_in_child = aliased_columns_for( + &[original_col.clone()].into(), + child, + Some(original), + )?; + let Some(aliases) = aliases_in_child + .get(original_col) + .cloned() + .filter(|a| !a.is_empty()) + else { + return Err(datafusion_common::DataFusionError::Internal(format!( + "EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}", + ))); + }; + + result_alias_mapping.insert((original_col.clone(), *level), aliases); + } + } + } + } + Ok(result_alias_mapping) + } } impl TreeNodeRewriter for EnforceDistRequirementRewriter { @@ -621,6 +691,7 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter { .to_string(), )); } + self.plan_per_level.insert(self.cur_level, node.clone()); self.cur_level += 1; Ok(Transformed::no(node)) } @@ -628,38 +699,41 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter { fn f_up(&mut self, node: Self::Node) -> DfResult> { self.cur_level -= 1; // first get all applicable column requirements - let mut applicable_column_requirements = self - .column_requirements - .iter() - .filter(|(_, level)| *level >= self.cur_level) - .map(|(cols, _)| cols.clone()) - .reduce(|mut acc, cols| { - acc.extend(cols); - acc - }) - .unwrap_or_default(); - - debug!( - "EnforceDistRequirementRewriter: applicable column requirements at level {} = {:?} for node {}", - self.cur_level, - applicable_column_requirements, - node.display() - ); // make sure all projection applicable scope has the required columns if let LogicalPlan::Projection(ref projection) = node { + let mut applicable_column_requirements = + self.get_current_applicable_column_requirements(&node)?; + + debug!( + "EnforceDistRequirementRewriter: applicable column requirements at level {} = {:?} for node {}", + self.cur_level, + applicable_column_requirements, + node.display() + ); + for expr in &projection.expr { let (qualifier, name) = expr.qualified_name(); let column = Column::new(qualifier, name); - applicable_column_requirements.remove(&column); + applicable_column_requirements.retain(|_col_level, alias_set| { + // remove all columns that are already in the projection exprs + !alias_set.contains(&column) + }); } if applicable_column_requirements.is_empty() { return Ok(Transformed::no(node)); } let mut new_exprs = projection.expr.clone(); - for col in &applicable_column_requirements { - new_exprs.push(Expr::Column(col.clone())); + for (col, alias_set) in &applicable_column_requirements { + // use the first alias in alias set as the column to add + new_exprs.push(Expr::Column(alias_set.first().cloned().ok_or_else( + || { + datafusion_common::DataFusionError::Internal( + format!("EnforceDistRequirementRewriter: alias set is empty, for column {col:?} in node {node}"), + ) + }, + )?)); } let new_node = node.with_new_exprs(new_exprs, node.inputs().into_iter().cloned().collect())?; @@ -668,6 +742,9 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter { applicable_column_requirements ); + // update plan for later use + self.plan_per_level.insert(self.cur_level, new_node.clone()); + // still need to continue for next projection if applicable return Ok(Transformed::yes(new_node)); } diff --git a/src/query/src/dist_plan/analyzer/test.rs b/src/query/src/dist_plan/analyzer/test.rs index 888cd48432..18195ed0e0 100644 --- a/src/query/src/dist_plan/analyzer/test.rs +++ b/src/query/src/dist_plan/analyzer/test.rs @@ -199,6 +199,53 @@ fn expand_proj_sort_proj() { assert_eq!(expected, result.to_string()); } +#[test] +fn expand_proj_sort_partial_proj() { + // use logging for better debugging + init_default_ut_logging(); + let test_table = TestTable::table_with_name(0, "t".to_string()); + let table_source = Arc::new(DefaultTableSource::new(Arc::new( + DfTableProviderAdapter::new(test_table), + ))); + let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![]) + .unwrap() + .project(vec![col("number"), col("pk1"), col("pk2"), col("pk3")]) + .unwrap() + .project(vec![ + col("number"), + col("pk1"), + col("pk3"), + col("pk1").eq(col("pk2")), + ]) + .unwrap() + .sort(vec![col("t.pk1 = t.pk2").sort(true, true)]) + .unwrap() + .project(vec![col("number"), col("t.pk1 = t.pk2").alias("eq_sorted")]) + .unwrap() + .project(vec![col("number")]) + .unwrap() + .build() + .unwrap(); + + let config = ConfigOptions::default(); + let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); + + let expected = [ + "Projection: t.number", + " MergeSort: eq_sorted ASC NULLS FIRST", // notice how `eq_sorted` is used here + " MergeScan [is_placeholder=false, remote_input=[", + "Projection: t.number, eq_sorted", // notice how `eq_sorted` is added not `t.pk1 = t.pk2` + " Projection: t.number, t.pk1 = t.pk2 AS eq_sorted", + " Sort: t.pk1 = t.pk2 ASC NULLS FIRST", + " Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2", + " Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement + " TableScan: t", + "]]", + ] + .join("\n"); + assert_eq!(expected, result.to_string()); +} + #[test] fn expand_sort_limit() { // use logging for better debugging @@ -233,6 +280,8 @@ fn expand_sort_limit() { assert_eq!(expected, result.to_string()); } +/// Test merge sort can apply enforce dist requirement columns correctly and use the aliased column correctly, as there is +/// a aliased sort column, there is no need to add a duplicate sort column using it's original column name #[test] fn expand_sort_alias_limit() { // use logging for better debugging @@ -258,10 +307,10 @@ fn expand_sort_alias_limit() { let expected = [ "Projection: something", " Limit: skip=0, fetch=10", - " MergeSort: t.pk1 ASC NULLS LAST", + " MergeSort: something ASC NULLS LAST", " MergeScan [is_placeholder=false, remote_input=[", "Limit: skip=0, fetch=10", - " Projection: t.pk1 AS something, t.pk1", + " Projection: t.pk1 AS something", " Sort: t.pk1 ASC NULLS LAST", " TableScan: t", "]]", @@ -1332,10 +1381,73 @@ fn transform_unalighed_join_with_alias() { " MergeScan [is_placeholder=false, remote_input=[", "TableScan: t", "]]", - " SubqueryAlias: right", - " Projection: t.number", - " MergeScan [is_placeholder=false, remote_input=[", - "TableScan: t", + " Projection: right.number", + " MergeScan [is_placeholder=false, remote_input=[", + "SubqueryAlias: right", + " TableScan: t", + "]]", + ] + .join("\n"); + assert_eq!(expected, result.to_string()); +} + +#[test] +fn transform_subquery_sort_alias() { + init_default_ut_logging(); + + let test_table = TestTable::table_with_name(0, "numbers".to_string()); + let table_source = Arc::new(DefaultTableSource::new(Arc::new( + DfTableProviderAdapter::new(test_table), + ))); + + let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![]) + .unwrap() + .alias("a") + .unwrap() + .sort(vec![col("a.number").sort(true, false)]) + .unwrap() + .build() + .unwrap(); + let config = ConfigOptions::default(); + let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); + let expected = [ + "Projection: a.pk1, a.pk2, a.pk3, a.ts, a.number", + " MergeSort: a.number ASC NULLS LAST", + " MergeScan [is_placeholder=false, remote_input=[", + "Sort: a.number ASC NULLS LAST", + " SubqueryAlias: a", + " TableScan: t", + "]]", + ] + .join("\n"); + assert_eq!(expected, result.to_string()); +} + +#[test] +fn transform_sort_subquery_alias() { + init_default_ut_logging(); + let test_table = TestTable::table_with_name(0, "numbers".to_string()); + let table_source = Arc::new(DefaultTableSource::new(Arc::new( + DfTableProviderAdapter::new(test_table), + ))); + + let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![]) + .unwrap() + .sort(vec![col("t.number").sort(true, false)]) + .unwrap() + .alias("a") + .unwrap() + .build() + .unwrap(); + let config = ConfigOptions::default(); + let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); + let expected = [ + "Projection: a.pk1, a.pk2, a.pk3, a.ts, a.number", + " MergeSort: a.number ASC NULLS LAST", + " MergeScan [is_placeholder=false, remote_input=[", + "SubqueryAlias: a", + " Sort: t.number ASC NULLS LAST", + " TableScan: t", "]]", ] .join("\n"); diff --git a/src/query/src/dist_plan/analyzer/utils.rs b/src/query/src/dist_plan/analyzer/utils.rs index 652e7a34e1..8605f9ce26 100644 --- a/src/query/src/dist_plan/analyzer/utils.rs +++ b/src/query/src/dist_plan/analyzer/utils.rs @@ -17,45 +17,117 @@ use std::sync::Arc; use datafusion::error::Result as DfResult; use datafusion_common::Column; +use datafusion_common::tree_node::{Transformed, TreeNode as _}; use datafusion_expr::expr::Alias; -use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_expr::{Expr, Extension, LogicalPlan}; + +use crate::dist_plan::merge_sort::MergeSortLogicalPlan; +use crate::plan::ExtractExpr as _; + +fn rewrite_column( + mapping: &BTreeMap>, + original_node: &LogicalPlan, + alias_node: &LogicalPlan, +) -> impl Fn(Expr) -> DfResult> { + move |e: Expr| { + if let Expr::Column(col) = e { + if let Some(aliased_cols) = mapping.get(&col) { + // if multiple alias is available, just use first one + if let Some(aliased_col) = aliased_cols.iter().next() { + Ok(Transformed::yes(Expr::Column(aliased_col.clone()))) + } else { + Err(datafusion_common::DataFusionError::Internal(format!( + "PlanRewriter: expand: column {col} from {original_node}\n has empty alias set in plan: {alias_node}\n but expect at least one alias", + ))) + } + } else { + Err(datafusion_common::DataFusionError::Internal(format!( + "PlanRewriter: expand: column {col} from {original_node}\n has no alias in plan: {alias_node}", + ))) + } + } else { + Ok(Transformed::no(e)) + } + } +} + +/// Rewrite the expressions of the given merge sort plan from original columns(at merge sort's input plan) to aliased columns at the given aliased node +pub fn rewrite_merge_sort_exprs( + merge_sort: &MergeSortLogicalPlan, + aliased_node: &LogicalPlan, +) -> DfResult { + let merge_sort = LogicalPlan::Extension(Extension { + node: Arc::new(merge_sort.clone()), + }); + + // tracking alias for sort exprs, + let sort_input = merge_sort.inputs().first().cloned().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "PlanRewriter: expand: merge sort stage has no input: {merge_sort}" + )) + })?; + let sort_exprs = merge_sort.expressions_consider_join(); + let column_refs = sort_exprs + .iter() + .flat_map(|e| e.column_refs().into_iter().cloned()) + .collect::>(); + let column_alias_mapping = aliased_columns_for(&column_refs, aliased_node, Some(sort_input))?; + let aliased_sort_exprs = sort_exprs + .into_iter() + .map(|e| { + e.transform(rewrite_column( + &column_alias_mapping, + &merge_sort, + aliased_node, + )) + }) + .map(|e| e.map(|e| e.data)) + .collect::>>()?; + let new_merge_sort = merge_sort.with_new_exprs( + aliased_sort_exprs, + merge_sort.inputs().into_iter().cloned().collect(), + )?; + Ok(new_merge_sort) +} /// Return all the original columns(at original node) for the given aliased columns at the aliased node /// /// if `original_node` is None, it means original columns are from leaf node +/// +/// Return value use `BTreeMap` to have deterministic order for choose first alias when multiple alias exist #[allow(unused)] pub fn original_column_for( - aliased_columns: &HashSet, + aliased_columns: &BTreeSet, aliased_node: LogicalPlan, original_node: Option>, -) -> DfResult> { - let schema_cols: HashSet = aliased_node.schema().columns().iter().cloned().collect(); - let cur_aliases: HashMap = aliased_columns +) -> DfResult> { + let schema_cols: BTreeSet = aliased_node.schema().columns().iter().cloned().collect(); + let cur_aliases: BTreeMap = aliased_columns .iter() .filter(|c| schema_cols.contains(c)) .map(|c| (c.clone(), c.clone())) .collect(); if cur_aliases.is_empty() { - return Ok(HashMap::new()); + return Ok(BTreeMap::new()); } original_column_for_inner(cur_aliases, &aliased_node, &original_node) } fn original_column_for_inner( - mut cur_aliases: HashMap, + mut cur_aliases: BTreeMap, node: &LogicalPlan, original_node: &Option>, -) -> DfResult> { +) -> DfResult> { let mut current_node = node; loop { // Base case: check if we've reached the target node - if let Some(original_node) = original_node { - if *current_node == **original_node { - return Ok(cur_aliases); - } + if let Some(original_node) = original_node + && *current_node == **original_node + { + return Ok(cur_aliases); } else if current_node.inputs().is_empty() { // leaf node reached return Ok(cur_aliases); @@ -63,14 +135,15 @@ fn original_column_for_inner( // Validate node has exactly one child if current_node.inputs().len() != 1 { - return Err(datafusion::error::DataFusionError::Internal( - "only accept plan with at most one child".to_string(), - )); + return Err(datafusion::error::DataFusionError::Internal(format!( + "only accept plan with at most one child, found: {}", + current_node + ))); } // Get alias layer and update aliases let layer = get_alias_layer_from_node(current_node)?; - let mut new_aliases = HashMap::new(); + let mut new_aliases = BTreeMap::new(); for (start_alias, cur_alias) in cur_aliases { if let Some(old_column) = layer.get_old_from_new(cur_alias.clone()) { new_aliases.insert(start_alias, old_column); @@ -86,39 +159,41 @@ fn original_column_for_inner( /// Return all the aliased columns(at aliased node) for the given original columns(at original node) /// /// if `original_node` is None, it means original columns are from leaf node +/// +/// Return value use `BTreeMap` to have deterministic order for choose first alias when multiple alias exist pub fn aliased_columns_for( - original_columns: &HashSet, + original_columns: &BTreeSet, aliased_node: &LogicalPlan, original_node: Option<&LogicalPlan>, -) -> DfResult>> { - let initial_aliases: HashMap> = { +) -> DfResult>> { + let initial_aliases: BTreeMap> = { if let Some(original) = &original_node { - let schema_cols: HashSet = original.schema().columns().into_iter().collect(); + let schema_cols: BTreeSet = original.schema().columns().into_iter().collect(); original_columns .iter() .filter(|c| schema_cols.contains(c)) - .map(|c| (c.clone(), HashSet::from([c.clone()]))) + .map(|c| (c.clone(), [c.clone()].into())) .collect() } else { original_columns .iter() - .map(|c| (c.clone(), HashSet::from([c.clone()]))) + .map(|c| (c.clone(), [c.clone()].into())) .collect() } }; if initial_aliases.is_empty() { - return Ok(HashMap::new()); + return Ok(BTreeMap::new()); } aliased_columns_for_inner(initial_aliases, aliased_node, original_node) } fn aliased_columns_for_inner( - cur_aliases: HashMap>, + cur_aliases: BTreeMap>, node: &LogicalPlan, original_node: Option<&LogicalPlan>, -) -> DfResult>> { +) -> DfResult>> { // First, collect the path from current node to the target node let mut path = Vec::new(); let mut current_node = node; @@ -126,10 +201,10 @@ fn aliased_columns_for_inner( // Descend to the target node, collecting nodes along the way loop { // Base case: check if we've reached the target node - if let Some(original_node) = original_node { - if *current_node == *original_node { - break; - } + if let Some(original_node) = original_node + && *current_node == *original_node + { + break; } else if current_node.inputs().is_empty() { // leaf node reached break; @@ -137,9 +212,10 @@ fn aliased_columns_for_inner( // Validate node has exactly one child if current_node.inputs().len() != 1 { - return Err(datafusion::error::DataFusionError::Internal( - "only accept plan with at most one child".to_string(), - )); + return Err(datafusion::error::DataFusionError::Internal(format!( + "only accept plan with at most one child, found: {}", + current_node + ))); } // Add current node to path and move to child @@ -151,9 +227,9 @@ fn aliased_columns_for_inner( let mut result = cur_aliases; for &node_in_path in path.iter().rev() { let layer = get_alias_layer_from_node(node_in_path)?; - let mut new_aliases = HashMap::new(); + let mut new_aliases = BTreeMap::new(); for (original_column, cur_alias_set) in result { - let mut new_alias_set = HashSet::new(); + let mut new_alias_set = BTreeSet::new(); for cur_alias in cur_alias_set { new_alias_set.extend(layer.get_new_from_old(cur_alias.clone())); } @@ -168,6 +244,7 @@ fn aliased_columns_for_inner( } /// Return a mapping of original column to all the aliased columns in current node of the plan +/// TODO(discord9): also support merge scan node fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { match node { LogicalPlan::Projection(proj) => Ok(get_alias_layer_from_exprs(&proj.expr)), @@ -181,7 +258,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { old_column.name().to_string(), ); // mapping from old_column to new_column - layer.insert_alias(old_column, HashSet::from([new_column])); + layer.insert_alias(old_column, [new_column].into()); } Ok(layer) } @@ -189,7 +266,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { let columns = scan.projected_schema.columns(); let mut layer = AliasLayer::default(); for col in columns { - layer.insert_alias(col.clone(), HashSet::from([col.clone()])); + layer.insert_alias(col.clone(), [col.clone()].into()); } Ok(layer) } @@ -198,9 +275,10 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { .inputs() .first() .ok_or_else(|| { - datafusion::error::DataFusionError::Internal( - "only accept plan with at most one child".to_string(), - ) + datafusion::error::DataFusionError::Internal(format!( + "only accept plan with at most one child, found: {}", + node + )) })? .schema(); let output_schema = node.schema(); @@ -221,7 +299,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { // all input is in output, so it's just adding some columns, we can do identity mapping for input columns let mut layer = AliasLayer::default(); for col in input_columns { - layer.insert_alias(col.clone(), HashSet::from([col.clone()])); + layer.insert_alias(col.clone(), [col.clone()].into()); } Ok(layer) } else { @@ -242,7 +320,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { let mut layer = AliasLayer::default(); for col in &common_columns { - layer.insert_alias(col.clone(), HashSet::from([col.clone()])); + layer.insert_alias(col.clone(), [col.clone()].into()); } Ok(layer) } @@ -250,7 +328,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult { // identity mapping let mut layer = AliasLayer::default(); for col in output_schema.columns() { - layer.insert_alias(col.clone(), HashSet::from([col.clone()])); + layer.insert_alias(col.clone(), [col.clone()].into()); } Ok(layer) } @@ -403,71 +481,52 @@ mod tests { let child = plan.inputs()[0].clone(); assert_eq!( - aliased_columns_for( - &HashSet::from([qcol("pk1"), qcol("pk2")]), - &plan, - Some(&child) - ) - .unwrap(), - HashMap::from([ - (qcol("pk1"), HashSet::from([qcol("pk5")])), - (qcol("pk2"), HashSet::from([qcol("pk4")])) - ]) + aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&child)).unwrap(), + [ + (qcol("pk1"), [qcol("pk5")].into()), + (qcol("pk2"), [qcol("pk4")].into()) + ] + .into() ); // columns not in the plan should return empty mapping assert_eq!( - aliased_columns_for( - &HashSet::from([qcol("pk1"), qcol("pk2")]), - &plan, - Some(&plan) - ) - .unwrap(), - HashMap::from([]) + aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&plan)).unwrap(), + [].into() ); assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk3")]), &plan, Some(&child)).unwrap(), - HashMap::from([]) + aliased_columns_for(&[qcol("t.pk3")].into(), &plan, Some(&child)).unwrap(), + [].into() + ); + + assert_eq!( + original_column_for(&[qcol("pk5"), qcol("pk4")].into(), plan.clone(), None).unwrap(), + [(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))].into() + ); + + assert_eq!( + aliased_columns_for(&[qcol("pk3")].into(), &plan, None).unwrap(), + [(qcol("pk3"), [qcol("pk5"), qcol("pk4")].into())].into() + ); + assert_eq!( + original_column_for(&[qcol("pk1"), qcol("pk2")].into(), child.clone(), None).unwrap(), + [(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))].into() + ); + + assert_eq!( + aliased_columns_for(&[qcol("pk3")].into(), &child, None).unwrap(), + [(qcol("pk3"), [qcol("pk1"), qcol("pk2")].into())].into() ); assert_eq!( original_column_for( - &HashSet::from([qcol("pk5"), qcol("pk4")]), - plan.clone(), - None - ) - .unwrap(), - HashMap::from([(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))]) - ); - - assert_eq!( - aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, None).unwrap(), - HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk5"), qcol("pk4")]))]) - ); - assert_eq!( - original_column_for( - &HashSet::from([qcol("pk1"), qcol("pk2")]), - child.clone(), - None - ) - .unwrap(), - HashMap::from([(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))]) - ); - - assert_eq!( - aliased_columns_for(&HashSet::from([qcol("pk3")]), &child, None).unwrap(), - HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1"), qcol("pk2")]))]) - ); - - assert_eq!( - original_column_for( - &HashSet::from([qcol("pk4"), qcol("pk5")]), + &[qcol("pk4"), qcol("pk5")].into(), plan.clone(), Some(Arc::new(child.clone())) ) .unwrap(), - HashMap::from([(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))]) + [(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))].into() ); } @@ -492,44 +551,42 @@ mod tests { // Test aliased_columns_for from scan to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&scan_plan)) - .unwrap(), - HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))]) + aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&scan_plan)).unwrap(), + [(qcol("t.number"), [qcol("a.number")].into())].into() ); // Test aliased_columns_for from sort to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&sort_plan)) - .unwrap(), - HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))]) + aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&sort_plan)).unwrap(), + [(qcol("t.number"), [qcol("a.number")].into())].into() ); // Test aliased_columns_for from leaf to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, None).unwrap(), - HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))]) + aliased_columns_for(&[qcol("t.number")].into(), &plan, None).unwrap(), + [(qcol("t.number"), [qcol("a.number")].into())].into() ); // Test original_column_for from final plan to scan assert_eq!( original_column_for( - &HashSet::from([qcol("a.number")]), + &[qcol("a.number")].into(), plan.clone(), Some(Arc::new(scan_plan.clone())) ) .unwrap(), - HashMap::from([(qcol("a.number"), qcol("t.number"))]) + [(qcol("a.number"), qcol("t.number"))].into() ); // Test original_column_for from final plan to sort assert_eq!( original_column_for( - &HashSet::from([qcol("a.number")]), + &[qcol("a.number")].into(), plan.clone(), Some(Arc::new(sort_plan.clone())) ) .unwrap(), - HashMap::from([(qcol("a.number"), qcol("t.number"))]) + [(qcol("a.number"), qcol("t.number"))].into() ); } @@ -564,63 +621,58 @@ mod tests { // Test original_column_for from final plan to scan assert_eq!( original_column_for( - &HashSet::from([qcol("pk1")]), + &[qcol("pk1")].into(), plan.clone(), Some(Arc::new(scan_plan.clone())) ) .unwrap(), - HashMap::from([(qcol("pk1"), qcol("t.pk2"))]) + [(qcol("pk1"), qcol("t.pk2"))].into() ); // Test original_column_for from final plan to first projection assert_eq!( original_column_for( - &HashSet::from([qcol("pk1")]), + &[qcol("pk1")].into(), plan.clone(), Some(Arc::new(first_proj.clone())) ) .unwrap(), - HashMap::from([(qcol("pk1"), qcol("pk3"))]) + [(qcol("pk1"), qcol("pk3"))].into() ); // Test original_column_for from final plan to leaf assert_eq!( original_column_for( - &HashSet::from([qcol("pk1")]), + &[qcol("pk1")].into(), plan.clone(), Some(Arc::new(plan.clone())) ) .unwrap(), - HashMap::from([(qcol("pk1"), qcol("pk1"))]) + [(qcol("pk1"), qcol("pk1"))].into() ); // Test aliased_columns_for from scan to first projection assert_eq!( - aliased_columns_for( - &HashSet::from([qcol("t.pk2")]), - &first_proj, - Some(&scan_plan) - ) - .unwrap(), - HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk3")]))]) + aliased_columns_for(&[qcol("t.pk2")].into(), &first_proj, Some(&scan_plan)).unwrap(), + [(qcol("t.pk2"), [qcol("pk3")].into())].into() ); // Test aliased_columns_for from first projection to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, Some(&first_proj)).unwrap(), - HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1")]))]) + aliased_columns_for(&[qcol("pk3")].into(), &plan, Some(&first_proj)).unwrap(), + [(qcol("pk3"), [qcol("pk1")].into())].into() ); // Test aliased_columns_for from scan to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(), - HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk1")]))]) + aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(), + [(qcol("t.pk2"), [qcol("pk1")].into())].into() ); // Test aliased_columns_for from leaf to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("pk2")]), &plan, None).unwrap(), - HashMap::from([(qcol("pk2"), HashSet::from([qcol("pk1")]))]) + aliased_columns_for(&[qcol("pk2")].into(), &plan, None).unwrap(), + [(qcol("pk2"), [qcol("pk1")].into())].into() ); } @@ -647,8 +699,8 @@ mod tests { // Test aliased_columns_for from scan to projection assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(), - HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("a.pk1")]))]) + aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(), + [(qcol("t.pk2"), [qcol("a.pk1")].into())].into() ); } @@ -686,19 +738,14 @@ mod tests { // Test aliased_columns_for from scan to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(), - HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("pk42")]))]) + aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(), + [(qcol("t.pk1"), [qcol("pk42")].into())].into() ); // Test aliased_columns_for from scan to first projection assert_eq!( - aliased_columns_for( - &HashSet::from([Column::from_name("pk1")]), - &first_proj, - None - ) - .unwrap(), - HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("pk3")]))]) + aliased_columns_for(&[Column::from_name("pk1")].into(), &first_proj, None).unwrap(), + [(Column::from_name("pk1"), [qcol("pk3")].into())].into() ); } @@ -728,31 +775,26 @@ mod tests { // Test aliased_columns_for from scan to final plan (identity mapping for aggregates) assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(), - HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))]) + aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(), + [(qcol("t.pk1"), [qcol("t.pk1")].into())].into() ); // Test aliased_columns_for from scan to first aggregate assert_eq!( - aliased_columns_for( - &HashSet::from([qcol("t.pk1")]), - &first_aggr, - Some(&scan_plan) - ) - .unwrap(), - HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))]) + aliased_columns_for(&[qcol("t.pk1")].into(), &first_aggr, Some(&scan_plan)).unwrap(), + [(qcol("t.pk1"), [qcol("t.pk1")].into())].into() ); // Test aliased_columns_for from first aggregate to final plan assert_eq!( - aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&first_aggr)).unwrap(), - HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))]) + aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&first_aggr)).unwrap(), + [(qcol("t.pk1"), [qcol("t.pk1")].into())].into() ); // Test aliased_columns_for from leaf to final plan assert_eq!( - aliased_columns_for(&HashSet::from([Column::from_name("pk1")]), &plan, None).unwrap(), - HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("t.pk1")]))]) + aliased_columns_for(&[Column::from_name("pk1")].into(), &plan, None).unwrap(), + [(Column::from_name("pk1"), [qcol("t.pk1")].into())].into() ); } @@ -788,29 +830,31 @@ mod tests { // Test original_column_for from projection to second aggregate for aggr gen column assert_eq!( original_column_for( - &HashSet::from([Column::from_name("min_max_number")]), + &[Column::from_name("min_max_number")].into(), plan.clone(), Some(Arc::new(second_aggr.clone())) ) .unwrap(), - HashMap::from([( + [( Column::from_name("min_max_number"), Column::from_name("min(max(t.number))") - )]) + )] + .into() ); // Test aliased_columns_for from second aggregate to projection assert_eq!( aliased_columns_for( - &HashSet::from([Column::from_name("min(max(t.number))")]), + &[Column::from_name("min(max(t.number))")].into(), &plan, Some(&second_aggr) ) .unwrap(), - HashMap::from([( + [( Column::from_name("min(max(t.number))"), - HashSet::from([Column::from_name("min_max_number")]) - )]) + [Column::from_name("min_max_number")].into() + )] + .into() ); } } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index b44d12d101..9abbdcd2dd 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -187,7 +187,7 @@ impl Categorizer { LogicalPlan::TableScan(_) => Commutativity::Commutative, LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative, LogicalPlan::Subquery(_) => Commutativity::Unimplemented, - LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented, + LogicalPlan::SubqueryAlias(_) => Commutativity::Commutative, LogicalPlan::Limit(limit) => { // Only execute `fetch` on remote nodes. // wait for https://github.com/apache/arrow-datafusion/pull/7669 diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index be5fc95f97..3f40eb0c24 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -49,9 +49,9 @@ EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1 +-+-+ | logical_plan_| Sort: i1.i ASC NULLS LAST_| |_|_LeftSemi Join: i1.i = __correlated_sq_1.i_| -|_|_SubqueryAlias: i1_| |_|_MergeScan [is_placeholder=false, remote_input=[_| -|_| TableScan: integers_| +|_| SubqueryAlias: i1_| +|_|_TableScan: integers_| |_| ]]_| |_|_SubqueryAlias: __correlated_sq_1_| |_|_Projection: integers.i_| @@ -155,3 +155,520 @@ drop table integers; Affected Rows: 0 +CREATE TABLE integers(i INTEGER, j TIMESTAMP TIME INDEX) +PARTITION ON COLUMNS (i) ( + i < 1000, + i >= 1000 AND i < 2000, + i >= 2000 +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1.i) ORDER BY i1.i; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: i1.i ASC NULLS LAST_| +|_|_LeftSemi Join: i1.i = __correlated_sq_1.i_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| SubqueryAlias: i1_| +|_|_TableScan: integers_| +|_| ]]_| +|_|_SubqueryAlias: __correlated_sq_1_| +|_|_Projection: integers.i_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| TableScan: integers_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [i@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_ProjectionExec: expr=[i@0 as i]_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT count(i) FROM integers WHERE i=i1.i) ORDER BY i1.i; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: i1.i ASC NULLS LAST_| +|_|_LeftSemi Join: i1.i = __correlated_sq_1.i_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| SubqueryAlias: i1_| +|_|_TableScan: integers_| +|_| ]]_| +|_|_SubqueryAlias: __correlated_sq_1_| +|_|_Aggregate: groupBy=[[integers.i]], aggr=[[]]_| +|_|_Projection: integers.i_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| TableScan: integers_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [i@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_AggregateExec: mode=SinglePartitioned, gby=[i@0 as i], aggr=[]_| +|_|_ProjectionExec: expr=[i@0 as i]_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +DROP TABLE integers; + +Affected Rows: 0 + +CREATE TABLE t(ts timestamp time index, a INT, b INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +Affected Rows: 0 + +CREATE TABLE t1(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +Affected Rows: 0 + +CREATE TABLE t2(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +Affected Rows: 0 + +INSERT INTO t(ts,a,b) VALUES (1,3,30),(2,1,10),(3,2,20); + +Affected Rows: 3 + +INSERT INTO t1(ts,a) VALUES (1,1),(2,3); + +Affected Rows: 2 + +INSERT INTO t2(ts,a) VALUES (1,2),(2,3); + +Affected Rows: 2 + +SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++---+ +| x | ++---+ +| 1 | +| 2 | +| 3 | ++---+ + +-- expected: 1,2,3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| MergeSort: sq.x ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Sort: sq.x ASC NULLS LAST_| +|_|_Projection: sq.x_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_| +|_|_CooperativeExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x; + ++---+---+ +| x | c | ++---+---+ +| 1 | 1 | +| 2 | 1 | +| 3 | 1 | ++---+---+ + +-- expected: +-- x | c +-- 1 | 1 +-- 2 | 1 +-- 3 | 1 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| MergeSort: sq.x ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Sort: sq.x ASC NULLS LAST_| +|_|_Projection: sq.x, count(Int64(1)) AS count(*) AS c_| +|_|_Aggregate: groupBy=[[sq.x]], aggr=[[count(Int64(1))]]_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_| +|_|_CooperativeExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++---+ +| x | ++---+ +| 1 | +| 2 | +| 3 | ++---+ + +-- expecetd: 1,2,3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: sq.x ASC NULLS LAST_| +|_|_Aggregate: groupBy=[[sq.x]], aggr=[[]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Projection: sq.x_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_AggregateExec: mode=SinglePartitioned, gby=[x@0 as x], aggr=[]_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x; + ++---+ +| x | ++---+ +| 1 | +| 2 | +| 3 | ++---+ + +-- expected: 1,2,3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| MergeSort: sq.x ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Sort: sq.x ASC NULLS LAST_| +|_|_Projection: sq.x_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_| +|_|_CooperativeExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y; + ++---+ +| y | ++---+ +| 1 | +| 2 | +| 3 | ++---+ + +-- expected: 1,2,3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| MergeSort: sq2.y ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Sort: sq2.y ASC NULLS LAST_| +|_|_Projection: sq2.y_| +|_|_SubqueryAlias: sq2_| +|_|_Projection: sq1.x AS y_| +|_|_SubqueryAlias: sq1_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [y@0 ASC NULLS LAST]_| +|_|_CooperativeExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++---+---+ +| x | y | ++---+---+ +| 1 | 2 | +| 2 | 3 | +| 3 | 4 | ++---+---+ + +-- expected: +-- (x,y) +-- (1,2) +-- (2,3) +-- (3,4) +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| MergeSort: sq.x ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Sort: sq.x ASC NULLS LAST_| +|_|_Projection: sq.x, CAST(sq.x AS Int64) + Int64(1) AS y_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_| +|_|_CooperativeExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a; + ++---+ +| a | ++---+ +| 1 | +| 2 | +| 3 | +| 3 | ++---+ + +-- expected: 1,2,3,3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: u.a ASC NULLS LAST_| +|_|_SubqueryAlias: u_| +|_|_Union_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Projection: t1.a_| +|_|_TableScan: t1_| +|_| ]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Projection: t2.a_| +|_|_TableScan: t2_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_InterleaveExec_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT u1.a +FROM (SELECT a FROM t1) u1 +JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a +ORDER BY u1.a; + ++---+ +| a | ++---+ +| 3 | ++---+ + +-- expected: 3 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT u1.a +FROM (SELECT a FROM t1) u1 +JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a +ORDER BY u1.a; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: u1.a ASC NULLS LAST_| +|_|_Projection: u1.a_| +|_|_Inner Join: u1.a = u2.a_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| SubqueryAlias: u1_| +|_|_Projection: t1.a_| +|_|_TableScan: t1_| +|_| ]]_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| SubqueryAlias: u2_| +|_|_Projection: t2.a_| +|_|_TableScan: t2_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_CoalesceBatchesExec: target_batch_size=8192_| +|_|_REDACTED +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x; + ++---+ +| x | ++---+ +| 1 | +| 2 | ++---+ + +-- expected: 1,2 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: v.x ASC NULLS LAST_| +|_|_SubqueryAlias: v_| +|_|_Projection: column1 AS x_| +|_|_Values: (Int64(2)), (Int64(1))_| +| physical_plan | SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] | +|_|_ProjectionExec: expr=[column1@0 as x]_| +|_|_DataSourceExec: partitions=1, partition_sizes=[1]_| +|_|_| ++-+-+ + +SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2; + ++---+ +| x | ++---+ +| 1 | +| 2 | ++---+ + +-- expected: 1,2 +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Limit: skip=0, fetch=2_| +|_|_MergeSort: sq.x ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false, remote_input=[_| +|_| Limit: skip=0, fetch=2_| +|_|_Sort: sq.x ASC NULLS LAST_| +|_|_Projection: sq.x_| +|_|_SubqueryAlias: sq_| +|_|_Projection: t.a AS x_| +|_|_TableScan: t_| +|_| ]]_| +| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST], fetch=2_| +|_|_CooperativeExec_| +|_|_SortExec: TopK(fetch=2), expr=[x@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_CooperativeExec_| +|_|_MergeScanExec: REDACTED +|_|_| ++-+-+ + +DROP TABLE t; + +Affected Rows: 0 + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/explain/subqueries.sql b/tests/cases/distributed/explain/subqueries.sql index 0d1c6ff7af..1935546f25 100644 --- a/tests/cases/distributed/explain/subqueries.sql +++ b/tests/cases/distributed/explain/subqueries.sql @@ -37,3 +37,168 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM drop table other; drop table integers; + +CREATE TABLE integers(i INTEGER, j TIMESTAMP TIME INDEX) +PARTITION ON COLUMNS (i) ( + i < 1000, + i >= 1000 AND i < 2000, + i >= 2000 +); + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1.i) ORDER BY i1.i; + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT count(i) FROM integers WHERE i=i1.i) ORDER BY i1.i; + +DROP TABLE integers; + +CREATE TABLE t(ts timestamp time index, a INT, b INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +CREATE TABLE t1(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +CREATE TABLE t2(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +INSERT INTO t(ts,a,b) VALUES (1,3,30),(2,1,10),(3,2,20); + +INSERT INTO t1(ts,a) VALUES (1,1),(2,3); + +INSERT INTO t2(ts,a) VALUES (1,2),(2,3); + +SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x; +-- expected: 1,2,3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + +SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x; +-- expected: +-- x | c +-- 1 | 1 +-- 2 | 1 +-- 3 | 1 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x; + +SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x; +-- expecetd: 1,2,3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x; + +SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x; +-- expected: 1,2,3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x; + +SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y; +-- expected: 1,2,3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y; + +SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x; +-- expected: +-- (x,y) +-- (1,2) +-- (2,3) +-- (3,4) + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x; + +SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a; +-- expected: 1,2,3,3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a; + +SELECT u1.a +FROM (SELECT a FROM t1) u1 +JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a +ORDER BY u1.a; +-- expected: 3 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT u1.a +FROM (SELECT a FROM t1) u1 +JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a +ORDER BY u1.a; + +SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x; +-- expected: 1,2 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x; + +SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2; +-- expected: 1,2 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2; + +DROP TABLE t; +DROP TABLE t1; +DROP TABLE t2;