mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
Compare commits
3 Commits
bytes_trac
...
fix/empty_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f8e8be042 | ||
|
|
9d30459a58 | ||
|
|
f1650a78f7 |
@@ -23,6 +23,7 @@ use common_error::ext::ErrorExt;
|
|||||||
use common_error::status_code::StatusCode;
|
use common_error::status_code::StatusCode;
|
||||||
use common_function::function::FunctionContext;
|
use common_function::function::FunctionContext;
|
||||||
use common_query::prelude::GREPTIME_VALUE;
|
use common_query::prelude::GREPTIME_VALUE;
|
||||||
|
use common_telemetry::debug;
|
||||||
use datafusion::common::DFSchemaRef;
|
use datafusion::common::DFSchemaRef;
|
||||||
use datafusion::datasource::DefaultTableSource;
|
use datafusion::datasource::DefaultTableSource;
|
||||||
use datafusion::functions_aggregate::average::avg_udaf;
|
use datafusion::functions_aggregate::average::avg_udaf;
|
||||||
@@ -661,10 +662,30 @@ impl PromPlanner {
|
|||||||
}
|
}
|
||||||
Ok(binary_expr)
|
Ok(binary_expr)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
fn optimize(plan: &LogicalPlan) -> LogicalPlan {
|
||||||
|
use datafusion_optimizer::OptimizerRule;
|
||||||
|
let new_plan =
|
||||||
|
datafusion::optimizer::optimize_projections::OptimizeProjections::new()
|
||||||
|
.rewrite(
|
||||||
|
plan.clone(),
|
||||||
|
&datafusion::optimizer::OptimizerContext::default(),
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.data;
|
||||||
|
if new_plan != *plan {
|
||||||
|
debug!(
|
||||||
|
"Optimized projection plan: {new_plan:#?}\n From old plan: {plan:#?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
new_plan
|
||||||
|
}
|
||||||
|
|
||||||
if is_comparison_op && !should_return_bool {
|
if is_comparison_op && !should_return_bool {
|
||||||
self.filter_on_field_column(join_plan, bin_expr_builder)
|
self.filter_on_field_column(join_plan, bin_expr_builder)
|
||||||
} else {
|
} else {
|
||||||
self.projection_for_each_field_column(join_plan, bin_expr_builder)
|
self.projection_for_each_field_column(join_plan, bin_expr_builder)
|
||||||
|
.map(|p| optimize(&p))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -3289,6 +3310,8 @@ mod test {
|
|||||||
use common_base::Plugins;
|
use common_base::Plugins;
|
||||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||||
use common_query::test_util::DummyDecoder;
|
use common_query::test_util::DummyDecoder;
|
||||||
|
use datafusion::functions_aggregate::count::count;
|
||||||
|
use datafusion_optimizer::OptimizerContext;
|
||||||
use datatypes::prelude::ConcreteDataType;
|
use datatypes::prelude::ConcreteDataType;
|
||||||
use datatypes::schema::{ColumnSchema, Schema};
|
use datatypes::schema::{ColumnSchema, Schema};
|
||||||
use promql_parser::label::Labels;
|
use promql_parser::label::Labels;
|
||||||
@@ -4909,6 +4932,132 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
|
|||||||
assert_eq!(plan.display_indent_schema().to_string(), expected);
|
assert_eq!(plan.display_indent_schema().to_string(), expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_nested_aggr_not_exists_table_label() {
|
||||||
|
let mut eval_stmt = EvalStmt {
|
||||||
|
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
|
||||||
|
start: UNIX_EPOCH,
|
||||||
|
end: UNIX_EPOCH
|
||||||
|
.checked_add(Duration::from_secs(100_000))
|
||||||
|
.unwrap(),
|
||||||
|
interval: Duration::from_secs(5),
|
||||||
|
lookback_delta: Duration::from_secs(1),
|
||||||
|
};
|
||||||
|
let case = r#"count(count(node_cpu_seconds_total)) / node_load5"#;
|
||||||
|
|
||||||
|
let prom_expr = parser::parse(case).unwrap();
|
||||||
|
eval_stmt.expr = prom_expr;
|
||||||
|
let table_provider = build_test_table_provider_with_fields(
|
||||||
|
&[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
|
||||||
|
&["job"],
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let plan =
|
||||||
|
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let expected = r#"Projection: lhs.time, lhs.count(count(.value)) / rhs.value AS lhs.count(count(.value)) / rhs.value [time:Timestamp(Millisecond, None), lhs.count(count(.value)) / rhs.value:Float64;N]
|
||||||
|
Inner Join: lhs.time = rhs.time [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64, time:Timestamp(Millisecond, None), value:Float64;N]
|
||||||
|
SubqueryAlias: lhs [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
|
||||||
|
Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
|
||||||
|
Aggregate: groupBy=[[.time, count(.value)]], aggr=[[count(count(.value))]] [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
|
||||||
|
Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), count(.value):Int64]
|
||||||
|
Aggregate: groupBy=[[.time]], aggr=[[count(.value)]] [time:Timestamp(Millisecond, None), count(.value):Int64]
|
||||||
|
EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
|
||||||
|
SubqueryAlias: rhs [time:Timestamp(Millisecond, None), value:Float64;N]
|
||||||
|
EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]"#;
|
||||||
|
|
||||||
|
let rhs = LogicalPlanBuilder::from(LogicalPlan::Extension(Extension {
|
||||||
|
node: Arc::new(
|
||||||
|
EmptyMetric::new(
|
||||||
|
0,
|
||||||
|
-1,
|
||||||
|
5000,
|
||||||
|
"time".to_string(),
|
||||||
|
"value".to_string(),
|
||||||
|
Some(lit(0.0f64)),
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
),
|
||||||
|
}))
|
||||||
|
.alias("rhs")
|
||||||
|
.unwrap()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let full = LogicalPlanBuilder::from(LogicalPlan::Extension(Extension {
|
||||||
|
node: Arc::new(
|
||||||
|
EmptyMetric::new(
|
||||||
|
0,
|
||||||
|
-1,
|
||||||
|
5000,
|
||||||
|
"time".to_string(),
|
||||||
|
"value".to_string(),
|
||||||
|
Some(lit(0.0f64)),
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
),
|
||||||
|
}))
|
||||||
|
.aggregate(
|
||||||
|
vec![col(Column::new(Some(""), "time"))],
|
||||||
|
vec![count(col(Column::new(Some(""), "value")))],
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.sort(vec![SortExpr::new(
|
||||||
|
col(Column::new(Some(""), "time")),
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
)])
|
||||||
|
.unwrap()
|
||||||
|
.aggregate(
|
||||||
|
vec![col(Column::new(Some(""), "time"))],
|
||||||
|
vec![count(col("count(.value)"))],
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.sort(vec![SortExpr::new(
|
||||||
|
col(Column::new(Some(""), "time")),
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
)])
|
||||||
|
.unwrap()
|
||||||
|
.alias("lhs")
|
||||||
|
.unwrap()
|
||||||
|
.project(vec![
|
||||||
|
col("lhs.time"),
|
||||||
|
col(Column::new(Some("lhs"), "count(count(.value))")),
|
||||||
|
])
|
||||||
|
.unwrap()
|
||||||
|
.join(
|
||||||
|
rhs,
|
||||||
|
JoinType::Inner,
|
||||||
|
(
|
||||||
|
vec![Column::new(Some("lhs"), "time")],
|
||||||
|
vec![Column::new(Some("rhs"), "time")],
|
||||||
|
),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
dbg!(&full);
|
||||||
|
{
|
||||||
|
let optimizer = datafusion_optimizer::Optimizer::new();
|
||||||
|
let optimized_full_plan = optimizer
|
||||||
|
.optimize(full, &OptimizerContext::default(), |_, _| {})
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
assert_eq!(plan.display_indent_schema().to_string(), expected);
|
||||||
|
let optimizer = datafusion_optimizer::Optimizer::new();
|
||||||
|
let optimized_plan = optimizer
|
||||||
|
.optimize(plan, &OptimizerContext::default(), |_, _| {})
|
||||||
|
.unwrap();
|
||||||
|
println!("{}", optimized_plan.display_indent_schema().to_string());
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_histogram_quantile_missing_le_column() {
|
async fn test_histogram_quantile_missing_le_column() {
|
||||||
let mut eval_stmt = EvalStmt {
|
let mut eval_stmt = EvalStmt {
|
||||||
|
|||||||
Reference in New Issue
Block a user