feat: impl optimizer rule to handle last_value case (#4357)

* feat: impl optimizer rule to handle last_value case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/optimizer/scan_hint.rs

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

* split methods

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2024-07-12 22:36:23 +08:00
committed by GitHub
parent 67dfdd6c61
commit 16075ada67
8 changed files with 430 additions and 173 deletions

View File

@@ -30,7 +30,7 @@ use datatypes::arrow::datatypes::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};
use table::table::scan::RegionScanExec;
use crate::error::{GetRegionMetadataSnafu, Result};
@@ -192,11 +192,20 @@ impl DummyTableProvider {
}
}
pub fn region_metadata(&self) -> RegionMetadataRef {
self.metadata.clone()
}
/// Sets the ordering hint of the query to the provider.
pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
}
/// Sets the time series selector hint of the query to the provider.
pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
self.scan_request.lock().unwrap().series_row_selector = Some(selector);
}
/// Gets the scan request of the provider.
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {

View File

@@ -13,9 +13,9 @@
// limitations under the License.
pub mod count_wildcard;
pub mod order_hint;
pub mod parallelize_scan;
pub mod remove_duplicate;
pub mod scan_hint;
pub mod string_normalization;
#[cfg(test)]
pub(crate) mod test_util;

View File

@@ -1,168 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_schema::SortOptions;
use common_recordbatch::OrderOption;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result as DataFusionResult;
use datafusion_expr::expr::Sort;
use datafusion_expr::{Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use crate::dummy_catalog::DummyTableProvider;
/// This rule will pass the nearest order requirement to the leaf table
/// scan node as ordering hint.
pub struct OrderHintRule;
impl OptimizerRule for OrderHintRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> DataFusionResult<Option<LogicalPlan>> {
Self::optimize(plan).map(Some)
}
fn name(&self) -> &str {
"OrderHintRule"
}
}
impl OrderHintRule {
fn optimize(plan: &LogicalPlan) -> DataFusionResult<LogicalPlan> {
let mut visitor = OrderHintVisitor::default();
let _ = plan.visit(&mut visitor)?;
if let Some(order_expr) = visitor.order_expr.take() {
plan.clone()
.transform_down(&|plan| Self::set_ordering_hint(plan, &order_expr))
.map(|x| x.data)
} else {
Ok(plan.clone())
}
}
fn set_ordering_hint(
plan: LogicalPlan,
order_expr: &[Sort],
) -> DataFusionResult<Transformed<LogicalPlan>> {
match &plan {
LogicalPlan::TableScan(table_scan) => {
let mut transformed = false;
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
// The provider in the region server is [DummyTableProvider].
if let Some(adapter) = source
.table_provider
.as_any()
.downcast_ref::<DummyTableProvider>()
{
let mut opts = Vec::with_capacity(order_expr.len());
for sort in order_expr {
let name = match sort.expr.try_as_col() {
Some(col) => col.name.clone(),
None => return Ok(Transformed::no(plan)),
};
opts.push(OrderOption {
name,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
})
}
adapter.with_ordering_hint(&opts);
transformed = true;
}
}
if transformed {
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
_ => Ok(Transformed::no(plan)),
}
}
}
/// Find the most closest order requirement to the leaf node.
#[derive(Default)]
struct OrderHintVisitor {
order_expr: Option<Vec<Sort>>,
}
impl TreeNodeVisitor<'_> for OrderHintVisitor {
type Node = LogicalPlan;
fn f_down(&mut self, node: &Self::Node) -> DataFusionResult<TreeNodeRecursion> {
if let LogicalPlan::Sort(sort) = node {
let mut exprs = vec![];
for expr in &sort.expr {
if let Expr::Sort(sort_expr) = expr {
exprs.push(sort_expr.clone());
}
}
self.order_expr = Some(exprs);
}
Ok(TreeNodeRecursion::Continue)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion_expr::{col, LogicalPlanBuilder};
use datafusion_optimizer::OptimizerContext;
use store_api::storage::RegionId;
use super::*;
use crate::optimizer::test_util::mock_table_provider;
#[test]
fn set_order_hint() {
let provider = Arc::new(mock_table_provider(RegionId::new(1, 1)));
let table_source = Arc::new(DefaultTableSource::new(provider.clone()));
let plan = LogicalPlanBuilder::scan("t", table_source, None)
.unwrap()
.sort(vec![col("ts").sort(true, false)])
.unwrap()
.sort(vec![col("ts").sort(false, true)])
.unwrap()
.build()
.unwrap();
let context = OptimizerContext::default();
OrderHintRule.try_optimize(&plan, &context).unwrap();
// should read the first (with `.sort(true, false)`) sort option
let scan_req = provider.scan_request();
assert_eq!(
OrderOption {
name: "ts".to_string(),
options: SortOptions {
descending: false,
nulls_first: false
}
},
scan_req.output_ordering.as_ref().unwrap()[0]
);
}
}

View File

@@ -0,0 +1,354 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::SemanticType;
use arrow_schema::SortOptions;
use common_recordbatch::OrderOption;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{Column, Result as DataFusionResult};
use datafusion_expr::expr::Sort;
use datafusion_expr::{utils, Expr, LogicalPlan};
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use store_api::storage::TimeSeriesRowSelector;
use crate::dummy_catalog::DummyTableProvider;
/// This rule will traverse the plan to collect necessary hints for leaf
/// table scan node and set them in [`ScanRequest`]. Hints include:
/// - the nearest order requirement to the leaf table scan node as ordering hint.
/// - the group by columns when all aggregate functions are `last_value` as
/// time series row selector hint.
///
/// [`ScanRequest`]: store_api::storage::ScanRequest
pub struct ScanHintRule;
impl OptimizerRule for ScanHintRule {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> DataFusionResult<Option<LogicalPlan>> {
Self::optimize(plan).map(Some)
}
fn name(&self) -> &str {
"ScanHintRule"
}
}
impl ScanHintRule {
fn optimize(plan: &LogicalPlan) -> DataFusionResult<LogicalPlan> {
let mut visitor = ScanHintVisitor::default();
let _ = plan.visit(&mut visitor)?;
if visitor.need_rewrite() {
plan.clone()
.transform_down(&|plan| Self::set_hints(plan, &visitor))
.map(|x| x.data)
} else {
Ok(plan.clone())
}
}
fn set_hints(
plan: LogicalPlan,
visitor: &ScanHintVisitor,
) -> DataFusionResult<Transformed<LogicalPlan>> {
match &plan {
LogicalPlan::TableScan(table_scan) => {
let mut transformed = false;
if let Some(source) = table_scan
.source
.as_any()
.downcast_ref::<DefaultTableSource>()
{
// The provider in the region server is [DummyTableProvider].
if let Some(adapter) = source
.table_provider
.as_any()
.downcast_ref::<DummyTableProvider>()
{
// set order_hint
if let Some(order_expr) = &visitor.order_expr {
Self::set_order_hint(adapter, order_expr);
}
// set time series selector hint
if let Some((group_by_cols, order_by_col)) = &visitor.ts_row_selector {
Self::set_time_series_row_selector_hint(
adapter,
group_by_cols,
order_by_col,
);
}
transformed = true;
}
}
if transformed {
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
_ => Ok(Transformed::no(plan)),
}
}
fn set_order_hint(adapter: &DummyTableProvider, order_expr: &Vec<Sort>) {
let mut opts = Vec::with_capacity(order_expr.len());
for sort in order_expr {
let name = match sort.expr.try_as_col() {
Some(col) => col.name.clone(),
None => return,
};
opts.push(OrderOption {
name,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
adapter.with_ordering_hint(&opts);
}
fn set_time_series_row_selector_hint(
adapter: &DummyTableProvider,
group_by_cols: &HashSet<Column>,
order_by_col: &Column,
) {
let region_metadata = adapter.region_metadata();
let mut should_set_selector_hint = true;
// check if order_by column is time index
if let Some(column_metadata) = region_metadata.column_by_name(&order_by_col.name) {
if column_metadata.semantic_type != SemanticType::Timestamp {
should_set_selector_hint = false;
}
} else {
should_set_selector_hint = false;
}
// check if all group_by columns are primary key
for col in group_by_cols {
let Some(column_metadata) = region_metadata.column_by_name(&col.name) else {
should_set_selector_hint = false;
break;
};
if column_metadata.semantic_type != SemanticType::Tag {
should_set_selector_hint = false;
break;
}
}
if should_set_selector_hint {
adapter.with_time_series_selector_hint(TimeSeriesRowSelector::LastRow);
}
}
}
/// Traverse and fetch hints.
#[derive(Default)]
struct ScanHintVisitor {
/// The closest order requirement to the leaf node.
order_expr: Option<Vec<Sort>>,
/// Row selection on time series distribution.
/// This field stores saved `group_by` columns when all aggregate functions are `last_value`
/// and the `order_by` column which should be time index.
ts_row_selector: Option<(HashSet<Column>, Column)>,
}
impl TreeNodeVisitor<'_> for ScanHintVisitor {
type Node = LogicalPlan;
fn f_down(&mut self, node: &Self::Node) -> DataFusionResult<TreeNodeRecursion> {
// Get order requirement from sort plan
if let LogicalPlan::Sort(sort) = node {
let mut exprs = vec![];
for expr in &sort.expr {
if let Expr::Sort(sort_expr) = expr {
exprs.push(sort_expr.clone());
}
}
self.order_expr = Some(exprs);
}
// Get time series row selector from aggr plan
if let LogicalPlan::Aggregate(aggregate) = node {
let mut is_all_last_value = !aggregate.aggr_expr.is_empty();
let mut order_by_expr = None;
for expr in &aggregate.aggr_expr {
// check function name
let Expr::AggregateFunction(func) = expr else {
is_all_last_value = false;
break;
};
if func.func_def.name() != "last_value" || func.filter.is_some() || func.distinct {
is_all_last_value = false;
break;
}
// check order by requirement
if let Some(order_by) = &func.order_by
&& let Some(first_order_by) = order_by.first()
&& order_by.len() == 1
{
if let Some(existing_order_by) = &order_by_expr {
if existing_order_by != first_order_by {
is_all_last_value = false;
break;
}
} else if let Expr::Sort(sort_expr) = first_order_by {
// only allow `order by xxx [DESC]`, xxx is a bare column reference
if sort_expr.asc || !matches!(&*sort_expr.expr, Expr::Column(_)) {
is_all_last_value = false;
break;
}
order_by_expr = Some(first_order_by.clone());
}
}
}
is_all_last_value &= order_by_expr.is_some();
if is_all_last_value {
// make sure all the exprs are DIRECT `col` and collect them
let mut group_by_cols = HashSet::with_capacity(aggregate.group_expr.len());
for expr in &aggregate.group_expr {
if let Expr::Column(col) = expr {
group_by_cols.insert(col.clone());
} else {
is_all_last_value = false;
break;
}
}
// Safety: checked in the above loop
let Expr::Sort(Sort {
expr: order_by_col, ..
}) = order_by_expr.unwrap()
else {
unreachable!()
};
let Expr::Column(order_by_col) = *order_by_col else {
unreachable!()
};
if is_all_last_value {
self.ts_row_selector = Some((group_by_cols, order_by_col));
}
}
}
if self.ts_row_selector.is_some()
&& (matches!(node, LogicalPlan::Subquery(_)) || node.inputs().len() > 1)
{
// clean previous time series selector hint when encounter subqueries or join
self.ts_row_selector = None;
}
if let LogicalPlan::Filter(filter) = node
&& let Some(group_by_exprs) = &self.ts_row_selector
{
let mut filter_referenced_cols = HashSet::default();
utils::expr_to_columns(&filter.predicate, &mut filter_referenced_cols)?;
// ensure only group_by columns are used in filter
if !filter_referenced_cols.is_subset(&group_by_exprs.0) {
self.ts_row_selector = None;
}
}
Ok(TreeNodeRecursion::Continue)
}
}
impl ScanHintVisitor {
fn need_rewrite(&self) -> bool {
self.order_expr.is_some() || self.ts_row_selector.is_some()
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition};
use datafusion_expr::{col, LogicalPlanBuilder};
use datafusion_optimizer::OptimizerContext;
use datafusion_physical_expr::expressions::LastValue;
use store_api::storage::RegionId;
use super::*;
use crate::optimizer::test_util::mock_table_provider;
#[test]
fn set_order_hint() {
let provider = Arc::new(mock_table_provider(RegionId::new(1, 1)));
let table_source = Arc::new(DefaultTableSource::new(provider.clone()));
let plan = LogicalPlanBuilder::scan("t", table_source, None)
.unwrap()
.sort(vec![col("ts").sort(true, false)])
.unwrap()
.sort(vec![col("ts").sort(false, true)])
.unwrap()
.build()
.unwrap();
let context = OptimizerContext::default();
ScanHintRule.try_optimize(&plan, &context).unwrap();
// should read the first (with `.sort(true, false)`) sort option
let scan_req = provider.scan_request();
assert_eq!(
OrderOption {
name: "ts".to_string(),
options: SortOptions {
descending: false,
nulls_first: false
}
},
scan_req.output_ordering.as_ref().unwrap()[0]
);
}
#[test]
fn set_time_series_row_selector_hint() {
let provider = Arc::new(mock_table_provider(RegionId::new(1, 1)));
let table_source = Arc::new(DefaultTableSource::new(provider.clone()));
let plan = LogicalPlanBuilder::scan("t", table_source, None)
.unwrap()
.aggregate(
vec![col("k0")],
vec![Expr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::UDF(Arc::new(LastValue::new().into())),
args: vec![col("v0")],
distinct: false,
filter: None,
order_by: Some(vec![Expr::Sort(Sort {
expr: Box::new(col("ts")),
asc: false,
nulls_first: true,
})]),
null_treatment: None,
})],
)
.unwrap()
.build()
.unwrap();
let context = OptimizerContext::default();
ScanHintRule.try_optimize(&plan, &context).unwrap();
let scan_req = provider.scan_request();
let _ = scan_req.series_row_selector.unwrap();
}
}

View File

@@ -42,9 +42,9 @@ use table::TableRef;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::parallelize_scan::ParallelizeScan;
use crate::optimizer::remove_duplicate::RemoveDuplicate;
use crate::optimizer::scan_hint::ScanHintRule;
use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::optimizer::ExtensionAnalyzerRule;
@@ -109,7 +109,7 @@ impl QueryEngineState {
}
let mut optimizer = Optimizer::new();
optimizer.rules.push(Arc::new(OrderHintRule));
optimizer.rules.push(Arc::new(ScanHintRule));
// add physical optimizer
let mut physical_optimizer = PhysicalOptimizer::new();

View File

@@ -0,0 +1,34 @@
create table t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
);
Affected Rows: 0
insert into t values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(2, 'a', '🌗', 3.0),
(3, 'c', '🌘', 4.0),
(4, 'a', '🌑', 5.0),
(5, 'b', '🌒', 6.0),
(6, 'a', '🌓', 7.0),
(7, 'c', '🌔', 8.0),
(8, 'd', '🌕', 9.0);
Affected Rows: 9
-- Wait for #4354
-- explain analyze
-- select
-- last_value(host order by ts),
-- last_value(not_pk order by ts),
-- last_value(val order by ts)
-- from t
-- group by host;
drop table t;
Affected Rows: 0

View File

@@ -0,0 +1,28 @@
create table t (
ts timestamp time index,
host string primary key,
not_pk string,
val double,
);
insert into t values
(0, 'a', '🌕', 1.0),
(1, 'b', '🌖', 2.0),
(2, 'a', '🌗', 3.0),
(3, 'c', '🌘', 4.0),
(4, 'a', '🌑', 5.0),
(5, 'b', '🌒', 6.0),
(6, 'a', '🌓', 7.0),
(7, 'c', '🌔', 8.0),
(8, 'd', '🌕', 9.0);
-- Wait for #4354
-- explain analyze
-- select
-- last_value(host order by ts),
-- last_value(not_pk order by ts),
-- last_value(val order by ts)
-- from t
-- group by host;
drop table t;

View File

@@ -125,7 +125,7 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test;
| logical_plan after common_sub_expression_eliminate_| SAME TEXT AS ABOVE_|
| logical_plan after eliminate_group_by_constant_| SAME TEXT AS ABOVE_|
| logical_plan after optimize_projections_| SAME TEXT AS ABOVE_|
| logical_plan after OrderHintRule_| SAME TEXT AS ABOVE_|
| logical_plan after ScanHintRule_| SAME TEXT AS ABOVE_|
| logical_plan_| PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_|
|_|_PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false]_|
|_|_PromSeriesDivide: tags=["k"]_|