mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: support SubqueryAlias pushdown (#6963)
* wip enforce dist requirement rewriter Signed-off-by: discord9 <discord9@163.com> * feat: enforce dist req Signed-off-by: discord9 <discord9@163.com> * test: sqlness result Signed-off-by: discord9 <discord9@163.com> * fix: double projection Signed-off-by: discord9 <discord9@163.com> * test: fix sqlness Signed-off-by: discord9 <discord9@163.com> * refactor: per review Signed-off-by: discord9 <discord9@163.com> * docs: use btree map Signed-off-by: discord9 <discord9@163.com> * test: sqlness explain&comment Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::debug;
|
||||
@@ -32,11 +32,12 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use table::metadata::TableType;
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
use crate::dist_plan::analyzer::utils::aliased_columns_for;
|
||||
use crate::dist_plan::analyzer::utils::{aliased_columns_for, rewrite_merge_sort_exprs};
|
||||
use crate::dist_plan::commutativity::{
|
||||
Categorizer, Commutativity, partial_commutative_transformer,
|
||||
};
|
||||
use crate::dist_plan::merge_scan::MergeScanLogicalPlan;
|
||||
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
|
||||
use crate::metrics::PUSH_DOWN_FALLBACK_ERRORS_TOTAL;
|
||||
use crate::plan::ExtractExpr;
|
||||
use crate::query_engine::DefaultSerializer;
|
||||
@@ -548,7 +549,7 @@ impl PlanRewriter {
|
||||
|
||||
// add merge scan as the new root
|
||||
let mut node = MergeScanLogicalPlan::new(
|
||||
on_node,
|
||||
on_node.clone(),
|
||||
false,
|
||||
// at this stage, the partition cols should be set
|
||||
// treat it as non-partitioned if None
|
||||
@@ -558,6 +559,15 @@ impl PlanRewriter {
|
||||
|
||||
// expand stages
|
||||
for new_stage in self.stage.drain(..) {
|
||||
// tracking alias for merge sort's sort exprs
|
||||
let new_stage = if let LogicalPlan::Extension(ext) = &new_stage
|
||||
&& let Some(merge_sort) = ext.node.as_any().downcast_ref::<MergeSortLogicalPlan>()
|
||||
{
|
||||
// TODO(discord9): change `on_node` to `node` once alias tracking is supported for merge scan
|
||||
rewrite_merge_sort_exprs(merge_sort, &on_node)?
|
||||
} else {
|
||||
new_stage
|
||||
};
|
||||
node = new_stage
|
||||
.with_new_exprs(new_stage.expressions_consider_join(), vec![node.clone()])?;
|
||||
}
|
||||
@@ -599,6 +609,7 @@ struct EnforceDistRequirementRewriter {
|
||||
/// when on `Projection` node, we don't need to apply the column requirements of `Aggregate` node
|
||||
/// because the `Projection` node is not in the scope of the `Aggregate` node
|
||||
cur_level: usize,
|
||||
plan_per_level: BTreeMap<usize, LogicalPlan>,
|
||||
}
|
||||
|
||||
impl EnforceDistRequirementRewriter {
|
||||
@@ -606,8 +617,67 @@ impl EnforceDistRequirementRewriter {
|
||||
Self {
|
||||
column_requirements,
|
||||
cur_level,
|
||||
plan_per_level: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a mapping from (original column, level) to aliased columns in current node of all
|
||||
/// applicable column requirements
|
||||
/// i.e. only column requirements with level >= `cur_level` will be considered
|
||||
fn get_current_applicable_column_requirements(
|
||||
&self,
|
||||
node: &LogicalPlan,
|
||||
) -> DfResult<BTreeMap<(Column, usize), BTreeSet<Column>>> {
|
||||
let col_req_per_level = self
|
||||
.column_requirements
|
||||
.iter()
|
||||
.filter(|(_, level)| *level >= self.cur_level)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// track alias for columns and use aliased columns instead
|
||||
// aliased col reqs at current level
|
||||
let mut result_alias_mapping = BTreeMap::new();
|
||||
let Some(child) = node.inputs().first().cloned() else {
|
||||
return Ok(Default::default());
|
||||
};
|
||||
for (col_req, level) in col_req_per_level {
|
||||
if let Some(original) = self.plan_per_level.get(level) {
|
||||
// query for alias in current plan
|
||||
let aliased_cols =
|
||||
aliased_columns_for(&col_req.iter().cloned().collect(), node, Some(original))?;
|
||||
for original_col in col_req {
|
||||
let aliased_cols = aliased_cols.get(original_col).cloned();
|
||||
if let Some(cols) = aliased_cols
|
||||
&& !cols.is_empty()
|
||||
{
|
||||
result_alias_mapping.insert((original_col.clone(), *level), cols);
|
||||
} else {
|
||||
// if no aliased column found in current node, there should be alias in child node as promised by enforce col reqs
|
||||
// because it should insert required columns in child node
|
||||
// so we can find the alias in child node
|
||||
// if not found, it's an internal error
|
||||
let aliases_in_child = aliased_columns_for(
|
||||
&[original_col.clone()].into(),
|
||||
child,
|
||||
Some(original),
|
||||
)?;
|
||||
let Some(aliases) = aliases_in_child
|
||||
.get(original_col)
|
||||
.cloned()
|
||||
.filter(|a| !a.is_empty())
|
||||
else {
|
||||
return Err(datafusion_common::DataFusionError::Internal(format!(
|
||||
"EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}",
|
||||
)));
|
||||
};
|
||||
|
||||
result_alias_mapping.insert((original_col.clone(), *level), aliases);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(result_alias_mapping)
|
||||
}
|
||||
}
|
||||
|
||||
impl TreeNodeRewriter for EnforceDistRequirementRewriter {
|
||||
@@ -621,6 +691,7 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter {
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
self.plan_per_level.insert(self.cur_level, node.clone());
|
||||
self.cur_level += 1;
|
||||
Ok(Transformed::no(node))
|
||||
}
|
||||
@@ -628,16 +699,11 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter {
|
||||
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
|
||||
self.cur_level -= 1;
|
||||
// first get all applicable column requirements
|
||||
let mut applicable_column_requirements = self
|
||||
.column_requirements
|
||||
.iter()
|
||||
.filter(|(_, level)| *level >= self.cur_level)
|
||||
.map(|(cols, _)| cols.clone())
|
||||
.reduce(|mut acc, cols| {
|
||||
acc.extend(cols);
|
||||
acc
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// make sure all projection applicable scope has the required columns
|
||||
if let LogicalPlan::Projection(ref projection) = node {
|
||||
let mut applicable_column_requirements =
|
||||
self.get_current_applicable_column_requirements(&node)?;
|
||||
|
||||
debug!(
|
||||
"EnforceDistRequirementRewriter: applicable column requirements at level {} = {:?} for node {}",
|
||||
@@ -646,20 +712,28 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter {
|
||||
node.display()
|
||||
);
|
||||
|
||||
// make sure all projection applicable scope has the required columns
|
||||
if let LogicalPlan::Projection(ref projection) = node {
|
||||
for expr in &projection.expr {
|
||||
let (qualifier, name) = expr.qualified_name();
|
||||
let column = Column::new(qualifier, name);
|
||||
applicable_column_requirements.remove(&column);
|
||||
applicable_column_requirements.retain(|_col_level, alias_set| {
|
||||
// remove all columns that are already in the projection exprs
|
||||
!alias_set.contains(&column)
|
||||
});
|
||||
}
|
||||
if applicable_column_requirements.is_empty() {
|
||||
return Ok(Transformed::no(node));
|
||||
}
|
||||
|
||||
let mut new_exprs = projection.expr.clone();
|
||||
for col in &applicable_column_requirements {
|
||||
new_exprs.push(Expr::Column(col.clone()));
|
||||
for (col, alias_set) in &applicable_column_requirements {
|
||||
// use the first alias in alias set as the column to add
|
||||
new_exprs.push(Expr::Column(alias_set.first().cloned().ok_or_else(
|
||||
|| {
|
||||
datafusion_common::DataFusionError::Internal(
|
||||
format!("EnforceDistRequirementRewriter: alias set is empty, for column {col:?} in node {node}"),
|
||||
)
|
||||
},
|
||||
)?));
|
||||
}
|
||||
let new_node =
|
||||
node.with_new_exprs(new_exprs, node.inputs().into_iter().cloned().collect())?;
|
||||
@@ -668,6 +742,9 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter {
|
||||
applicable_column_requirements
|
||||
);
|
||||
|
||||
// update plan for later use
|
||||
self.plan_per_level.insert(self.cur_level, new_node.clone());
|
||||
|
||||
// still need to continue for next projection if applicable
|
||||
return Ok(Transformed::yes(new_node));
|
||||
}
|
||||
|
||||
@@ -199,6 +199,53 @@ fn expand_proj_sort_proj() {
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
/// Runs `DistPlannerAnalyzer` over scan -> projection -> projection (adds `pk1 = pk2`) -> sort
/// on that expression -> projection aliasing it as `eq_sorted` -> projection of `number` only,
/// and compares the displayed plan with the expected text below.
#[test]
|
||||
fn expand_proj_sort_partial_proj() {
|
||||
// use logging for better debugging
|
||||
init_default_ut_logging();
|
||||
let test_table = TestTable::table_with_name(0, "t".to_string());
|
||||
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
|
||||
DfTableProviderAdapter::new(test_table),
|
||||
)));
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
|
||||
.unwrap()
|
||||
.project(vec![col("number"), col("pk1"), col("pk2"), col("pk3")])
|
||||
.unwrap()
|
||||
.project(vec![
|
||||
col("number"),
|
||||
col("pk1"),
|
||||
col("pk3"),
|
||||
col("pk1").eq(col("pk2")),
|
||||
])
|
||||
.unwrap()
|
||||
.sort(vec![col("t.pk1 = t.pk2").sort(true, true)])
|
||||
.unwrap()
|
||||
.project(vec![col("number"), col("t.pk1 = t.pk2").alias("eq_sorted")])
|
||||
.unwrap()
|
||||
.project(vec![col("number")])
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let config = ConfigOptions::default();
|
||||
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
|
||||
|
||||
let expected = [
|
||||
"Projection: t.number",
|
||||
" MergeSort: eq_sorted ASC NULLS FIRST", // notice how `eq_sorted` is used here
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"Projection: t.number, eq_sorted", // notice how `eq_sorted` is added not `t.pk1 = t.pk2`
|
||||
" Projection: t.number, t.pk1 = t.pk2 AS eq_sorted",
|
||||
" Sort: t.pk1 = t.pk2 ASC NULLS FIRST",
|
||||
" Projection: t.number, t.pk1, t.pk3, t.pk1 = t.pk2",
|
||||
" Projection: t.number, t.pk1, t.pk2, t.pk3", // notice this projection doesn't add `t.pk1 = t.pk2` column requirement
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
]
|
||||
.join("\n");
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expand_sort_limit() {
|
||||
// use logging for better debugging
|
||||
@@ -233,6 +280,8 @@ fn expand_sort_limit() {
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
/// Test merge sort can apply enforce dist requirement columns correctly and use the aliased column correctly, as there is
|
||||
/// an aliased sort column, there is no need to add a duplicate sort column using its original column name
|
||||
#[test]
|
||||
fn expand_sort_alias_limit() {
|
||||
// use logging for better debugging
|
||||
@@ -258,10 +307,10 @@ fn expand_sort_alias_limit() {
|
||||
let expected = [
|
||||
"Projection: something",
|
||||
" Limit: skip=0, fetch=10",
|
||||
" MergeSort: t.pk1 ASC NULLS LAST",
|
||||
" MergeSort: something ASC NULLS LAST",
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"Limit: skip=0, fetch=10",
|
||||
" Projection: t.pk1 AS something, t.pk1",
|
||||
" Projection: t.pk1 AS something",
|
||||
" Sort: t.pk1 ASC NULLS LAST",
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
@@ -1332,10 +1381,73 @@ fn transform_unalighed_join_with_alias() {
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"TableScan: t",
|
||||
"]]",
|
||||
" SubqueryAlias: right",
|
||||
" Projection: t.number",
|
||||
" Projection: right.number",
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"TableScan: t",
|
||||
"SubqueryAlias: right",
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
]
|
||||
.join("\n");
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
/// Runs `DistPlannerAnalyzer` over a plan where the sort is applied on top of the alias
/// (scan -> alias `a` -> sort by `a.number`) and compares the displayed plan with the
/// expected text below.
#[test]
|
||||
fn transform_subquery_sort_alias() {
|
||||
init_default_ut_logging();
|
||||
|
||||
let test_table = TestTable::table_with_name(0, "numbers".to_string());
|
||||
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
|
||||
DfTableProviderAdapter::new(test_table),
|
||||
)));
|
||||
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
|
||||
.unwrap()
|
||||
.alias("a")
|
||||
.unwrap()
|
||||
.sort(vec![col("a.number").sort(true, false)])
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
let config = ConfigOptions::default();
|
||||
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
|
||||
let expected = [
|
||||
"Projection: a.pk1, a.pk2, a.pk3, a.ts, a.number",
|
||||
" MergeSort: a.number ASC NULLS LAST",
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"Sort: a.number ASC NULLS LAST",
|
||||
" SubqueryAlias: a",
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
]
|
||||
.join("\n");
|
||||
assert_eq!(expected, result.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transform_sort_subquery_alias() {
|
||||
init_default_ut_logging();
|
||||
let test_table = TestTable::table_with_name(0, "numbers".to_string());
|
||||
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
|
||||
DfTableProviderAdapter::new(test_table),
|
||||
)));
|
||||
|
||||
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source, None, vec![])
|
||||
.unwrap()
|
||||
.sort(vec![col("t.number").sort(true, false)])
|
||||
.unwrap()
|
||||
.alias("a")
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
let config = ConfigOptions::default();
|
||||
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
|
||||
let expected = [
|
||||
"Projection: a.pk1, a.pk2, a.pk3, a.ts, a.number",
|
||||
" MergeSort: a.number ASC NULLS LAST",
|
||||
" MergeScan [is_placeholder=false, remote_input=[",
|
||||
"SubqueryAlias: a",
|
||||
" Sort: t.number ASC NULLS LAST",
|
||||
" TableScan: t",
|
||||
"]]",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
@@ -17,45 +17,117 @@ use std::sync::Arc;
|
||||
|
||||
use datafusion::error::Result as DfResult;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode as _};
|
||||
use datafusion_expr::expr::Alias;
|
||||
use datafusion_expr::{Expr, LogicalPlan};
|
||||
use datafusion_expr::{Expr, Extension, LogicalPlan};
|
||||
|
||||
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
|
||||
use crate::plan::ExtractExpr as _;
|
||||
|
||||
fn rewrite_column(
|
||||
mapping: &BTreeMap<Column, BTreeSet<Column>>,
|
||||
original_node: &LogicalPlan,
|
||||
alias_node: &LogicalPlan,
|
||||
) -> impl Fn(Expr) -> DfResult<Transformed<Expr>> {
|
||||
move |e: Expr| {
|
||||
if let Expr::Column(col) = e {
|
||||
if let Some(aliased_cols) = mapping.get(&col) {
|
||||
// if multiple alias is available, just use first one
|
||||
if let Some(aliased_col) = aliased_cols.iter().next() {
|
||||
Ok(Transformed::yes(Expr::Column(aliased_col.clone())))
|
||||
} else {
|
||||
Err(datafusion_common::DataFusionError::Internal(format!(
|
||||
"PlanRewriter: expand: column {col} from {original_node}\n has empty alias set in plan: {alias_node}\n but expect at least one alias",
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Err(datafusion_common::DataFusionError::Internal(format!(
|
||||
"PlanRewriter: expand: column {col} from {original_node}\n has no alias in plan: {alias_node}",
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Ok(Transformed::no(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rewrite the expressions of the given merge sort plan from original columns(at merge sort's input plan) to aliased columns at the given aliased node
|
||||
pub fn rewrite_merge_sort_exprs(
|
||||
merge_sort: &MergeSortLogicalPlan,
|
||||
aliased_node: &LogicalPlan,
|
||||
) -> DfResult<LogicalPlan> {
|
||||
let merge_sort = LogicalPlan::Extension(Extension {
|
||||
node: Arc::new(merge_sort.clone()),
|
||||
});
|
||||
|
||||
// tracking alias for sort exprs,
|
||||
let sort_input = merge_sort.inputs().first().cloned().ok_or_else(|| {
|
||||
datafusion_common::DataFusionError::Internal(format!(
|
||||
"PlanRewriter: expand: merge sort stage has no input: {merge_sort}"
|
||||
))
|
||||
})?;
|
||||
let sort_exprs = merge_sort.expressions_consider_join();
|
||||
let column_refs = sort_exprs
|
||||
.iter()
|
||||
.flat_map(|e| e.column_refs().into_iter().cloned())
|
||||
.collect::<BTreeSet<_>>();
|
||||
let column_alias_mapping = aliased_columns_for(&column_refs, aliased_node, Some(sort_input))?;
|
||||
let aliased_sort_exprs = sort_exprs
|
||||
.into_iter()
|
||||
.map(|e| {
|
||||
e.transform(rewrite_column(
|
||||
&column_alias_mapping,
|
||||
&merge_sort,
|
||||
aliased_node,
|
||||
))
|
||||
})
|
||||
.map(|e| e.map(|e| e.data))
|
||||
.collect::<DfResult<Vec<_>>>()?;
|
||||
let new_merge_sort = merge_sort.with_new_exprs(
|
||||
aliased_sort_exprs,
|
||||
merge_sort.inputs().into_iter().cloned().collect(),
|
||||
)?;
|
||||
Ok(new_merge_sort)
|
||||
}
|
||||
|
||||
/// Return all the original columns(at original node) for the given aliased columns at the aliased node
|
||||
///
|
||||
/// if `original_node` is None, it means original columns are from leaf node
|
||||
///
|
||||
/// The return value uses `BTreeMap` to have a deterministic order when choosing the first alias if multiple aliases exist
|
||||
#[allow(unused)]
|
||||
pub fn original_column_for(
|
||||
aliased_columns: &HashSet<Column>,
|
||||
aliased_columns: &BTreeSet<Column>,
|
||||
aliased_node: LogicalPlan,
|
||||
original_node: Option<Arc<LogicalPlan>>,
|
||||
) -> DfResult<HashMap<Column, Column>> {
|
||||
let schema_cols: HashSet<Column> = aliased_node.schema().columns().iter().cloned().collect();
|
||||
let cur_aliases: HashMap<Column, Column> = aliased_columns
|
||||
) -> DfResult<BTreeMap<Column, Column>> {
|
||||
let schema_cols: BTreeSet<Column> = aliased_node.schema().columns().iter().cloned().collect();
|
||||
let cur_aliases: BTreeMap<Column, Column> = aliased_columns
|
||||
.iter()
|
||||
.filter(|c| schema_cols.contains(c))
|
||||
.map(|c| (c.clone(), c.clone()))
|
||||
.collect();
|
||||
|
||||
if cur_aliases.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
return Ok(BTreeMap::new());
|
||||
}
|
||||
|
||||
original_column_for_inner(cur_aliases, &aliased_node, &original_node)
|
||||
}
|
||||
|
||||
fn original_column_for_inner(
|
||||
mut cur_aliases: HashMap<Column, Column>,
|
||||
mut cur_aliases: BTreeMap<Column, Column>,
|
||||
node: &LogicalPlan,
|
||||
original_node: &Option<Arc<LogicalPlan>>,
|
||||
) -> DfResult<HashMap<Column, Column>> {
|
||||
) -> DfResult<BTreeMap<Column, Column>> {
|
||||
let mut current_node = node;
|
||||
|
||||
loop {
|
||||
// Base case: check if we've reached the target node
|
||||
if let Some(original_node) = original_node {
|
||||
if *current_node == **original_node {
|
||||
if let Some(original_node) = original_node
|
||||
&& *current_node == **original_node
|
||||
{
|
||||
return Ok(cur_aliases);
|
||||
}
|
||||
} else if current_node.inputs().is_empty() {
|
||||
// leaf node reached
|
||||
return Ok(cur_aliases);
|
||||
@@ -63,14 +135,15 @@ fn original_column_for_inner(
|
||||
|
||||
// Validate node has exactly one child
|
||||
if current_node.inputs().len() != 1 {
|
||||
return Err(datafusion::error::DataFusionError::Internal(
|
||||
"only accept plan with at most one child".to_string(),
|
||||
));
|
||||
return Err(datafusion::error::DataFusionError::Internal(format!(
|
||||
"only accept plan with at most one child, found: {}",
|
||||
current_node
|
||||
)));
|
||||
}
|
||||
|
||||
// Get alias layer and update aliases
|
||||
let layer = get_alias_layer_from_node(current_node)?;
|
||||
let mut new_aliases = HashMap::new();
|
||||
let mut new_aliases = BTreeMap::new();
|
||||
for (start_alias, cur_alias) in cur_aliases {
|
||||
if let Some(old_column) = layer.get_old_from_new(cur_alias.clone()) {
|
||||
new_aliases.insert(start_alias, old_column);
|
||||
@@ -86,39 +159,41 @@ fn original_column_for_inner(
|
||||
/// Return all the aliased columns(at aliased node) for the given original columns(at original node)
|
||||
///
|
||||
/// if `original_node` is None, it means original columns are from leaf node
|
||||
///
|
||||
/// The return value uses `BTreeMap` to have a deterministic order when choosing the first alias if multiple aliases exist
|
||||
pub fn aliased_columns_for(
|
||||
original_columns: &HashSet<Column>,
|
||||
original_columns: &BTreeSet<Column>,
|
||||
aliased_node: &LogicalPlan,
|
||||
original_node: Option<&LogicalPlan>,
|
||||
) -> DfResult<HashMap<Column, HashSet<Column>>> {
|
||||
let initial_aliases: HashMap<Column, HashSet<Column>> = {
|
||||
) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
|
||||
let initial_aliases: BTreeMap<Column, BTreeSet<Column>> = {
|
||||
if let Some(original) = &original_node {
|
||||
let schema_cols: HashSet<Column> = original.schema().columns().into_iter().collect();
|
||||
let schema_cols: BTreeSet<Column> = original.schema().columns().into_iter().collect();
|
||||
original_columns
|
||||
.iter()
|
||||
.filter(|c| schema_cols.contains(c))
|
||||
.map(|c| (c.clone(), HashSet::from([c.clone()])))
|
||||
.map(|c| (c.clone(), [c.clone()].into()))
|
||||
.collect()
|
||||
} else {
|
||||
original_columns
|
||||
.iter()
|
||||
.map(|c| (c.clone(), HashSet::from([c.clone()])))
|
||||
.map(|c| (c.clone(), [c.clone()].into()))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
|
||||
if initial_aliases.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
return Ok(BTreeMap::new());
|
||||
}
|
||||
|
||||
aliased_columns_for_inner(initial_aliases, aliased_node, original_node)
|
||||
}
|
||||
|
||||
fn aliased_columns_for_inner(
|
||||
cur_aliases: HashMap<Column, HashSet<Column>>,
|
||||
cur_aliases: BTreeMap<Column, BTreeSet<Column>>,
|
||||
node: &LogicalPlan,
|
||||
original_node: Option<&LogicalPlan>,
|
||||
) -> DfResult<HashMap<Column, HashSet<Column>>> {
|
||||
) -> DfResult<BTreeMap<Column, BTreeSet<Column>>> {
|
||||
// First, collect the path from current node to the target node
|
||||
let mut path = Vec::new();
|
||||
let mut current_node = node;
|
||||
@@ -126,10 +201,10 @@ fn aliased_columns_for_inner(
|
||||
// Descend to the target node, collecting nodes along the way
|
||||
loop {
|
||||
// Base case: check if we've reached the target node
|
||||
if let Some(original_node) = original_node {
|
||||
if *current_node == *original_node {
|
||||
if let Some(original_node) = original_node
|
||||
&& *current_node == *original_node
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if current_node.inputs().is_empty() {
|
||||
// leaf node reached
|
||||
break;
|
||||
@@ -137,9 +212,10 @@ fn aliased_columns_for_inner(
|
||||
|
||||
// Validate node has exactly one child
|
||||
if current_node.inputs().len() != 1 {
|
||||
return Err(datafusion::error::DataFusionError::Internal(
|
||||
"only accept plan with at most one child".to_string(),
|
||||
));
|
||||
return Err(datafusion::error::DataFusionError::Internal(format!(
|
||||
"only accept plan with at most one child, found: {}",
|
||||
current_node
|
||||
)));
|
||||
}
|
||||
|
||||
// Add current node to path and move to child
|
||||
@@ -151,9 +227,9 @@ fn aliased_columns_for_inner(
|
||||
let mut result = cur_aliases;
|
||||
for &node_in_path in path.iter().rev() {
|
||||
let layer = get_alias_layer_from_node(node_in_path)?;
|
||||
let mut new_aliases = HashMap::new();
|
||||
let mut new_aliases = BTreeMap::new();
|
||||
for (original_column, cur_alias_set) in result {
|
||||
let mut new_alias_set = HashSet::new();
|
||||
let mut new_alias_set = BTreeSet::new();
|
||||
for cur_alias in cur_alias_set {
|
||||
new_alias_set.extend(layer.get_new_from_old(cur_alias.clone()));
|
||||
}
|
||||
@@ -168,6 +244,7 @@ fn aliased_columns_for_inner(
|
||||
}
|
||||
|
||||
/// Return a mapping of original column to all the aliased columns in current node of the plan
|
||||
/// TODO(discord9): also support merge scan node
|
||||
fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
match node {
|
||||
LogicalPlan::Projection(proj) => Ok(get_alias_layer_from_exprs(&proj.expr)),
|
||||
@@ -181,7 +258,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
old_column.name().to_string(),
|
||||
);
|
||||
// mapping from old_column to new_column
|
||||
layer.insert_alias(old_column, HashSet::from([new_column]));
|
||||
layer.insert_alias(old_column, [new_column].into());
|
||||
}
|
||||
Ok(layer)
|
||||
}
|
||||
@@ -189,7 +266,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
let columns = scan.projected_schema.columns();
|
||||
let mut layer = AliasLayer::default();
|
||||
for col in columns {
|
||||
layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
|
||||
layer.insert_alias(col.clone(), [col.clone()].into());
|
||||
}
|
||||
Ok(layer)
|
||||
}
|
||||
@@ -198,9 +275,10 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
.inputs()
|
||||
.first()
|
||||
.ok_or_else(|| {
|
||||
datafusion::error::DataFusionError::Internal(
|
||||
"only accept plan with at most one child".to_string(),
|
||||
)
|
||||
datafusion::error::DataFusionError::Internal(format!(
|
||||
"only accept plan with at most one child, found: {}",
|
||||
node
|
||||
))
|
||||
})?
|
||||
.schema();
|
||||
let output_schema = node.schema();
|
||||
@@ -221,7 +299,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
// all input is in output, so it's just adding some columns, we can do identity mapping for input columns
|
||||
let mut layer = AliasLayer::default();
|
||||
for col in input_columns {
|
||||
layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
|
||||
layer.insert_alias(col.clone(), [col.clone()].into());
|
||||
}
|
||||
Ok(layer)
|
||||
} else {
|
||||
@@ -242,7 +320,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
|
||||
let mut layer = AliasLayer::default();
|
||||
for col in &common_columns {
|
||||
layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
|
||||
layer.insert_alias(col.clone(), [col.clone()].into());
|
||||
}
|
||||
Ok(layer)
|
||||
}
|
||||
@@ -250,7 +328,7 @@ fn get_alias_layer_from_node(node: &LogicalPlan) -> DfResult<AliasLayer> {
|
||||
// identity mapping
|
||||
let mut layer = AliasLayer::default();
|
||||
for col in output_schema.columns() {
|
||||
layer.insert_alias(col.clone(), HashSet::from([col.clone()]));
|
||||
layer.insert_alias(col.clone(), [col.clone()].into());
|
||||
}
|
||||
Ok(layer)
|
||||
}
|
||||
@@ -403,71 +481,52 @@ mod tests {
|
||||
let child = plan.inputs()[0].clone();
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([qcol("pk1"), qcol("pk2")]),
|
||||
&plan,
|
||||
Some(&child)
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([
|
||||
(qcol("pk1"), HashSet::from([qcol("pk5")])),
|
||||
(qcol("pk2"), HashSet::from([qcol("pk4")]))
|
||||
])
|
||||
aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&child)).unwrap(),
|
||||
[
|
||||
(qcol("pk1"), [qcol("pk5")].into()),
|
||||
(qcol("pk2"), [qcol("pk4")].into())
|
||||
]
|
||||
.into()
|
||||
);
|
||||
|
||||
// columns not in the plan should return empty mapping
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([qcol("pk1"), qcol("pk2")]),
|
||||
&plan,
|
||||
Some(&plan)
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([])
|
||||
aliased_columns_for(&[qcol("pk1"), qcol("pk2")].into(), &plan, Some(&plan)).unwrap(),
|
||||
[].into()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk3")]), &plan, Some(&child)).unwrap(),
|
||||
HashMap::from([])
|
||||
aliased_columns_for(&[qcol("t.pk3")].into(), &plan, Some(&child)).unwrap(),
|
||||
[].into()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_column_for(&[qcol("pk5"), qcol("pk4")].into(), plan.clone(), None).unwrap(),
|
||||
[(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))].into()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(&[qcol("pk3")].into(), &plan, None).unwrap(),
|
||||
[(qcol("pk3"), [qcol("pk5"), qcol("pk4")].into())].into()
|
||||
);
|
||||
assert_eq!(
|
||||
original_column_for(&[qcol("pk1"), qcol("pk2")].into(), child.clone(), None).unwrap(),
|
||||
[(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))].into()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(&[qcol("pk3")].into(), &child, None).unwrap(),
|
||||
[(qcol("pk3"), [qcol("pk1"), qcol("pk2")].into())].into()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk5"), qcol("pk4")]),
|
||||
plan.clone(),
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk5"), qcol("t.pk3")), (qcol("pk4"), qcol("t.pk3"))])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, None).unwrap(),
|
||||
HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk5"), qcol("pk4")]))])
|
||||
);
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk1"), qcol("pk2")]),
|
||||
child.clone(),
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk1"), qcol("t.pk3")), (qcol("pk2"), qcol("t.pk3"))])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("pk3")]), &child, None).unwrap(),
|
||||
HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1"), qcol("pk2")]))])
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk4"), qcol("pk5")]),
|
||||
&[qcol("pk4"), qcol("pk5")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(child.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))])
|
||||
[(qcol("pk4"), qcol("pk2")), (qcol("pk5"), qcol("pk1"))].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -492,44 +551,42 @@ mod tests {
|
||||
|
||||
// Test aliased_columns_for from scan to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&scan_plan))
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
|
||||
aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.number"), [qcol("a.number")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from sort to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, Some(&sort_plan))
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
|
||||
aliased_columns_for(&[qcol("t.number")].into(), &plan, Some(&sort_plan)).unwrap(),
|
||||
[(qcol("t.number"), [qcol("a.number")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from leaf to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.number")]), &plan, None).unwrap(),
|
||||
HashMap::from([(qcol("t.number"), HashSet::from([qcol("a.number")]))])
|
||||
aliased_columns_for(&[qcol("t.number")].into(), &plan, None).unwrap(),
|
||||
[(qcol("t.number"), [qcol("a.number")].into())].into()
|
||||
);
|
||||
|
||||
// Test original_column_for from final plan to scan
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("a.number")]),
|
||||
&[qcol("a.number")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(scan_plan.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("a.number"), qcol("t.number"))])
|
||||
[(qcol("a.number"), qcol("t.number"))].into()
|
||||
);
|
||||
|
||||
// Test original_column_for from final plan to sort
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("a.number")]),
|
||||
&[qcol("a.number")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(sort_plan.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("a.number"), qcol("t.number"))])
|
||||
[(qcol("a.number"), qcol("t.number"))].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -564,63 +621,58 @@ mod tests {
|
||||
// Test original_column_for from final plan to scan
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk1")]),
|
||||
&[qcol("pk1")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(scan_plan.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk1"), qcol("t.pk2"))])
|
||||
[(qcol("pk1"), qcol("t.pk2"))].into()
|
||||
);
|
||||
|
||||
// Test original_column_for from final plan to first projection
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk1")]),
|
||||
&[qcol("pk1")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(first_proj.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk1"), qcol("pk3"))])
|
||||
[(qcol("pk1"), qcol("pk3"))].into()
|
||||
);
|
||||
|
||||
// Test original_column_for from final plan to leaf
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([qcol("pk1")]),
|
||||
&[qcol("pk1")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(plan.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("pk1"), qcol("pk1"))])
|
||||
[(qcol("pk1"), qcol("pk1"))].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from scan to first projection
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([qcol("t.pk2")]),
|
||||
&first_proj,
|
||||
Some(&scan_plan)
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk3")]))])
|
||||
aliased_columns_for(&[qcol("t.pk2")].into(), &first_proj, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk2"), [qcol("pk3")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from first projection to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("pk3")]), &plan, Some(&first_proj)).unwrap(),
|
||||
HashMap::from([(qcol("pk3"), HashSet::from([qcol("pk1")]))])
|
||||
aliased_columns_for(&[qcol("pk3")].into(), &plan, Some(&first_proj)).unwrap(),
|
||||
[(qcol("pk3"), [qcol("pk1")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from scan to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(),
|
||||
HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("pk1")]))])
|
||||
aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk2"), [qcol("pk1")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from leaf to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("pk2")]), &plan, None).unwrap(),
|
||||
HashMap::from([(qcol("pk2"), HashSet::from([qcol("pk1")]))])
|
||||
aliased_columns_for(&[qcol("pk2")].into(), &plan, None).unwrap(),
|
||||
[(qcol("pk2"), [qcol("pk1")].into())].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -647,8 +699,8 @@ mod tests {
|
||||
|
||||
// Test aliased_columns_for from scan to projection
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk2")]), &plan, Some(&scan_plan)).unwrap(),
|
||||
HashMap::from([(qcol("t.pk2"), HashSet::from([qcol("a.pk1")]))])
|
||||
aliased_columns_for(&[qcol("t.pk2")].into(), &plan, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk2"), [qcol("a.pk1")].into())].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -686,19 +738,14 @@ mod tests {
|
||||
|
||||
// Test aliased_columns_for from scan to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(),
|
||||
HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("pk42")]))])
|
||||
aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk1"), [qcol("pk42")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from leaf to first projection
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([Column::from_name("pk1")]),
|
||||
&first_proj,
|
||||
None
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("pk3")]))])
|
||||
aliased_columns_for(&[Column::from_name("pk1")].into(), &first_proj, None).unwrap(),
|
||||
[(Column::from_name("pk1"), [qcol("pk3")].into())].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -728,31 +775,26 @@ mod tests {
|
||||
|
||||
// Test aliased_columns_for from scan to final plan (identity mapping for aggregates)
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&scan_plan)).unwrap(),
|
||||
HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
|
||||
aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from scan to first aggregate
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([qcol("t.pk1")]),
|
||||
&first_aggr,
|
||||
Some(&scan_plan)
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
|
||||
aliased_columns_for(&[qcol("t.pk1")].into(), &first_aggr, Some(&scan_plan)).unwrap(),
|
||||
[(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from first aggregate to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([qcol("t.pk1")]), &plan, Some(&first_aggr)).unwrap(),
|
||||
HashMap::from([(qcol("t.pk1"), HashSet::from([qcol("t.pk1")]))])
|
||||
aliased_columns_for(&[qcol("t.pk1")].into(), &plan, Some(&first_aggr)).unwrap(),
|
||||
[(qcol("t.pk1"), [qcol("t.pk1")].into())].into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from leaf to final plan
|
||||
assert_eq!(
|
||||
aliased_columns_for(&HashSet::from([Column::from_name("pk1")]), &plan, None).unwrap(),
|
||||
HashMap::from([(Column::from_name("pk1"), HashSet::from([qcol("t.pk1")]))])
|
||||
aliased_columns_for(&[Column::from_name("pk1")].into(), &plan, None).unwrap(),
|
||||
[(Column::from_name("pk1"), [qcol("t.pk1")].into())].into()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -788,29 +830,31 @@ mod tests {
|
||||
// Test original_column_for from projection to second aggregate for aggr gen column
|
||||
assert_eq!(
|
||||
original_column_for(
|
||||
&HashSet::from([Column::from_name("min_max_number")]),
|
||||
&[Column::from_name("min_max_number")].into(),
|
||||
plan.clone(),
|
||||
Some(Arc::new(second_aggr.clone()))
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(
|
||||
[(
|
||||
Column::from_name("min_max_number"),
|
||||
Column::from_name("min(max(t.number))")
|
||||
)])
|
||||
)]
|
||||
.into()
|
||||
);
|
||||
|
||||
// Test aliased_columns_for from second aggregate to projection
|
||||
assert_eq!(
|
||||
aliased_columns_for(
|
||||
&HashSet::from([Column::from_name("min(max(t.number))")]),
|
||||
&[Column::from_name("min(max(t.number))")].into(),
|
||||
&plan,
|
||||
Some(&second_aggr)
|
||||
)
|
||||
.unwrap(),
|
||||
HashMap::from([(
|
||||
[(
|
||||
Column::from_name("min(max(t.number))"),
|
||||
HashSet::from([Column::from_name("min_max_number")])
|
||||
)])
|
||||
[Column::from_name("min_max_number")].into()
|
||||
)]
|
||||
.into()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +187,7 @@ impl Categorizer {
|
||||
LogicalPlan::TableScan(_) => Commutativity::Commutative,
|
||||
LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative,
|
||||
LogicalPlan::Subquery(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented,
|
||||
LogicalPlan::SubqueryAlias(_) => Commutativity::Commutative,
|
||||
LogicalPlan::Limit(limit) => {
|
||||
// Only execute `fetch` on remote nodes.
|
||||
// wait for https://github.com/apache/arrow-datafusion/pull/7669
|
||||
|
||||
@@ -49,9 +49,9 @@ EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1
|
||||
+-+-+
|
||||
| logical_plan_| Sort: i1.i ASC NULLS LAST_|
|
||||
|_|_LeftSemi Join: i1.i = __correlated_sq_1.i_|
|
||||
|_|_SubqueryAlias: i1_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| TableScan: integers_|
|
||||
|_| SubqueryAlias: i1_|
|
||||
|_|_TableScan: integers_|
|
||||
|_| ]]_|
|
||||
|_|_SubqueryAlias: __correlated_sq_1_|
|
||||
|_|_Projection: integers.i_|
|
||||
@@ -155,3 +155,520 @@ drop table integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE integers(i INTEGER, j TIMESTAMP TIME INDEX)
|
||||
PARTITION ON COLUMNS (i) (
|
||||
i < 1000,
|
||||
i >= 1000 AND i < 2000,
|
||||
i >= 2000
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1.i) ORDER BY i1.i;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: i1.i ASC NULLS LAST_|
|
||||
|_|_LeftSemi Join: i1.i = __correlated_sq_1.i_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| SubqueryAlias: i1_|
|
||||
|_|_TableScan: integers_|
|
||||
|_| ]]_|
|
||||
|_|_SubqueryAlias: __correlated_sq_1_|
|
||||
|_|_Projection: integers.i_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| TableScan: integers_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [i@0 ASC NULLS LAST]_|
|
||||
|_|_SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|
||||
|_|_REDACTED
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_ProjectionExec: expr=[i@0 as i]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT count(i) FROM integers WHERE i=i1.i) ORDER BY i1.i;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: i1.i ASC NULLS LAST_|
|
||||
|_|_LeftSemi Join: i1.i = __correlated_sq_1.i_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| SubqueryAlias: i1_|
|
||||
|_|_TableScan: integers_|
|
||||
|_| ]]_|
|
||||
|_|_SubqueryAlias: __correlated_sq_1_|
|
||||
|_|_Aggregate: groupBy=[[integers.i]], aggr=[[]]_|
|
||||
|_|_Projection: integers.i_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| TableScan: integers_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [i@0 ASC NULLS LAST]_|
|
||||
|_|_SortExec: expr=[i@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|
||||
|_|_REDACTED
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_AggregateExec: mode=SinglePartitioned, gby=[i@0 as i], aggr=[]_|
|
||||
|_|_ProjectionExec: expr=[i@0 as i]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE t(ts timestamp time index, a INT, b INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE t1(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE t2(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO t(ts,a,b) VALUES (1,3,30),(2,1,10),(3,2,20);
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
INSERT INTO t1(ts,a) VALUES (1,1),(2,3);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
INSERT INTO t2(ts,a) VALUES (1,2),(2,3);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+---+
|
||||
| x |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2,3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeSort: sq.x ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Projection: sq.x_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x;
|
||||
|
||||
+---+---+
|
||||
| x | c |
|
||||
+---+---+
|
||||
| 1 | 1 |
|
||||
| 2 | 1 |
|
||||
| 3 | 1 |
|
||||
+---+---+
|
||||
|
||||
-- expected:
|
||||
-- x | c
|
||||
-- 1 | 1
|
||||
-- 2 | 1
|
||||
-- 3 | 1
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeSort: sq.x ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Projection: sq.x, count(Int64(1)) AS count(*) AS c_|
|
||||
|_|_Aggregate: groupBy=[[sq.x]], aggr=[[count(Int64(1))]]_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+---+
|
||||
| x |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2,3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Aggregate: groupBy=[[sq.x]], aggr=[[]]_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Projection: sq.x_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_|
|
||||
|_|_SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_AggregateExec: mode=SinglePartitioned, gby=[x@0 as x], aggr=[]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x;
|
||||
|
||||
+---+
|
||||
| x |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2,3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeSort: sq.x ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Projection: sq.x_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y;
|
||||
|
||||
+---+
|
||||
| y |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2,3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeSort: sq2.y ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Sort: sq2.y ASC NULLS LAST_|
|
||||
|_|_Projection: sq2.y_|
|
||||
|_|_SubqueryAlias: sq2_|
|
||||
|_|_Projection: sq1.x AS y_|
|
||||
|_|_SubqueryAlias: sq1_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [y@0 ASC NULLS LAST]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+---+---+
|
||||
| x | y |
|
||||
+---+---+
|
||||
| 1 | 2 |
|
||||
| 2 | 3 |
|
||||
| 3 | 4 |
|
||||
+---+---+
|
||||
|
||||
-- expected:
|
||||
-- (x,y)
|
||||
-- (1,2)
|
||||
-- (2,3)
|
||||
-- (3,4)
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| MergeSort: sq.x ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Projection: sq.x, CAST(sq.x AS Int64) + Int64(1) AS y_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a;
|
||||
|
||||
+---+
|
||||
| a |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2,3,3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: u.a ASC NULLS LAST_|
|
||||
|_|_SubqueryAlias: u_|
|
||||
|_|_Union_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Projection: t1.a_|
|
||||
|_|_TableScan: t1_|
|
||||
|_| ]]_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Projection: t2.a_|
|
||||
|_|_TableScan: t2_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_|
|
||||
|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_InterleaveExec_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT u1.a
|
||||
FROM (SELECT a FROM t1) u1
|
||||
JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a
|
||||
ORDER BY u1.a;
|
||||
|
||||
+---+
|
||||
| a |
|
||||
+---+
|
||||
| 3 |
|
||||
+---+
|
||||
|
||||
-- expected: 3
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT u1.a
|
||||
FROM (SELECT a FROM t1) u1
|
||||
JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a
|
||||
ORDER BY u1.a;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: u1.a ASC NULLS LAST_|
|
||||
|_|_Projection: u1.a_|
|
||||
|_|_Inner Join: u1.a = u2.a_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| SubqueryAlias: u1_|
|
||||
|_|_Projection: t1.a_|
|
||||
|_|_TableScan: t1_|
|
||||
|_| ]]_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| SubqueryAlias: u2_|
|
||||
|_|_Projection: t2.a_|
|
||||
|_|_TableScan: t2_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_|
|
||||
|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|
||||
|_|_REDACTED
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x;
|
||||
|
||||
+---+
|
||||
| x |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Sort: v.x ASC NULLS LAST_|
|
||||
|_|_SubqueryAlias: v_|
|
||||
|_|_Projection: column1 AS x_|
|
||||
|_|_Values: (Int64(2)), (Int64(1))_|
|
||||
| physical_plan | SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] |
|
||||
|_|_ProjectionExec: expr=[column1@0 as x]_|
|
||||
|_|_DataSourceExec: partitions=1, partition_sizes=[1]_|
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2;
|
||||
|
||||
+---+
|
||||
| x |
|
||||
+---+
|
||||
| 1 |
|
||||
| 2 |
|
||||
+---+
|
||||
|
||||
-- expected: 1,2
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2;
|
||||
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| logical_plan_| Limit: skip=0, fetch=2_|
|
||||
|_|_MergeSort: sq.x ASC NULLS LAST_|
|
||||
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
||||
|_| Limit: skip=0, fetch=2_|
|
||||
|_|_Sort: sq.x ASC NULLS LAST_|
|
||||
|_|_Projection: sq.x_|
|
||||
|_|_SubqueryAlias: sq_|
|
||||
|_|_Projection: t.a AS x_|
|
||||
|_|_TableScan: t_|
|
||||
|_| ]]_|
|
||||
| physical_plan | SortPreservingMergeExec: [x@0 ASC NULLS LAST], fetch=2_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_SortExec: TopK(fetch=2), expr=[x@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
||||
|_|_CooperativeExec_|
|
||||
|_|_MergeScanExec: REDACTED
|
||||
|_|_|
|
||||
+-+-+
|
||||
|
||||
DROP TABLE t;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE t1;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE t2;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
|
||||
@@ -37,3 +37,168 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM
|
||||
drop table other;
|
||||
|
||||
drop table integers;
|
||||
|
||||
CREATE TABLE integers(i INTEGER, j TIMESTAMP TIME INDEX)
|
||||
PARTITION ON COLUMNS (i) (
|
||||
i < 1000,
|
||||
i >= 1000 AND i < 2000,
|
||||
i >= 2000
|
||||
);
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1.i) ORDER BY i1.i;
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT * FROM integers i1 WHERE EXISTS(SELECT count(i) FROM integers WHERE i=i1.i) ORDER BY i1.i;
|
||||
|
||||
DROP TABLE integers;
|
||||
|
||||
CREATE TABLE t(ts timestamp time index, a INT, b INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
CREATE TABLE t1(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
CREATE TABLE t2(ts timestamp time index, a INT)PARTITION ON COLUMNS (a) (
|
||||
a < 1000,
|
||||
a >= 1000 AND a < 2000,
|
||||
a >= 2000
|
||||
);
|
||||
|
||||
INSERT INTO t(ts,a,b) VALUES (1,3,30),(2,1,10),(3,2,20);
|
||||
|
||||
INSERT INTO t1(ts,a) VALUES (1,1),(2,3);
|
||||
|
||||
INSERT INTO t2(ts,a) VALUES (1,2),(2,3);
|
||||
|
||||
SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
-- expected: 1,2,3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x;
|
||||
-- expected:
|
||||
-- x | c
|
||||
-- 1 | 1
|
||||
-- 2 | 1
|
||||
-- 3 | 1
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x, COUNT(*) AS c FROM (SELECT a AS x FROM t) sq GROUP BY x ORDER BY x;
|
||||
|
||||
SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
-- expected: 1,2,3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT DISTINCT x FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x;
|
||||
-- expected: 1,2,3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT sq.x FROM (SELECT a AS x FROM t) sq ORDER BY sq.x;
|
||||
|
||||
SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y;
|
||||
-- expected: 1,2,3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT y FROM (SELECT x AS y FROM (SELECT a AS x FROM t) sq1) sq2 ORDER BY y;
|
||||
|
||||
SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
-- expected:
|
||||
-- (x,y)
|
||||
-- (1,2)
|
||||
-- (2,3)
|
||||
-- (3,4)
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x, x + 1 AS y FROM (SELECT a AS x FROM t) sq ORDER BY x;
|
||||
|
||||
SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a;
|
||||
-- expected: 1,2,3,3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT a FROM ((SELECT a FROM t1) UNION ALL (SELECT a FROM t2)) u ORDER BY a;
|
||||
|
||||
SELECT u1.a
|
||||
FROM (SELECT a FROM t1) u1
|
||||
JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a
|
||||
ORDER BY u1.a;
|
||||
-- expected: 3
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT u1.a
|
||||
FROM (SELECT a FROM t1) u1
|
||||
JOIN (SELECT a FROM t2) u2 ON u1.a = u2.a
|
||||
ORDER BY u1.a;
|
||||
|
||||
SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x;
|
||||
-- expected: 1,2
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (VALUES (2),(1)) v(x) ORDER BY x;
|
||||
|
||||
SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2;
|
||||
-- expected: 1,2
|
||||
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (Hash.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
EXPLAIN SELECT x FROM (SELECT a AS x FROM t) sq ORDER BY x LIMIT 2;
|
||||
|
||||
DROP TABLE t;
|
||||
DROP TABLE t1;
|
||||
DROP TABLE t2;
|
||||
|
||||
Reference in New Issue
Block a user