From f694f0e6f9ac333950681768d3828da48099acd6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 1 Jun 2026 22:49:22 +0800 Subject: [PATCH] basic impl Signed-off-by: Ruihang Xia --- src/query/src/promql/planner.rs | 745 +++++++++++++++++- .../common/promql/set_operation.result | 32 +- .../promql/tsid_binary_join_regression.result | 52 ++ .../promql/tsid_binary_join_regression.sql | 17 + 4 files changed, 823 insertions(+), 23 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 0dacc136e8..8394de1aac 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashSet, VecDeque}; +use std::collections::{BTreeSet, HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::UNIX_EPOCH; @@ -161,6 +161,130 @@ struct PromPlannerContext { range: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct VectorLeafKey { + metric_name: String, + matchers: Vec<(String, String, String)>, + or_matchers: Vec>, + offset_ms: i128, + at: String, +} + +#[derive(Debug, Clone)] +struct IslandLeaf { + selector: VectorSelector, + display_table: String, +} + +#[derive(Debug, Clone)] +enum IslandExpr { + VectorLeaf(usize), + Scalar(DfExpr), + Unary { + input: Box, + }, + Binary { + op: TokenType, + lhs: Box, + rhs: Box, + }, +} + +#[derive(Debug, Default)] +struct IslandCollectEnv { + leaf_by_key: HashMap, + leaves: Vec, + vector_occurrences: usize, +} + +#[derive(Debug)] +struct PlannedIslandLeaf { + plan: LogicalPlan, + ctx: PromPlannerContext, + alias: TableReference, + display_table: String, +} + +#[derive(Debug)] +struct IslandFieldExprs { + exprs: Vec, + names: Vec, + scalar: bool, +} + +impl VectorLeafKey { + fn from_selector(selector: &VectorSelector) -> Option { + let mut metric_name = selector.name.clone(); + let mut matchers = Vec::with_capacity(selector.matchers.matchers.len()); + + for matcher in &selector.matchers.matchers { + if matcher.name == METRIC_NAME { + if matcher.op != MatchOp::Equal || metric_name.is_some() { + return None; + } + metric_name = Some(matcher.value.clone()); + } else { + matchers.push(Self::matcher_key(matcher)); + } + } + matchers.sort(); + + let mut or_matchers = selector + .matchers + .or_matchers + .iter() + .map(|group| { + let mut group = group.iter().map(Self::matcher_key).collect::>(); + group.sort(); + group + }) + .collect::>(); + or_matchers.sort(); + + Some(Self { + metric_name: metric_name?, + matchers, + or_matchers, + offset_ms: Self::offset_ms(&selector.offset), + at: format!("{:?}", selector.at), + }) + } + + fn matcher_key(matcher: &Matcher) -> (String, String, String) { + ( + matcher.name.clone(), + matcher.op.to_string(), + matcher.value.clone(), + ) + } + + fn offset_ms(offset: &Option) -> i128 { + match offset { + Some(Offset::Pos(duration)) => duration.as_millis() as i128, + Some(Offset::Neg(duration)) => -(duration.as_millis() as i128), + None => 0, + } + } +} + +impl IslandCollectEnv { + fn intern_leaf(&mut self, selector: &VectorSelector) -> Option { + self.vector_occurrences += 1; + let key = VectorLeafKey::from_selector(selector)?; + if let Some(id) = self.leaf_by_key.get(&key) { + return Some(*id); + } + + let id = self.leaves.len(); + self.leaves.push(IslandLeaf { + selector: selector.clone(), + display_table: key.metric_name.clone(), + }); + self.leaf_by_key.insert(key, id); + Some(id) + } +} + impl PromPlannerContext { fn from_eval_stmt(stmt: &EvalStmt) -> Self { Self { @@ -607,11 +731,380 @@ impl PromPlanner { }) } + async fn try_plan_binary_island( + &mut self, + binary_expr: &PromBinaryExpr, + ) -> Result> { + let original_ctx = self.ctx.clone(); + let mut collect_env = IslandCollectEnv::default(); + let Some(island_expr) = Self::collect_binary_island_expr( + &PromExpr::Binary(binary_expr.clone()), + &mut collect_env, + ) else { + return Ok(None); + }; + + if collect_env.leaves.is_empty() + || collect_env.vector_occurrences <= collect_env.leaves.len() + { + return Ok(None); + } + + let mut planned_leaves = Vec::with_capacity(collect_env.leaves.len()); + for (idx, leaf) in collect_env.leaves.iter().enumerate() { + let plan = self + .prom_vector_selector_to_plan(&leaf.selector, false) + .await?; + let ctx = self.ctx.clone(); + let alias = TableReference::bare(format!("prom_v{idx}")); + let plan = LogicalPlanBuilder::from(plan) + .alias(alias.clone()) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + planned_leaves.push(PlannedIslandLeaf { + plan, + ctx, + alias, + display_table: leaf.display_table.clone(), + }); + } + + if !Self::binary_island_join_contexts_supported(&planned_leaves) { + self.ctx = original_ctx; + return Ok(None); + } + + let mut input = planned_leaves[0].plan.clone(); + for right_idx in 1..planned_leaves.len() { + input = self.join_binary_island_leaf( + input, + &planned_leaves[0], + &planned_leaves[right_idx], + )?; + } + + let field_exprs = + Self::build_binary_island_field_exprs(&island_expr, &planned_leaves, input.schema())?; + if field_exprs.scalar || field_exprs.exprs.is_empty() { + self.ctx = original_ctx; + return Ok(None); + } + + let plan = self.project_binary_island( + input, + &planned_leaves[0].alias, + &planned_leaves[0].ctx, + field_exprs, + )?; + Ok(Some(plan)) + } + + fn collect_binary_island_expr( + expr: &PromExpr, + env: &mut IslandCollectEnv, + ) -> Option { + if let Some(expr) = Self::try_build_literal_expr(expr) { + return Some(IslandExpr::Scalar(expr)); + } + + match expr { + PromExpr::Paren(ParenExpr { expr }) => Self::collect_binary_island_expr(expr, env), + PromExpr::VectorSelector(selector) => { + let leaf = env.intern_leaf(selector)?; + Some(IslandExpr::VectorLeaf(leaf)) + } + PromExpr::Unary(UnaryExpr { expr }) => { + let input = Self::collect_binary_island_expr(expr, env)?; + Some(IslandExpr::Unary { + input: Box::new(input), + }) + } + PromExpr::Binary(PromBinaryExpr { + lhs, + rhs, + op, + modifier, + }) if Self::is_safe_binary_island_op(*op) + && Self::is_safe_binary_island_modifier(modifier) => + { + let lhs = Self::collect_binary_island_expr(lhs, env)?; + let rhs = Self::collect_binary_island_expr(rhs, env)?; + Some(IslandExpr::Binary { + op: *op, + lhs: Box::new(lhs), + rhs: Box::new(rhs), + }) + } + _ => None, + } + } + + fn is_safe_binary_island_op(token: TokenType) -> bool { + matches!( + token.id(), + token::T_ADD + | token::T_SUB + | token::T_MUL + | token::T_DIV + | token::T_MOD + | token::T_POW + | token::T_ATAN2 + ) + } + + fn is_safe_binary_island_modifier(modifier: &Option) -> bool { + modifier.as_ref().is_none_or(|modifier| { + !modifier.return_bool + && modifier.matching.is_none() + && matches!(modifier.card, VectorMatchCardinality::OneToOne) + }) + } + + fn binary_island_join_contexts_supported(leaves: &[PlannedIslandLeaf]) -> bool { + if leaves + .iter() + .any(|leaf| leaf.ctx.time_index_column.is_none()) + { + return false; + } + + if leaves.len() <= 1 { + return true; + } + + let first_tags = leaves[0].ctx.tag_columns.iter().collect::>(); + + leaves.iter().skip(1).all(|leaf| { + (Self::plan_has_tsid_column(&leaves[0].plan) && Self::plan_has_tsid_column(&leaf.plan)) + || leaf.ctx.tag_columns.iter().collect::>() == first_tags + }) + } + + fn join_binary_island_leaf( + &self, + left: LogicalPlan, + first_leaf: &PlannedIslandLeaf, + right_leaf: &PlannedIslandLeaf, + ) -> Result { + let only_join_time_index = + first_leaf.ctx.tag_columns.is_empty() || right_leaf.ctx.tag_columns.is_empty(); + let (mut left_keys, mut right_keys) = Self::binary_join_key_columns_with_context( + &left, + &right_leaf.plan, + &first_leaf.ctx, + &right_leaf.ctx, + only_join_time_index, + &None, + ); + + if let (Some(left_time_index_column), Some(right_time_index_column)) = ( + first_leaf.ctx.time_index_column.clone(), + right_leaf.ctx.time_index_column.clone(), + ) { + left_keys.insert(left_time_index_column); + right_keys.insert(right_time_index_column); + } + + LogicalPlanBuilder::from(left) + .join_detailed( + right_leaf.plan.clone(), + JoinType::Inner, + ( + left_keys + .into_iter() + .map(|name| Column::new(Some(first_leaf.alias.clone()), name)) + .collect::>(), + right_keys + .into_iter() + .map(|name| Column::new(Some(right_leaf.alias.clone()), name)) + .collect::>(), + ), + None, + NullEquality::NullEqualsNull, + ) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + + fn build_binary_island_field_exprs( + expr: &IslandExpr, + leaves: &[PlannedIslandLeaf], + schema: &DFSchemaRef, + ) -> Result { + match expr { + IslandExpr::VectorLeaf(id) => { + let leaf = &leaves[*id]; + let exprs = leaf + .ctx + .field_columns + .iter() + .map(|field| { + schema + .qualified_field_with_name(Some(&leaf.alias), field) + .context(DataFusionPlanningSnafu) + .map(|field| DfExpr::Column(field.into())) + }) + .collect::>>()?; + let names = leaf + .ctx + .field_columns + .iter() + .map(|field| format!("{}.{}", leaf.display_table, field)) + .collect(); + Ok(IslandFieldExprs { + exprs, + names, + scalar: false, + }) + } + IslandExpr::Scalar(expr) => Ok(IslandFieldExprs { + exprs: vec![expr.clone()], + names: vec![expr.schema_name().to_string()], + scalar: true, + }), + IslandExpr::Unary { input } => { + let input = Self::build_binary_island_field_exprs(input, leaves, schema)?; + let mut exprs = Vec::with_capacity(input.exprs.len()); + let mut names = Vec::with_capacity(input.names.len()); + for (expr, name) in input.exprs.into_iter().zip(input.names) { + exprs.push(DfExpr::Negative(Box::new(expr))); + names.push(format!("-{name}")); + } + Ok(IslandFieldExprs { + exprs, + names, + scalar: input.scalar, + }) + } + IslandExpr::Binary { op, lhs, rhs } => { + let same_leaf = match (&**lhs, &**rhs) { + (IslandExpr::VectorLeaf(left), IslandExpr::VectorLeaf(right)) + if left == right => + { + Some(*left) + } + _ => None, + }; + let lhs = Self::build_binary_island_field_exprs(lhs, leaves, schema)?; + let rhs = Self::build_binary_island_field_exprs(rhs, leaves, schema)?; + let expr_builder = Self::prom_token_to_binary_expr_builder(*op)?; + let scalar = lhs.scalar && rhs.scalar; + let op = op.to_string(); + + let (exprs, names) = match (lhs.scalar, rhs.scalar) { + (true, true) => { + let expr = expr_builder(lhs.exprs[0].clone(), rhs.exprs[0].clone())?; + let name = format!("{} {op} {}", lhs.names[0], rhs.names[0]); + (vec![expr], vec![name]) + } + (true, false) => { + let mut exprs = Vec::with_capacity(rhs.exprs.len()); + let mut names = Vec::with_capacity(rhs.names.len()); + for (rhs_expr, rhs_name) in rhs.exprs.into_iter().zip(rhs.names) { + exprs.push(expr_builder(lhs.exprs[0].clone(), rhs_expr)?); + names.push(format!("{} {op} {rhs_name}", lhs.names[0])); + } + (exprs, names) + } + (false, true) => { + let mut exprs = Vec::with_capacity(lhs.exprs.len()); + let mut names = Vec::with_capacity(lhs.names.len()); + for (lhs_expr, lhs_name) in lhs.exprs.into_iter().zip(lhs.names) { + exprs.push(expr_builder(lhs_expr, rhs.exprs[0].clone())?); + names.push(format!("{lhs_name} {op} {}", rhs.names[0])); + } + (exprs, names) + } + (false, false) => { + let mut exprs = Vec::new(); + let mut names = Vec::new(); + for (idx, ((lhs_expr, rhs_expr), (mut lhs_name, mut rhs_name))) in lhs + .exprs + .into_iter() + .zip(rhs.exprs) + .zip(lhs.names.into_iter().zip(rhs.names)) + .enumerate() + { + if let Some(leaf) = same_leaf { + let field = leaves[leaf] + .ctx + .field_columns + .get(idx) + .cloned() + .unwrap_or_else(|| lhs_name.clone()); + lhs_name = format!("lhs.{field}"); + rhs_name = format!("rhs.{field}"); + } + exprs.push(expr_builder(lhs_expr, rhs_expr)?); + names.push(format!("{lhs_name} {op} {rhs_name}")); + } + (exprs, names) + } + }; + + Ok(IslandFieldExprs { + exprs, + names, + scalar, + }) + } + } + } + + fn project_binary_island( + &mut self, + input: LogicalPlan, + base_alias: &TableReference, + base_ctx: &PromPlannerContext, + field_exprs: IslandFieldExprs, + ) -> Result { + self.ctx = base_ctx.clone(); + + let schema = input.schema(); + let non_field_exprs = base_ctx + .tag_columns + .iter() + .chain(base_ctx.time_index_column.iter()) + .map(|column| { + schema + .qualified_field_with_name(Some(base_alias), column) + .context(DataFusionPlanningSnafu) + .map(|field| DfExpr::Column(field.into())) + }); + let tsid_expr = Self::optional_tsid_projection(schema, Some(base_alias), base_ctx.use_tsid) + .into_iter() + .map(Ok); + + self.ctx.field_columns = field_exprs.names; + let field_exprs = field_exprs + .exprs + .into_iter() + .zip(self.ctx.field_columns.iter()) + .map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::, name)))); + + let project_exprs = non_field_exprs + .chain(tsid_expr) + .chain(field_exprs) + .collect::>>()?; + + LogicalPlanBuilder::from(input) + .project(project_exprs) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu) + } + async fn prom_binary_expr_to_plan( &mut self, query_engine_state: &QueryEngineState, binary_expr: &PromBinaryExpr, ) -> Result { + if let Some(plan) = self.try_plan_binary_island(binary_expr).await? { + return Ok(plan); + } + let PromBinaryExpr { lhs, rhs, @@ -3498,6 +3991,24 @@ impl PromPlanner { right: &LogicalPlan, only_join_time_index: bool, modifier: &Option, + ) -> (BTreeSet, BTreeSet) { + Self::binary_join_key_columns_with_context( + left, + right, + &self.ctx, + &self.ctx, + only_join_time_index, + modifier, + ) + } + + fn binary_join_key_columns_with_context( + left: &LogicalPlan, + right: &LogicalPlan, + left_ctx: &PromPlannerContext, + right_ctx: &PromPlannerContext, + only_join_time_index: bool, + modifier: &Option, ) -> (BTreeSet, BTreeSet) { let use_tsid_join = !only_join_time_index && modifier.as_ref().is_none_or(|modifier| { @@ -3516,13 +4027,21 @@ impl PromPlanner { let left_tag_columns = if only_join_time_index { BTreeSet::new() } else { - self.ctx + left_ctx + .tag_columns + .iter() + .cloned() + .collect::>() + }; + let right_tag_columns = if only_join_time_index { + BTreeSet::new() + } else { + right_ctx .tag_columns .iter() .cloned() .collect::>() }; - let right_tag_columns = left_tag_columns.clone(); (left_tag_columns, right_tag_columns) }; @@ -4974,7 +5493,7 @@ mod test { } #[tokio::test] - async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() { + async fn repeated_tsid_binary_operand_reuses_leaf_plan() { let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100"); let table_provider = build_test_table_provider_with_tsid( @@ -4995,12 +5514,24 @@ mod test { .unwrap(); let plan_str = plan.display_indent_schema().to_string(); - assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}"); + assert_eq!( + plan_str + .matches("Filter: phy.__table_id = UInt32(1024)") + .count(), + 1, + "{plan_str}" + ); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 2, + "{plan_str}" + ); assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); } #[tokio::test] - async fn repeated_tsid_binary_operand_keeps_shorter_field_side() { + async fn repeated_tsid_binary_operand_reuses_shorter_field_side() { let eval_stmt = build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100"); @@ -5043,10 +5574,210 @@ mod test { .count(); assert_eq!(value_columns, 1, "{field_names:?}"); let plan_str = plan.display_indent_schema().to_string(); - assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}"); + assert_eq!( + plan_str + .matches("Filter: phy.__table_id = UInt32(1025)") + .count(), + 1, + "{plan_str}" + ); assert!(!plan_str.contains("tag_0 ="), "{plan_str}"); } + #[tokio::test] + async fn binary_island_reuses_self_operand_without_join() { + let eval_stmt = build_eval_stmt("some_metric / some_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 0, "{plan_str}"); + assert_eq!( + plan_str + .matches("Filter: phy.__table_id = UInt32(1024)") + .count(), + 1, + "{plan_str}" + ); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 1, + "{plan_str}" + ); + } + + #[tokio::test] + async fn binary_island_reuses_leaf_across_two_branches() { + let eval_stmt = + build_eval_stmt("(some_metric + some_alt_metric) / (some_metric + third_metric)"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + (DEFAULT_SCHEMA_NAME.to_string(), "third_metric".to_string()), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!( + plan_str + .matches("Filter: phy.__table_id = UInt32(1024)") + .count(), + 1, + "{plan_str}" + ); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 3, + "{plan_str}" + ); + } + + #[tokio::test] + async fn binary_island_keeps_distinct_matcher_leaves() { + let eval_stmt = build_eval_stmt( + "(some_metric{tag_0=\"foo\"} + some_alt_metric) / some_metric{tag_0=\"bar\"}", + ); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 3, + "{plan_str}" + ); + } + + #[tokio::test] + async fn binary_island_keeps_offset_leaves_distinct() { + let eval_stmt = build_eval_stmt("(some_metric offset 5m + some_alt_metric) / some_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 3, + "{plan_str}" + ); + } + + #[tokio::test] + async fn binary_island_falls_back_for_group_modifier() { + let eval_stmt = build_eval_stmt( + "(some_metric + ignoring(tag_0) group_left some_alt_metric) / some_metric", + ); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 3, + "{plan_str}" + ); + } + + #[tokio::test] + async fn binary_island_falls_back_for_comparison_filter() { + let eval_stmt = build_eval_stmt("(some_metric > some_alt_metric) / some_metric"); + + let table_provider = build_test_table_provider_with_tsid( + &[ + (DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "some_alt_metric".to_string(), + ), + ], + 1, + 1, + ) + .await; + let plan = + PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state()) + .await + .unwrap(); + + let plan_str = plan.display_indent_schema().to_string(); + assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}"); + assert_eq!( + plan_str.matches("PromInstantManipulate").count(), + 3, + "{plan_str}" + ); + } + #[tokio::test] async fn tsid_binary_join_uses_shorter_field_side() { let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric"); diff --git a/tests/cases/standalone/common/promql/set_operation.result b/tests/cases/standalone/common/promql/set_operation.result index 5bb1d04d8b..83af6fc875 100644 --- a/tests/cases/standalone/common/promql/set_operation.result +++ b/tests/cases/standalone/common/promql/set_operation.result @@ -616,14 +616,14 @@ Affected Rows: 4 -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit); -+-------+---------------------+-------------------------------------------------------------------------------+ -| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value | -+-------+---------------------+-------------------------------------------------------------------------------+ -| read | 1970-01-01T00:00:03 | 0.5 | -| read | 1970-01-01T00:00:04 | 0.75 | -| write | 1970-01-01T00:00:03 | 0.5 | -| write | 1970-01-01T00:00:04 | 0.6666666666666666 | -+-------+---------------------+-------------------------------------------------------------------------------+ ++-------+---------------------+---------------------------------------------------------------------------------+ +| job | ts | cache_hit.greptime_value / cache_miss.greptime_value + cache_hit.greptime_value | ++-------+---------------------+---------------------------------------------------------------------------------+ +| read | 1970-01-01T00:00:03 | 0.5 | +| read | 1970-01-01T00:00:04 | 0.75 | +| write | 1970-01-01T00:00:03 | 0.5 | +| write | 1970-01-01T00:00:04 | 0.6666666666666666 | ++-------+---------------------+---------------------------------------------------------------------------------+ drop table cache_hit; @@ -672,14 +672,14 @@ Affected Rows: 4 -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label); -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ -| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value | -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ -| read | | 1970-01-01T00:00:03 | 0.5 | -| read | | 1970-01-01T00:00:04 | 0.75 | -| write | | 1970-01-01T00:00:03 | 0.5 | -| write | | 1970-01-01T00:00:04 | 0.6666666666666666 | -+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+ ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| job | null_label | ts | cache_hit_with_null_label.greptime_value / cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value | ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ +| read | | 1970-01-01T00:00:03 | 0.5 | +| read | | 1970-01-01T00:00:04 | 0.75 | +| write | | 1970-01-01T00:00:03 | 0.5 | +| write | | 1970-01-01T00:00:04 | 0.6666666666666666 | ++-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label); diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result index d414eb6bba..133c1c5581 100644 --- a/tests/cases/standalone/common/promql/tsid_binary_join_regression.result +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.result @@ -93,6 +93,46 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; |_|_| Total rows: 4_| +-+-+-+ +-- Repeated operands in a safe arithmetic island should be planned once and reused +-- in the final projection. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_ProjectionExec: expr=[host@1 as host, job@2 as job, ts@4 as ts, __tsid@3 as __tsid, (greptime_value@0 + greptime_value@5) / greptime_value@0 as tsid_binary_join_left.greptime_value + tsid_binary_join_right.greptime_value / tsid_binary_join_left.greptime_value] REDACTED +|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4, greptime_value@5], NullsEqual: true REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED +|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED +|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED +|_|_|_| +|_|_| Total rows: 4_| ++-+-+-+ + -- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED @@ -223,6 +263,18 @@ TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; | host2 | job2 | 1970-01-01T00:00:05 | 3.0 | +-------+------+---------------------+------------------------------------------------------------------------------+ +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left; + ++-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+ +| host | job | ts | tsid_binary_join_left.greptime_value + tsid_binary_join_right.greptime_value / tsid_binary_join_left.greptime_value | ++-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+ +| host1 | job1 | 1970-01-01T00:00:00 | 1.25 | +| host1 | job1 | 1970-01-01T00:00:05 | 1.3333333333333333 | +| host2 | job2 | 1970-01-01T00:00:00 | 1.3333333333333333 | +| host2 | job2 | 1970-01-01T00:00:05 | 1.3333333333333333 | ++-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+ + DROP TABLE tsid_binary_join_right; Affected Rows: 0 diff --git a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql index 61dfd23765..787818a60c 100644 --- a/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql +++ b/tests/cases/standalone/common/promql/tsid_binary_join_regression.sql @@ -58,6 +58,20 @@ INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; +-- Repeated operands in a safe arithmetic island should be planned once and reused +-- in the final projection. +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED +-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED +-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED +-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +TQL ANALYZE (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left; + -- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels. -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED @@ -101,6 +115,9 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right; -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right; +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left; + DROP TABLE tsid_binary_join_right; DROP TABLE tsid_binary_join_left; DROP TABLE tsid_binary_join_physical;