mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 05:40:40 +00:00
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeSet, HashSet, VecDeque};
|
||||
use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
@@ -161,6 +161,130 @@ struct PromPlannerContext {
|
||||
range: Option<Millisecond>,
|
||||
}
|
||||
|
||||
/// Hashable identity of a vector selector, used to recognise repeated
/// selectors inside one arithmetic island so they are planned only once.
///
/// Built by `VectorLeafKey::from_selector`; matcher lists are sorted there so
/// the key does not depend on the order in which matchers were written.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct VectorLeafKey {
|
||||
/// Metric name, taken from the selector name or from an equality matcher
/// on the metric-name label.
metric_name: String,
|
||||
/// `(name, op, value)` triples of every matcher except the metric-name one.
matchers: Vec<(String, String, String)>,
|
||||
/// `(name, op, value)` triples for each group of the selector's `or_matchers`.
or_matchers: Vec<Vec<(String, String, String)>>,
|
||||
/// Signed offset in milliseconds: positive for `Offset::Pos`, negative for
/// `Offset::Neg`, `0` when the selector has no offset.
offset_ms: i128,
|
||||
/// `Debug` rendering of the selector's `at` modifier.
at: String,
|
||||
}
|
||||
|
||||
/// One distinct vector selector collected from an island expression.
#[derive(Debug, Clone)]
|
||||
struct IslandLeaf {
|
||||
/// Copy of the selector as it appeared the first time it was interned.
selector: VectorSelector,
|
||||
/// Metric name of the selector; used as the prefix of output field names.
display_table: String,
|
||||
}
|
||||
|
||||
/// Expression tree of an island, with vector selectors replaced by indices
/// into the list of deduplicated leaves.
#[derive(Debug, Clone)]
|
||||
enum IslandExpr {
|
||||
/// Index of a leaf in `IslandCollectEnv::leaves`.
VectorLeaf(usize),
|
||||
/// A literal expression that was already lowered to a DataFusion expression.
Scalar(DfExpr),
|
||||
/// A PromQL unary expression; lowered to `DfExpr::Negative` on every field.
Unary {
|
||||
input: Box<IslandExpr>,
|
||||
},
|
||||
/// A binary operation between two island sub-expressions.
Binary {
|
||||
op: TokenType,
|
||||
lhs: Box<IslandExpr>,
|
||||
rhs: Box<IslandExpr>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Accumulator used while walking a PromQL expression to collect its leaves.
#[derive(Debug, Default)]
|
||||
struct IslandCollectEnv {
|
||||
/// Maps a selector identity to its index in `leaves`.
leaf_by_key: HashMap<VectorLeafKey, usize>,
|
||||
/// Distinct leaves in the order they were first seen.
leaves: Vec<IslandLeaf>,
|
||||
/// Number of `intern_leaf` calls, i.e. selectors visited including repeats.
vector_occurrences: usize,
|
||||
}
|
||||
|
||||
/// A leaf after it has been turned into an aliased logical plan.
#[derive(Debug)]
|
||||
struct PlannedIslandLeaf {
|
||||
/// Plan of the selector, wrapped in `alias`.
plan: LogicalPlan,
|
||||
/// Planner context captured right after the leaf was planned.
ctx: PromPlannerContext,
|
||||
/// Table alias of the form `prom_v{idx}` assigned to this leaf.
alias: TableReference,
|
||||
/// Metric name used when building output field names.
display_table: String,
|
||||
}
|
||||
|
||||
/// Value expressions produced for an island sub-expression.
#[derive(Debug)]
|
||||
struct IslandFieldExprs {
|
||||
/// One DataFusion expression per output value column.
exprs: Vec<DfExpr>,
|
||||
/// Display name for each entry of `exprs`, in the same order.
names: Vec<String>,
|
||||
/// `true` when the sub-expression is built from scalar literals only.
scalar: bool,
|
||||
}
|
||||
|
||||
impl VectorLeafKey {
|
||||
fn from_selector(selector: &VectorSelector) -> Option<Self> {
|
||||
let mut metric_name = selector.name.clone();
|
||||
let mut matchers = Vec::with_capacity(selector.matchers.matchers.len());
|
||||
|
||||
for matcher in &selector.matchers.matchers {
|
||||
if matcher.name == METRIC_NAME {
|
||||
if matcher.op != MatchOp::Equal || metric_name.is_some() {
|
||||
return None;
|
||||
}
|
||||
metric_name = Some(matcher.value.clone());
|
||||
} else {
|
||||
matchers.push(Self::matcher_key(matcher));
|
||||
}
|
||||
}
|
||||
matchers.sort();
|
||||
|
||||
let mut or_matchers = selector
|
||||
.matchers
|
||||
.or_matchers
|
||||
.iter()
|
||||
.map(|group| {
|
||||
let mut group = group.iter().map(Self::matcher_key).collect::<Vec<_>>();
|
||||
group.sort();
|
||||
group
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
or_matchers.sort();
|
||||
|
||||
Some(Self {
|
||||
metric_name: metric_name?,
|
||||
matchers,
|
||||
or_matchers,
|
||||
offset_ms: Self::offset_ms(&selector.offset),
|
||||
at: format!("{:?}", selector.at),
|
||||
})
|
||||
}
|
||||
|
||||
fn matcher_key(matcher: &Matcher) -> (String, String, String) {
|
||||
(
|
||||
matcher.name.clone(),
|
||||
matcher.op.to_string(),
|
||||
matcher.value.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn offset_ms(offset: &Option<Offset>) -> i128 {
|
||||
match offset {
|
||||
Some(Offset::Pos(duration)) => duration.as_millis() as i128,
|
||||
Some(Offset::Neg(duration)) => -(duration.as_millis() as i128),
|
||||
None => 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IslandCollectEnv {
|
||||
fn intern_leaf(&mut self, selector: &VectorSelector) -> Option<usize> {
|
||||
self.vector_occurrences += 1;
|
||||
let key = VectorLeafKey::from_selector(selector)?;
|
||||
if let Some(id) = self.leaf_by_key.get(&key) {
|
||||
return Some(*id);
|
||||
}
|
||||
|
||||
let id = self.leaves.len();
|
||||
self.leaves.push(IslandLeaf {
|
||||
selector: selector.clone(),
|
||||
display_table: key.metric_name.clone(),
|
||||
});
|
||||
self.leaf_by_key.insert(key, id);
|
||||
Some(id)
|
||||
}
|
||||
}
|
||||
|
||||
impl PromPlannerContext {
|
||||
fn from_eval_stmt(stmt: &EvalStmt) -> Self {
|
||||
Self {
|
||||
@@ -607,11 +731,380 @@ impl PromPlanner {
|
||||
})
|
||||
}
|
||||
|
||||
async fn try_plan_binary_island(
|
||||
&mut self,
|
||||
binary_expr: &PromBinaryExpr,
|
||||
) -> Result<Option<LogicalPlan>> {
|
||||
let original_ctx = self.ctx.clone();
|
||||
let mut collect_env = IslandCollectEnv::default();
|
||||
let Some(island_expr) = Self::collect_binary_island_expr(
|
||||
&PromExpr::Binary(binary_expr.clone()),
|
||||
&mut collect_env,
|
||||
) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
if collect_env.leaves.is_empty()
|
||||
|| collect_env.vector_occurrences <= collect_env.leaves.len()
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut planned_leaves = Vec::with_capacity(collect_env.leaves.len());
|
||||
for (idx, leaf) in collect_env.leaves.iter().enumerate() {
|
||||
let plan = self
|
||||
.prom_vector_selector_to_plan(&leaf.selector, false)
|
||||
.await?;
|
||||
let ctx = self.ctx.clone();
|
||||
let alias = TableReference::bare(format!("prom_v{idx}"));
|
||||
let plan = LogicalPlanBuilder::from(plan)
|
||||
.alias(alias.clone())
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
planned_leaves.push(PlannedIslandLeaf {
|
||||
plan,
|
||||
ctx,
|
||||
alias,
|
||||
display_table: leaf.display_table.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
if !Self::binary_island_join_contexts_supported(&planned_leaves) {
|
||||
self.ctx = original_ctx;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut input = planned_leaves[0].plan.clone();
|
||||
for right_idx in 1..planned_leaves.len() {
|
||||
input = self.join_binary_island_leaf(
|
||||
input,
|
||||
&planned_leaves[0],
|
||||
&planned_leaves[right_idx],
|
||||
)?;
|
||||
}
|
||||
|
||||
let field_exprs =
|
||||
Self::build_binary_island_field_exprs(&island_expr, &planned_leaves, input.schema())?;
|
||||
if field_exprs.scalar || field_exprs.exprs.is_empty() {
|
||||
self.ctx = original_ctx;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let plan = self.project_binary_island(
|
||||
input,
|
||||
&planned_leaves[0].alias,
|
||||
&planned_leaves[0].ctx,
|
||||
field_exprs,
|
||||
)?;
|
||||
Ok(Some(plan))
|
||||
}
|
||||
|
||||
fn collect_binary_island_expr(
|
||||
expr: &PromExpr,
|
||||
env: &mut IslandCollectEnv,
|
||||
) -> Option<IslandExpr> {
|
||||
if let Some(expr) = Self::try_build_literal_expr(expr) {
|
||||
return Some(IslandExpr::Scalar(expr));
|
||||
}
|
||||
|
||||
match expr {
|
||||
PromExpr::Paren(ParenExpr { expr }) => Self::collect_binary_island_expr(expr, env),
|
||||
PromExpr::VectorSelector(selector) => {
|
||||
let leaf = env.intern_leaf(selector)?;
|
||||
Some(IslandExpr::VectorLeaf(leaf))
|
||||
}
|
||||
PromExpr::Unary(UnaryExpr { expr }) => {
|
||||
let input = Self::collect_binary_island_expr(expr, env)?;
|
||||
Some(IslandExpr::Unary {
|
||||
input: Box::new(input),
|
||||
})
|
||||
}
|
||||
PromExpr::Binary(PromBinaryExpr {
|
||||
lhs,
|
||||
rhs,
|
||||
op,
|
||||
modifier,
|
||||
}) if Self::is_safe_binary_island_op(*op)
|
||||
&& Self::is_safe_binary_island_modifier(modifier) =>
|
||||
{
|
||||
let lhs = Self::collect_binary_island_expr(lhs, env)?;
|
||||
let rhs = Self::collect_binary_island_expr(rhs, env)?;
|
||||
Some(IslandExpr::Binary {
|
||||
op: *op,
|
||||
lhs: Box::new(lhs),
|
||||
rhs: Box::new(rhs),
|
||||
})
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_safe_binary_island_op(token: TokenType) -> bool {
|
||||
matches!(
|
||||
token.id(),
|
||||
token::T_ADD
|
||||
| token::T_SUB
|
||||
| token::T_MUL
|
||||
| token::T_DIV
|
||||
| token::T_MOD
|
||||
| token::T_POW
|
||||
| token::T_ATAN2
|
||||
)
|
||||
}
|
||||
|
||||
fn is_safe_binary_island_modifier(modifier: &Option<BinModifier>) -> bool {
|
||||
modifier.as_ref().is_none_or(|modifier| {
|
||||
!modifier.return_bool
|
||||
&& modifier.matching.is_none()
|
||||
&& matches!(modifier.card, VectorMatchCardinality::OneToOne)
|
||||
})
|
||||
}
|
||||
|
||||
fn binary_island_join_contexts_supported(leaves: &[PlannedIslandLeaf]) -> bool {
|
||||
if leaves
|
||||
.iter()
|
||||
.any(|leaf| leaf.ctx.time_index_column.is_none())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if leaves.len() <= 1 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let first_tags = leaves[0].ctx.tag_columns.iter().collect::<BTreeSet<_>>();
|
||||
|
||||
leaves.iter().skip(1).all(|leaf| {
|
||||
(Self::plan_has_tsid_column(&leaves[0].plan) && Self::plan_has_tsid_column(&leaf.plan))
|
||||
|| leaf.ctx.tag_columns.iter().collect::<BTreeSet<_>>() == first_tags
|
||||
})
|
||||
}
|
||||
|
||||
fn join_binary_island_leaf(
|
||||
&self,
|
||||
left: LogicalPlan,
|
||||
first_leaf: &PlannedIslandLeaf,
|
||||
right_leaf: &PlannedIslandLeaf,
|
||||
) -> Result<LogicalPlan> {
|
||||
let only_join_time_index =
|
||||
first_leaf.ctx.tag_columns.is_empty() || right_leaf.ctx.tag_columns.is_empty();
|
||||
let (mut left_keys, mut right_keys) = Self::binary_join_key_columns_with_context(
|
||||
&left,
|
||||
&right_leaf.plan,
|
||||
&first_leaf.ctx,
|
||||
&right_leaf.ctx,
|
||||
only_join_time_index,
|
||||
&None,
|
||||
);
|
||||
|
||||
if let (Some(left_time_index_column), Some(right_time_index_column)) = (
|
||||
first_leaf.ctx.time_index_column.clone(),
|
||||
right_leaf.ctx.time_index_column.clone(),
|
||||
) {
|
||||
left_keys.insert(left_time_index_column);
|
||||
right_keys.insert(right_time_index_column);
|
||||
}
|
||||
|
||||
LogicalPlanBuilder::from(left)
|
||||
.join_detailed(
|
||||
right_leaf.plan.clone(),
|
||||
JoinType::Inner,
|
||||
(
|
||||
left_keys
|
||||
.into_iter()
|
||||
.map(|name| Column::new(Some(first_leaf.alias.clone()), name))
|
||||
.collect::<Vec<_>>(),
|
||||
right_keys
|
||||
.into_iter()
|
||||
.map(|name| Column::new(Some(right_leaf.alias.clone()), name))
|
||||
.collect::<Vec<_>>(),
|
||||
),
|
||||
None,
|
||||
NullEquality::NullEqualsNull,
|
||||
)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)
|
||||
}
|
||||
|
||||
fn build_binary_island_field_exprs(
|
||||
expr: &IslandExpr,
|
||||
leaves: &[PlannedIslandLeaf],
|
||||
schema: &DFSchemaRef,
|
||||
) -> Result<IslandFieldExprs> {
|
||||
match expr {
|
||||
IslandExpr::VectorLeaf(id) => {
|
||||
let leaf = &leaves[*id];
|
||||
let exprs = leaf
|
||||
.ctx
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(|field| {
|
||||
schema
|
||||
.qualified_field_with_name(Some(&leaf.alias), field)
|
||||
.context(DataFusionPlanningSnafu)
|
||||
.map(|field| DfExpr::Column(field.into()))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let names = leaf
|
||||
.ctx
|
||||
.field_columns
|
||||
.iter()
|
||||
.map(|field| format!("{}.{}", leaf.display_table, field))
|
||||
.collect();
|
||||
Ok(IslandFieldExprs {
|
||||
exprs,
|
||||
names,
|
||||
scalar: false,
|
||||
})
|
||||
}
|
||||
IslandExpr::Scalar(expr) => Ok(IslandFieldExprs {
|
||||
exprs: vec![expr.clone()],
|
||||
names: vec![expr.schema_name().to_string()],
|
||||
scalar: true,
|
||||
}),
|
||||
IslandExpr::Unary { input } => {
|
||||
let input = Self::build_binary_island_field_exprs(input, leaves, schema)?;
|
||||
let mut exprs = Vec::with_capacity(input.exprs.len());
|
||||
let mut names = Vec::with_capacity(input.names.len());
|
||||
for (expr, name) in input.exprs.into_iter().zip(input.names) {
|
||||
exprs.push(DfExpr::Negative(Box::new(expr)));
|
||||
names.push(format!("-{name}"));
|
||||
}
|
||||
Ok(IslandFieldExprs {
|
||||
exprs,
|
||||
names,
|
||||
scalar: input.scalar,
|
||||
})
|
||||
}
|
||||
IslandExpr::Binary { op, lhs, rhs } => {
|
||||
let same_leaf = match (&**lhs, &**rhs) {
|
||||
(IslandExpr::VectorLeaf(left), IslandExpr::VectorLeaf(right))
|
||||
if left == right =>
|
||||
{
|
||||
Some(*left)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
let lhs = Self::build_binary_island_field_exprs(lhs, leaves, schema)?;
|
||||
let rhs = Self::build_binary_island_field_exprs(rhs, leaves, schema)?;
|
||||
let expr_builder = Self::prom_token_to_binary_expr_builder(*op)?;
|
||||
let scalar = lhs.scalar && rhs.scalar;
|
||||
let op = op.to_string();
|
||||
|
||||
let (exprs, names) = match (lhs.scalar, rhs.scalar) {
|
||||
(true, true) => {
|
||||
let expr = expr_builder(lhs.exprs[0].clone(), rhs.exprs[0].clone())?;
|
||||
let name = format!("{} {op} {}", lhs.names[0], rhs.names[0]);
|
||||
(vec![expr], vec![name])
|
||||
}
|
||||
(true, false) => {
|
||||
let mut exprs = Vec::with_capacity(rhs.exprs.len());
|
||||
let mut names = Vec::with_capacity(rhs.names.len());
|
||||
for (rhs_expr, rhs_name) in rhs.exprs.into_iter().zip(rhs.names) {
|
||||
exprs.push(expr_builder(lhs.exprs[0].clone(), rhs_expr)?);
|
||||
names.push(format!("{} {op} {rhs_name}", lhs.names[0]));
|
||||
}
|
||||
(exprs, names)
|
||||
}
|
||||
(false, true) => {
|
||||
let mut exprs = Vec::with_capacity(lhs.exprs.len());
|
||||
let mut names = Vec::with_capacity(lhs.names.len());
|
||||
for (lhs_expr, lhs_name) in lhs.exprs.into_iter().zip(lhs.names) {
|
||||
exprs.push(expr_builder(lhs_expr, rhs.exprs[0].clone())?);
|
||||
names.push(format!("{lhs_name} {op} {}", rhs.names[0]));
|
||||
}
|
||||
(exprs, names)
|
||||
}
|
||||
(false, false) => {
|
||||
let mut exprs = Vec::new();
|
||||
let mut names = Vec::new();
|
||||
for (idx, ((lhs_expr, rhs_expr), (mut lhs_name, mut rhs_name))) in lhs
|
||||
.exprs
|
||||
.into_iter()
|
||||
.zip(rhs.exprs)
|
||||
.zip(lhs.names.into_iter().zip(rhs.names))
|
||||
.enumerate()
|
||||
{
|
||||
if let Some(leaf) = same_leaf {
|
||||
let field = leaves[leaf]
|
||||
.ctx
|
||||
.field_columns
|
||||
.get(idx)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| lhs_name.clone());
|
||||
lhs_name = format!("lhs.{field}");
|
||||
rhs_name = format!("rhs.{field}");
|
||||
}
|
||||
exprs.push(expr_builder(lhs_expr, rhs_expr)?);
|
||||
names.push(format!("{lhs_name} {op} {rhs_name}"));
|
||||
}
|
||||
(exprs, names)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(IslandFieldExprs {
|
||||
exprs,
|
||||
names,
|
||||
scalar,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn project_binary_island(
|
||||
&mut self,
|
||||
input: LogicalPlan,
|
||||
base_alias: &TableReference,
|
||||
base_ctx: &PromPlannerContext,
|
||||
field_exprs: IslandFieldExprs,
|
||||
) -> Result<LogicalPlan> {
|
||||
self.ctx = base_ctx.clone();
|
||||
|
||||
let schema = input.schema();
|
||||
let non_field_exprs = base_ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.chain(base_ctx.time_index_column.iter())
|
||||
.map(|column| {
|
||||
schema
|
||||
.qualified_field_with_name(Some(base_alias), column)
|
||||
.context(DataFusionPlanningSnafu)
|
||||
.map(|field| DfExpr::Column(field.into()))
|
||||
});
|
||||
let tsid_expr = Self::optional_tsid_projection(schema, Some(base_alias), base_ctx.use_tsid)
|
||||
.into_iter()
|
||||
.map(Ok);
|
||||
|
||||
self.ctx.field_columns = field_exprs.names;
|
||||
let field_exprs = field_exprs
|
||||
.exprs
|
||||
.into_iter()
|
||||
.zip(self.ctx.field_columns.iter())
|
||||
.map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, None::<String>, name))));
|
||||
|
||||
let project_exprs = non_field_exprs
|
||||
.chain(tsid_expr)
|
||||
.chain(field_exprs)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
LogicalPlanBuilder::from(input)
|
||||
.project(project_exprs)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)
|
||||
}
|
||||
|
||||
async fn prom_binary_expr_to_plan(
|
||||
&mut self,
|
||||
query_engine_state: &QueryEngineState,
|
||||
binary_expr: &PromBinaryExpr,
|
||||
) -> Result<LogicalPlan> {
|
||||
if let Some(plan) = self.try_plan_binary_island(binary_expr).await? {
|
||||
return Ok(plan);
|
||||
}
|
||||
|
||||
let PromBinaryExpr {
|
||||
lhs,
|
||||
rhs,
|
||||
@@ -3498,6 +3991,24 @@ impl PromPlanner {
|
||||
right: &LogicalPlan,
|
||||
only_join_time_index: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> (BTreeSet<String>, BTreeSet<String>) {
|
||||
Self::binary_join_key_columns_with_context(
|
||||
left,
|
||||
right,
|
||||
&self.ctx,
|
||||
&self.ctx,
|
||||
only_join_time_index,
|
||||
modifier,
|
||||
)
|
||||
}
|
||||
|
||||
fn binary_join_key_columns_with_context(
|
||||
left: &LogicalPlan,
|
||||
right: &LogicalPlan,
|
||||
left_ctx: &PromPlannerContext,
|
||||
right_ctx: &PromPlannerContext,
|
||||
only_join_time_index: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> (BTreeSet<String>, BTreeSet<String>) {
|
||||
let use_tsid_join = !only_join_time_index
|
||||
&& modifier.as_ref().is_none_or(|modifier| {
|
||||
@@ -3516,13 +4027,21 @@ impl PromPlanner {
|
||||
let left_tag_columns = if only_join_time_index {
|
||||
BTreeSet::new()
|
||||
} else {
|
||||
self.ctx
|
||||
left_ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<BTreeSet<_>>()
|
||||
};
|
||||
let right_tag_columns = if only_join_time_index {
|
||||
BTreeSet::new()
|
||||
} else {
|
||||
right_ctx
|
||||
.tag_columns
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<BTreeSet<_>>()
|
||||
};
|
||||
let right_tag_columns = left_tag_columns.clone();
|
||||
(left_tag_columns, right_tag_columns)
|
||||
};
|
||||
|
||||
@@ -4974,7 +5493,7 @@ mod test {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
|
||||
async fn repeated_tsid_binary_operand_reuses_leaf_plan() {
|
||||
let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
@@ -4995,12 +5514,24 @@ mod test {
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str
|
||||
.matches("Filter: phy.__table_id = UInt32(1024)")
|
||||
.count(),
|
||||
1,
|
||||
"{plan_str}"
|
||||
);
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
2,
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
|
||||
async fn repeated_tsid_binary_operand_reuses_shorter_field_side() {
|
||||
let eval_stmt =
|
||||
build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100");
|
||||
|
||||
@@ -5043,10 +5574,210 @@ mod test {
|
||||
.count();
|
||||
assert_eq!(value_columns, 1, "{field_names:?}");
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 1, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str
|
||||
.matches("Filter: phy.__table_id = UInt32(1025)")
|
||||
.count(),
|
||||
1,
|
||||
"{plan_str}"
|
||||
);
|
||||
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_reuses_self_operand_without_join() {
|
||||
let eval_stmt = build_eval_stmt("some_metric / some_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string())],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 0, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str
|
||||
.matches("Filter: phy.__table_id = UInt32(1024)")
|
||||
.count(),
|
||||
1,
|
||||
"{plan_str}"
|
||||
);
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
1,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_reuses_leaf_across_two_branches() {
|
||||
let eval_stmt =
|
||||
build_eval_stmt("(some_metric + some_alt_metric) / (some_metric + third_metric)");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "third_metric".to_string()),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str
|
||||
.matches("Filter: phy.__table_id = UInt32(1024)")
|
||||
.count(),
|
||||
1,
|
||||
"{plan_str}"
|
||||
);
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
3,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_keeps_distinct_matcher_leaves() {
|
||||
let eval_stmt = build_eval_stmt(
|
||||
"(some_metric{tag_0=\"foo\"} + some_alt_metric) / some_metric{tag_0=\"bar\"}",
|
||||
);
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
3,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_keeps_offset_leaves_distinct() {
|
||||
let eval_stmt = build_eval_stmt("(some_metric offset 5m + some_alt_metric) / some_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
3,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_falls_back_for_group_modifier() {
|
||||
let eval_stmt = build_eval_stmt(
|
||||
"(some_metric + ignoring(tag_0) group_left some_alt_metric) / some_metric",
|
||||
);
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
3,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn binary_island_falls_back_for_comparison_filter() {
|
||||
let eval_stmt = build_eval_stmt("(some_metric > some_alt_metric) / some_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
|
||||
(
|
||||
DEFAULT_SCHEMA_NAME.to_string(),
|
||||
"some_alt_metric".to_string(),
|
||||
),
|
||||
],
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.await;
|
||||
let plan =
|
||||
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let plan_str = plan.display_indent_schema().to_string();
|
||||
assert_eq!(plan_str.matches("__tsid =").count(), 2, "{plan_str}");
|
||||
assert_eq!(
|
||||
plan_str.matches("PromInstantManipulate").count(),
|
||||
3,
|
||||
"{plan_str}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_binary_join_uses_shorter_field_side() {
|
||||
let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric");
|
||||
|
||||
@@ -616,14 +616,14 @@ Affected Rows: 4
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit / (cache_miss + cache_hit);
|
||||
|
||||
+-------+---------------------+-------------------------------------------------------------------------------+
|
||||
| job | ts | lhs.greptime_value / rhs.cache_miss.greptime_value + cache_hit.greptime_value |
|
||||
+-------+---------------------+-------------------------------------------------------------------------------+
|
||||
| read | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+---------------------+-------------------------------------------------------------------------------+
|
||||
+-------+---------------------+---------------------------------------------------------------------------------+
|
||||
| job | ts | cache_hit.greptime_value / cache_miss.greptime_value + cache_hit.greptime_value |
|
||||
+-------+---------------------+---------------------------------------------------------------------------------+
|
||||
| read | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+---------------------+---------------------------------------------------------------------------------+
|
||||
|
||||
drop table cache_hit;
|
||||
|
||||
@@ -672,14 +672,14 @@ Affected Rows: 4
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / (cache_miss_with_null_label + cache_hit_with_null_label);
|
||||
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| job | null_label | ts | lhs.greptime_value / rhs.cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
| read | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------+
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
||||
| job | null_label | ts | cache_hit_with_null_label.greptime_value / cache_miss_with_null_label.greptime_value + cache_hit_with_null_label.greptime_value |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
||||
| read | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| read | | 1970-01-01T00:00:04 | 0.75 |
|
||||
| write | | 1970-01-01T00:00:03 | 0.5 |
|
||||
| write | | 1970-01-01T00:00:04 | 0.6666666666666666 |
|
||||
+-------+------------+---------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') cache_hit_with_null_label / ignoring(null_label) (cache_miss_with_null_label + ignoring(null_label) cache_hit_with_null_label);
|
||||
|
||||
@@ -93,6 +93,46 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Repeated operands in a safe arithmetic island should be planned once and reused
|
||||
-- in the final projection.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@1 as host, job@2 as job, ts@4 as ts, __tsid@3 as __tsid, (greptime_value@0 + greptime_value@5) / greptime_value@0 as tsid_binary_join_left.greptime_value + tsid_binary_join_right.greptime_value / tsid_binary_join_left.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4, greptime_value@5], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4],REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2],REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":REDACTED, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
@@ -223,6 +263,18 @@ TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
| host2 | job2 | 1970-01-01T00:00:05 | 3.0 |
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left;
|
||||
|
||||
+-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| host | job | ts | tsid_binary_join_left.greptime_value + tsid_binary_join_right.greptime_value / tsid_binary_join_left.greptime_value |
|
||||
+-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
| host1 | job1 | 1970-01-01T00:00:00 | 1.25 |
|
||||
| host1 | job1 | 1970-01-01T00:00:05 | 1.3333333333333333 |
|
||||
| host2 | job2 | 1970-01-01T00:00:00 | 1.3333333333333333 |
|
||||
| host2 | job2 | 1970-01-01T00:00:05 | 1.3333333333333333 |
|
||||
+-------+------+---------------------+---------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -58,6 +58,20 @@ INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
-- Repeated operands in a safe arithmetic island should be planned once and reused
|
||||
-- in the final projection.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@1,\sts@2\],.* Hash([__tsid@1, ts@2],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[__tsid@3,\sts@4\],.* Hash([__tsid@3, ts@4],REDACTED
|
||||
-- SQLNESS REPLACE input_partitions=\d+ input_partitions=REDACTED
|
||||
-- SQLNESS REPLACE "partition_count":\{(.*?)\} "partition_count":REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left;
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
@@ -101,6 +115,9 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right;
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') (tsid_binary_join_left + tsid_binary_join_right) / tsid_binary_join_left;
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
DROP TABLE tsid_binary_join_left;
|
||||
DROP TABLE tsid_binary_join_physical;
|
||||
|
||||
Reference in New Issue
Block a user