fix: window sort support alias time index (#5543)

* fix: use alias expr to check commutativity

* chore: debug sort

* feat: consider alias in window sort optimizer

* test: sqlness test

* test: update sqlness result
This commit is contained in:
Yingwen
2025-02-18 18:35:43 +08:00
committed by GitHub
parent 4ef038d098
commit 77223a0f3e
7 changed files with 240 additions and 15 deletions

View File

@@ -158,8 +158,9 @@ impl Categorizer {
| Expr::ScalarSubquery(_)
| Expr::Wildcard { .. } => Commutativity::Unimplemented,
Expr::Alias(_)
| Expr::Unnest(_)
Expr::Alias(alias) => Self::check_expr(&alias.expr),
Expr::Unnest(_)
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,

View File

@@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -88,7 +90,7 @@ impl WindowedSortPhysicalRule {
.expr
.as_any()
.downcast_ref::<PhysicalColumn>()
&& column_expr.name() == scanner_info.time_index
&& scanner_info.time_index.contains(column_expr.name())
{
} else {
return Ok(Transformed::no(plan));
@@ -148,13 +150,13 @@ impl WindowedSortPhysicalRule {
/// Scan-side information returned by `fetch_partition_range` for the windowed sort rule.
#[derive(Debug)]
struct ScannerInfo {
/// Partition ranges obtained from `RegionScanExec::get_uncollapsed_partition_ranges`.
partition_ranges: Vec<Vec<PartitionRange>>,
// NOTE(review): two `time_index` declarations are visible here; this looks like the
// before/after pair of a rendered diff — confirm which one the real file keeps.
time_index: String,
/// Column names treated as the time index: the name reported by the region scan,
/// plus projection output names whose expression is a column already in this set.
/// The sort column is accepted when its name is contained here.
time_index: HashSet<String>,
/// Tag columns as reported by `RegionScanExec::tag_columns`.
tag_columns: Vec<String>,
}
fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
let mut partition_ranges = None;
let mut time_index = None;
let mut time_index = HashSet::new();
let mut tag_columns = None;
let mut is_batch_coalesced = false;
@@ -172,9 +174,21 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}
// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
for (expr, output_name) in projection.expr() {
if let Some(column_expr) = expr.as_any().downcast_ref::<PhysicalColumn>() {
if time_index.contains(column_expr.name()) {
time_index.insert(output_name.clone());
}
}
}
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = Some(region_scan_exec.time_index());
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);
tag_columns = Some(region_scan_exec.tag_columns());
// set distinguish_partition_ranges to true, this is an incorrect workaround
@@ -189,7 +203,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
let result = try {
ScannerInfo {
partition_ranges: partition_ranges?,
time_index: time_index?,
time_index,
tag_columns: tag_columns?,
}
};