mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 19:30:37 +00:00
@@ -429,24 +429,10 @@ impl MergeScanExec {
|
||||
return None;
|
||||
}
|
||||
|
||||
let all_partition_col_aliases: HashSet<_> = self
|
||||
.partition_cols
|
||||
.values()
|
||||
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
|
||||
.collect();
|
||||
let mut overlaps = vec![];
|
||||
for expr in &hash_exprs {
|
||||
if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
|
||||
&& all_partition_col_aliases.contains(col_expr.name())
|
||||
{
|
||||
overlaps.push(expr.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Metric-engine scans can satisfy any hash distribution that includes `__tsid`.
|
||||
// Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay
|
||||
// co-located across MergeScan partitions.
|
||||
if self
|
||||
let overlaps = if self
|
||||
.arrow_schema
|
||||
.column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.is_some()
|
||||
@@ -454,10 +440,24 @@ impl MergeScanExec {
|
||||
expr.as_any()
|
||||
.downcast_ref::<Column>()
|
||||
.is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
})
|
||||
{
|
||||
overlaps = hash_exprs.clone();
|
||||
}
|
||||
}) {
|
||||
hash_exprs
|
||||
} else {
|
||||
let all_partition_col_aliases: HashSet<_> = self
|
||||
.partition_cols
|
||||
.values()
|
||||
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
|
||||
.collect();
|
||||
hash_exprs
|
||||
.iter()
|
||||
.filter(|expr| {
|
||||
expr.as_any()
|
||||
.downcast_ref::<Column>()
|
||||
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
|
||||
})
|
||||
.cloned()
|
||||
.collect()
|
||||
};
|
||||
|
||||
if overlaps.is_empty() {
|
||||
return None;
|
||||
|
||||
@@ -85,9 +85,10 @@ impl PassDistribution {
|
||||
let mut new_children = Vec::with_capacity(children.len());
|
||||
for (idx, child) in children.into_iter().enumerate() {
|
||||
let child_req = match required.get(idx) {
|
||||
Some(Distribution::UnspecifiedDistribution) => {
|
||||
Self::propagate_unspecified_child_requirement(plan.as_ref(), idx, ¤t_req)
|
||||
Some(Distribution::UnspecifiedDistribution) if idx == 0 => {
|
||||
Self::map_hash_requirement_through_projection(plan.as_ref(), ¤t_req)
|
||||
}
|
||||
Some(Distribution::UnspecifiedDistribution) => None,
|
||||
None => current_req.clone(),
|
||||
Some(req) => Some(req.clone()),
|
||||
};
|
||||
@@ -108,15 +109,10 @@ impl PassDistribution {
|
||||
}
|
||||
}
|
||||
|
||||
fn propagate_unspecified_child_requirement(
|
||||
fn map_hash_requirement_through_projection(
|
||||
plan: &dyn ExecutionPlan,
|
||||
idx: usize,
|
||||
current_req: &Option<Distribution>,
|
||||
) -> Option<Distribution> {
|
||||
if idx != 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let Some(Distribution::HashPartitioned(required_exprs)) = current_req else {
|
||||
return None;
|
||||
};
|
||||
|
||||
@@ -709,17 +709,12 @@ impl PromPlanner {
|
||||
self.ctx.table_name = Some("rhs".to_string());
|
||||
}
|
||||
}
|
||||
let field_columns = left_field_columns
|
||||
.iter()
|
||||
.zip(right_field_columns.iter())
|
||||
.collect::<Vec<_>>();
|
||||
let (output_field_columns, field_columns) =
|
||||
Self::align_binary_field_columns(&left_field_columns, &right_field_columns);
|
||||
// PromQL binary arithmetic only combines the shared prefix of value columns.
|
||||
// Keep the output field count aligned with that zipped prefix so planning
|
||||
// remains stable even when the two sides have uneven multi-field schemas.
|
||||
self.ctx.field_columns = field_columns
|
||||
.iter()
|
||||
.map(|(left_col_name, _)| (*left_col_name).clone())
|
||||
.collect();
|
||||
self.ctx.field_columns = output_field_columns;
|
||||
let mut field_columns = field_columns.into_iter();
|
||||
|
||||
let join_plan = self.join_on_non_field_columns(
|
||||
@@ -732,7 +727,7 @@ impl PromPlanner {
|
||||
// if left plan or right plan tag is empty, means case like `scalar(...) + host` or `host + scalar(...)`
|
||||
// under this case we only join on time index
|
||||
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
|
||||
!is_comparison_op,
|
||||
is_comparison_op,
|
||||
modifier,
|
||||
)?;
|
||||
let join_plan_schema = join_plan.schema().clone();
|
||||
@@ -822,11 +817,10 @@ impl PromPlanner {
|
||||
|
||||
// Preserve `__tsid` if present, so it can still be used internally downstream. It's
|
||||
// stripped from the final output anyway.
|
||||
if context.use_tsid
|
||||
&& let Ok(tsid_col) =
|
||||
schema.qualified_field_with_name(Some(table_ref), DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
if let Some(tsid_col) =
|
||||
Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid)
|
||||
{
|
||||
project_exprs.push(DfExpr::Column(tsid_col.into()));
|
||||
project_exprs.push(tsid_col);
|
||||
}
|
||||
|
||||
let plan = LogicalPlanBuilder::from(input)
|
||||
@@ -3405,56 +3399,58 @@ impl PromPlanner {
|
||||
)
|
||||
}
|
||||
|
||||
fn can_use_tsid_for_binary_join(
|
||||
fn align_binary_field_columns<'a>(
|
||||
left_field_columns: &'a [String],
|
||||
right_field_columns: &'a [String],
|
||||
) -> (Vec<String>, Vec<(&'a String, &'a String)>) {
|
||||
let field_pairs = left_field_columns
|
||||
.iter()
|
||||
.zip(right_field_columns.iter())
|
||||
.collect::<Vec<_>>();
|
||||
let output_field_columns = field_pairs
|
||||
.iter()
|
||||
.map(|(left_col_name, _)| (*left_col_name).clone())
|
||||
.collect();
|
||||
(output_field_columns, field_pairs)
|
||||
}
|
||||
|
||||
fn plan_has_tsid_column(plan: &LogicalPlan) -> bool {
|
||||
plan.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
}
|
||||
|
||||
fn optional_tsid_projection(
|
||||
schema: &DFSchemaRef,
|
||||
table_ref: Option<&TableReference>,
|
||||
keep_tsid: bool,
|
||||
) -> Option<DfExpr> {
|
||||
keep_tsid.then_some(()).and_then(|_| {
|
||||
schema
|
||||
.qualified_field_with_name(table_ref, DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
.ok()
|
||||
.map(|field| DfExpr::Column(field.into()))
|
||||
})
|
||||
}
|
||||
|
||||
fn binary_join_key_columns(
|
||||
&self,
|
||||
left: &LogicalPlan,
|
||||
right: &LogicalPlan,
|
||||
only_join_time_index: bool,
|
||||
allow_tsid_join: bool,
|
||||
is_comparison_op: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> bool {
|
||||
if only_join_time_index || !allow_tsid_join {
|
||||
return false;
|
||||
}
|
||||
) -> (BTreeSet<String>, BTreeSet<String>) {
|
||||
let use_tsid_join = !is_comparison_op
|
||||
&& !only_join_time_index
|
||||
&& modifier.as_ref().is_none_or(|modifier| {
|
||||
modifier.matching.is_none()
|
||||
&& matches!(modifier.card, VectorMatchCardinality::OneToOne)
|
||||
})
|
||||
&& Self::plan_has_tsid_column(left)
|
||||
&& Self::plan_has_tsid_column(right);
|
||||
|
||||
let modifier_allows_tsid = modifier.as_ref().is_none_or(|modifier| {
|
||||
modifier.matching.is_none() && matches!(modifier.card, VectorMatchCardinality::OneToOne)
|
||||
});
|
||||
|
||||
modifier_allows_tsid
|
||||
&& left
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
&& right
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
}
|
||||
|
||||
/// Build a inner join on time index column and tag columns to concat two logical plans.
|
||||
/// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn join_on_non_field_columns(
|
||||
&self,
|
||||
left: LogicalPlan,
|
||||
right: LogicalPlan,
|
||||
left_table_ref: TableReference,
|
||||
right_table_ref: TableReference,
|
||||
left_time_index_column: Option<String>,
|
||||
right_time_index_column: Option<String>,
|
||||
only_join_time_index: bool,
|
||||
allow_tsid_join: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> Result<LogicalPlan> {
|
||||
let use_tsid_join = Self::can_use_tsid_for_binary_join(
|
||||
&left,
|
||||
&right,
|
||||
only_join_time_index,
|
||||
allow_tsid_join,
|
||||
modifier,
|
||||
);
|
||||
let (mut left_tag_columns, mut right_tag_columns) = if use_tsid_join {
|
||||
(
|
||||
BTreeSet::from([DATA_SCHEMA_TSID_COLUMN_NAME.to_string()]),
|
||||
@@ -3474,30 +3470,51 @@ impl PromPlanner {
|
||||
(left_tag_columns, right_tag_columns)
|
||||
};
|
||||
|
||||
// apply modifier
|
||||
if !use_tsid_join && let Some(modifier) = modifier {
|
||||
// apply label modifier
|
||||
if let Some(matching) = &modifier.matching {
|
||||
match matching {
|
||||
// keeps columns mentioned in `on`
|
||||
LabelModifier::Include(on) => {
|
||||
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
|
||||
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
|
||||
right_tag_columns =
|
||||
right_tag_columns.intersection(&mask).cloned().collect();
|
||||
}
|
||||
// removes columns memtioned in `ignoring`
|
||||
LabelModifier::Exclude(ignoring) => {
|
||||
// doesn't check existence of label
|
||||
for label in &ignoring.labels {
|
||||
let _ = left_tag_columns.remove(label);
|
||||
let _ = right_tag_columns.remove(label);
|
||||
}
|
||||
if !use_tsid_join
|
||||
&& let Some(modifier) = modifier
|
||||
&& let Some(matching) = &modifier.matching
|
||||
{
|
||||
match matching {
|
||||
LabelModifier::Include(on) => {
|
||||
let mask = on.labels.iter().cloned().collect::<BTreeSet<_>>();
|
||||
left_tag_columns = left_tag_columns.intersection(&mask).cloned().collect();
|
||||
right_tag_columns = right_tag_columns.intersection(&mask).cloned().collect();
|
||||
}
|
||||
LabelModifier::Exclude(ignoring) => {
|
||||
for label in &ignoring.labels {
|
||||
let _ = left_tag_columns.remove(label);
|
||||
let _ = right_tag_columns.remove(label);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(left_tag_columns, right_tag_columns)
|
||||
}
|
||||
|
||||
/// Build a inner join on time index column and tag columns to concat two logical plans.
|
||||
/// When `only_join_time_index == true` we only join on the time index, because these two plan may not have the same tag columns
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn join_on_non_field_columns(
|
||||
&self,
|
||||
left: LogicalPlan,
|
||||
right: LogicalPlan,
|
||||
left_table_ref: TableReference,
|
||||
right_table_ref: TableReference,
|
||||
left_time_index_column: Option<String>,
|
||||
right_time_index_column: Option<String>,
|
||||
only_join_time_index: bool,
|
||||
is_comparison_op: bool,
|
||||
modifier: &Option<BinModifier>,
|
||||
) -> Result<LogicalPlan> {
|
||||
let (mut left_tag_columns, mut right_tag_columns) = self.binary_join_key_columns(
|
||||
&left,
|
||||
&right,
|
||||
only_join_time_index,
|
||||
is_comparison_op,
|
||||
modifier,
|
||||
);
|
||||
|
||||
// push time index column if it exists
|
||||
if let (Some(left_time_index_column), Some(right_time_index_column)) =
|
||||
(left_time_index_column, right_time_index_column)
|
||||
@@ -3919,24 +3936,10 @@ impl PromPlanner {
|
||||
.iter()
|
||||
.chain(self.ctx.time_index_column.iter())
|
||||
.map(|col| Ok(DfExpr::Column(Column::new(table_ref.clone(), col))));
|
||||
let tsid_iter = self
|
||||
.ctx
|
||||
.use_tsid
|
||||
.then_some(())
|
||||
.filter(|_| {
|
||||
input
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
|
||||
})
|
||||
.into_iter()
|
||||
.map(|_| {
|
||||
Ok(DfExpr::Column(Column::new(
|
||||
table_ref.clone(),
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME,
|
||||
)))
|
||||
});
|
||||
let tsid_iter =
|
||||
Self::optional_tsid_projection(input.schema(), table_ref.as_ref(), self.ctx.use_tsid)
|
||||
.into_iter()
|
||||
.map(Ok);
|
||||
|
||||
// build computation exprs
|
||||
let result_field_columns = self
|
||||
@@ -4195,6 +4198,18 @@ mod test {
|
||||
assert!(!plan_str.contains("Distinct:"), "{plan_str}");
|
||||
}
|
||||
|
||||
fn build_eval_stmt(expr: &str) -> EvalStmt {
|
||||
EvalStmt {
|
||||
expr: parser::parse(expr).unwrap(),
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_test_table_provider(
|
||||
table_name_tuples: &[(String, String)],
|
||||
num_tag: usize,
|
||||
@@ -4825,16 +4840,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_binary_join_uses_tsid_when_available() {
|
||||
let prom_expr = parser::parse("some_metric / some_alt_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("some_metric / some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
@@ -4866,17 +4872,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_is_preserved_for_nested_default_binary_joins() {
|
||||
let prom_expr =
|
||||
parser::parse("(some_metric - some_alt_metric) / some_third_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("(some_metric - some_alt_metric) / some_third_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
@@ -4906,17 +4902,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_tsid_join_keys() {
|
||||
let prom_expr =
|
||||
parser::parse("((some_metric - some_alt_metric) / some_metric) * 100").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("((some_metric - some_alt_metric) / some_metric) * 100");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
@@ -4942,18 +4928,8 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn repeated_tsid_binary_operand_keeps_shorter_field_side() {
|
||||
let prom_expr =
|
||||
parser::parse("((two_field_metric - one_field_metric) / one_field_metric) * 100")
|
||||
.unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt =
|
||||
build_eval_stmt("((two_field_metric - one_field_metric) / one_field_metric) * 100");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid_fields(
|
||||
&[
|
||||
@@ -5000,16 +4976,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn tsid_binary_join_uses_shorter_field_side() {
|
||||
let prom_expr = parser::parse("one_field_metric / two_field_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("one_field_metric / two_field_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid_fields(
|
||||
&[
|
||||
@@ -5053,16 +5020,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn label_matching_modifier_disables_tsid_binary_join() {
|
||||
let prom_expr = parser::parse("some_metric / ignoring(tag_0) some_alt_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("some_metric / ignoring(tag_0) some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
@@ -5091,16 +5049,7 @@ mod test {
|
||||
|
||||
#[tokio::test]
|
||||
async fn comparison_binary_join_does_not_use_tsid() {
|
||||
let prom_expr = parser::parse("some_metric > some_alt_metric").unwrap();
|
||||
let eval_stmt = EvalStmt {
|
||||
expr: prom_expr,
|
||||
start: UNIX_EPOCH,
|
||||
end: UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(100_000))
|
||||
.unwrap(),
|
||||
interval: Duration::from_secs(5),
|
||||
lookback_delta: Duration::from_secs(1),
|
||||
};
|
||||
let eval_stmt = build_eval_stmt("some_metric > some_alt_metric");
|
||||
|
||||
let table_provider = build_test_table_provider_with_tsid(
|
||||
&[
|
||||
|
||||
@@ -0,0 +1,185 @@
|
||||
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
|
||||
-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison
|
||||
-- filters must continue to use label-based matching.
|
||||
CREATE TABLE tsid_binary_join_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
greptime_value DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE tsid_binary_join_left (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
CREATE TABLE tsid_binary_join_right (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 12),
|
||||
('host2', 'job2', 0, 18),
|
||||
('host1', 'job1', 5000, 15),
|
||||
('host2', 'job2', 5000, 21);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 3),
|
||||
('host2', 'job2', 0, 6),
|
||||
('host1', 'job1', 5000, 5),
|
||||
('host2', 'job2', 5000, 7);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- Default vector-vector arithmetic should join on `__tsid` and time index.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(job@1, job@2), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([job@1, ts@2], 32), input_partitions=32 REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, job@2 as job, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([job@2, ts@4], 32), input_partitions=32 REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- Comparison filters must keep label-based matching semantics.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED
|
||||
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(host@1, host@1), (job@2, job@2), (ts@4, ts@3)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@4], 32), input_partitions=32 REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@3], 32), input_partitions=32 REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, host@1 as host, job@2 as job, ts@4 as ts] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
| host | job | ts | tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value |
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
| host1 | job1 | 1970-01-01T00:00:00 | 4.0 |
|
||||
| host1 | job1 | 1970-01-01T00:00:05 | 3.0 |
|
||||
| host2 | job2 | 1970-01-01T00:00:00 | 3.0 |
|
||||
| host2 | job2 | 1970-01-01T00:00:05 | 3.0 |
|
||||
+-------+------+---------------------+------------------------------------------------------------------------------+
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE tsid_binary_join_left;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE tsid_binary_join_physical;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
|
||||
-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison
|
||||
-- filters must continue to use label-based matching.
|
||||
|
||||
CREATE TABLE tsid_binary_join_physical (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
greptime_value DOUBLE,
|
||||
) ENGINE = metric WITH ("physical_metric_table" = "");
|
||||
|
||||
CREATE TABLE tsid_binary_join_left (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
CREATE TABLE tsid_binary_join_right (
|
||||
host STRING NULL,
|
||||
job STRING NULL,
|
||||
ts TIMESTAMP(3) NOT NULL,
|
||||
greptime_value DOUBLE NULL,
|
||||
TIME INDEX (ts),
|
||||
PRIMARY KEY(host, job),
|
||||
)
|
||||
ENGINE = metric
|
||||
WITH(
|
||||
on_physical_table = 'tsid_binary_join_physical'
|
||||
);
|
||||
|
||||
INSERT INTO tsid_binary_join_left (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 12),
|
||||
('host2', 'job2', 0, 18),
|
||||
('host1', 'job1', 5000, 15),
|
||||
('host2', 'job2', 5000, 21);
|
||||
|
||||
INSERT INTO tsid_binary_join_right (host, job, ts, greptime_value) VALUES
|
||||
('host1', 'job1', 0, 3),
|
||||
('host2', 'job2', 0, 6),
|
||||
('host1', 'job1', 5000, 5),
|
||||
('host2', 'job2', 5000, 7);
|
||||
|
||||
-- Default vector-vector arithmetic should join on `__tsid` and time index.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
-- Label modifiers must disable the TSID shortcut and keep matching on the remaining labels.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right;
|
||||
|
||||
-- Comparison filters must keep label-based matching semantics.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
|
||||
|
||||
DROP TABLE tsid_binary_join_right;
|
||||
DROP TABLE tsid_binary_join_left;
|
||||
DROP TABLE tsid_binary_join_physical;
|
||||
Reference in New Issue
Block a user