fix filter join case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-08 12:44:13 +08:00
parent 6b55a7a122
commit e914f58276
5 changed files with 140 additions and 64 deletions

View File

@@ -43,7 +43,6 @@ use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
@@ -429,35 +428,20 @@ impl MergeScanExec {
return None;
}
// Metric-engine scans can satisfy any hash distribution that includes `__tsid`.
// Equal requested keys also share the same `__tsid`, and equal `__tsid` values stay
// co-located across MergeScan partitions.
let overlaps = if self
.arrow_schema
.column_with_name(DATA_SCHEMA_TSID_COLUMN_NAME)
.is_some()
&& hash_exprs.iter().any(|expr| {
let all_partition_col_aliases: HashSet<_> = self
.partition_cols
.values()
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
.collect();
let overlaps: Vec<_> = hash_exprs
.iter()
.filter(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| col_expr.name() == DATA_SCHEMA_TSID_COLUMN_NAME)
}) {
hash_exprs
} else {
let all_partition_col_aliases: HashSet<_> = self
.partition_cols
.values()
.flat_map(|aliases| aliases.iter().map(|c| c.name()))
.collect();
hash_exprs
.iter()
.filter(|expr| {
expr.as_any()
.downcast_ref::<Column>()
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
})
.cloned()
.collect()
};
.is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
})
.cloned()
.collect();
if overlaps.is_empty() {
return None;

View File

@@ -248,12 +248,18 @@ mod tests {
fn test_merge_scan_exec(schema: SchemaRef) -> MergeScanExec {
let session_state = SessionStateBuilder::new().with_default_features().build();
let partition_cols = BTreeMap::from([(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([datafusion_common::Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)]),
)]);
let partition_cols = BTreeMap::from([
(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
BTreeSet::from([datafusion_common::Column::from_name(
DATA_SCHEMA_TSID_COLUMN_NAME,
)]),
),
(
"greptime_timestamp".to_string(),
BTreeSet::from([datafusion_common::Column::from_name("greptime_timestamp")]),
),
]);
let plan = LogicalPlanBuilder::empty(false).build().unwrap();
MergeScanExec::new(

View File

@@ -727,7 +727,6 @@ impl PromPlanner {
// If either the left or the right plan has no tag columns, this is a case like
// `scalar(...) + host` or `host + scalar(...)`; in that case we only join on the time index.
left_context.tag_columns.is_empty() || right_context.tag_columns.is_empty(),
is_comparison_op,
modifier,
)?;
let join_plan_schema = join_plan.schema().clone();
@@ -769,7 +768,12 @@ impl PromPlanner {
}
_ => (&left_table_ref, &left_context),
};
self.project_binary_join_side(filtered, project_table_ref, project_context)
self.project_binary_join_side(
filtered,
project_table_ref,
project_context,
false,
)
} else {
self.projection_for_each_field_column(join_plan, bin_expr_builder)
}
@@ -782,6 +786,7 @@ impl PromPlanner {
input: LogicalPlan,
table_ref: &TableReference,
context: &PromPlannerContext,
keep_tsid: bool,
) -> Result<LogicalPlan> {
let schema = input.schema();
@@ -818,7 +823,7 @@ impl PromPlanner {
// Preserve `__tsid` only when `keep_tsid` is set and the context uses it, so it can still be
// used internally downstream. It's stripped from the final output anyway.
if let Some(tsid_col) =
Self::optional_tsid_projection(schema, Some(table_ref), context.use_tsid)
Self::optional_tsid_projection(schema, Some(table_ref), keep_tsid && context.use_tsid)
{
project_exprs.push(tsid_col);
}
@@ -832,6 +837,7 @@ impl PromPlanner {
// Update context to reflect the projected schema. Don't keep a table qualifier since
// the result is a derived expression.
self.ctx = context.clone();
self.ctx.use_tsid = keep_tsid && context.use_tsid;
self.ctx.table_name = None;
self.ctx.schema_name = None;
@@ -3439,11 +3445,9 @@ impl PromPlanner {
left: &LogicalPlan,
right: &LogicalPlan,
only_join_time_index: bool,
is_comparison_op: bool,
modifier: &Option<BinModifier>,
) -> (BTreeSet<String>, BTreeSet<String>) {
let use_tsid_join = !is_comparison_op
&& !only_join_time_index
let use_tsid_join = !only_join_time_index
&& modifier.as_ref().is_none_or(|modifier| {
modifier.matching.is_none()
&& matches!(modifier.card, VectorMatchCardinality::OneToOne)
@@ -3504,16 +3508,10 @@ impl PromPlanner {
left_time_index_column: Option<String>,
right_time_index_column: Option<String>,
only_join_time_index: bool,
is_comparison_op: bool,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
let (mut left_tag_columns, mut right_tag_columns) = self.binary_join_key_columns(
&left,
&right,
only_join_time_index,
is_comparison_op,
modifier,
);
let (mut left_tag_columns, mut right_tag_columns) =
self.binary_join_key_columns(&left, &right, only_join_time_index, modifier);
// push time index column if it exists
if let (Some(left_time_index_column), Some(right_time_index_column)) =
@@ -5048,9 +5046,50 @@ mod test {
}
#[tokio::test]
async fn comparison_binary_join_does_not_use_tsid() {
async fn comparison_binary_join_uses_tsid_but_filtered_result_drops_it() {
let eval_stmt = build_eval_stmt("some_metric > some_alt_metric");
let table_provider = build_test_table_provider_with_tsid(
&[
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
(
DEFAULT_SCHEMA_NAME.to_string(),
"some_alt_metric".to_string(),
),
],
2,
1,
)
.await;
let mut planner = PromPlanner {
table_provider,
ctx: PromPlannerContext::from_eval_stmt(&eval_stmt),
};
let plan = planner
.prom_expr_to_plan(&eval_stmt.expr, &build_query_engine_state())
.await
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
"{plan_str}"
);
assert!(
!plan
.schema()
.fields()
.iter()
.any(|field| field.name() == DATA_SCHEMA_TSID_COLUMN_NAME),
"{plan_str}"
);
assert!(!planner.ctx.use_tsid, "{plan_str}");
}
#[tokio::test]
async fn comparison_bool_binary_join_uses_tsid_when_available() {
let eval_stmt = build_eval_stmt("some_metric > bool some_alt_metric");
let table_provider = build_test_table_provider_with_tsid(
&[
(DEFAULT_SCHEMA_NAME.to_string(), "some_metric".to_string()),
@@ -5069,11 +5108,12 @@ mod test {
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(!plan_str.contains("__tsid ="), "{plan_str}");
assert!(
plan_str.contains("some_metric.tag_1 = some_alt_metric.tag_1"),
plan_str.contains("some_metric.__tsid = some_alt_metric.__tsid"),
"{plan_str}"
);
assert!(!plan_str.contains("tag_0 ="), "{plan_str}");
assert!(!plan_str.contains("tag_1 ="), "{plan_str}");
}
#[tokio::test]

View File

@@ -1,6 +1,6 @@
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison
-- filters must continue to use label-based matching.
-- Default arithmetic and comparison joins should use `__tsid` when matching is the
-- default one-to-one case. Label modifiers still have to stay label-based.
CREATE TABLE tsid_binary_join_physical (
ts TIMESTAMP(3) TIME INDEX,
greptime_value DOUBLE,
@@ -68,10 +68,10 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, greptime_value@0 / greptime_value@1 as tsid_binary_join_left.greptime_value / tsid_binary_join_right.greptime_value] REDACTED
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
@@ -124,7 +124,8 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join
|_|_| Total rows: 4_|
+-+-+-+
-- Comparison filters must keep label-based matching semantics.
-- Comparison filters can join on `__tsid`, but the filtered result must still behave like
-- a regular derived vector downstream.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
@@ -136,12 +137,47 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[ts@4 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job, __tsid@3 as __tsid] REDACTED
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(host@1, host@1), (job@2, job@2), (ts@4, ts@3)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, __tsid@3, ts@4], NullsEqual: true REDACTED
|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@4], 32), input_partitions=32 REDACTED
| 0_| 0_|_ProjectionExec: expr=[ts@3 as ts, greptime_value@0 as greptime_value, host@1 as host, job@2 as job] REDACTED
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@3, __tsid@1), (ts@4, ts@2)], filter=greptime_value@1 < greptime_value@0, projection=[greptime_value@0, host@1, job@2, ts@4], NullsEqual: true REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=Hash([host@1, job@2, ts@3], 32), input_partitions=32 REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, host@1 as host, job@2 as job, ts@4 as ts] REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@1 as greptime_value, host@3 as host, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
-- `bool` comparison should follow the same TSID-backed matching path.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_ProjectionExec: expr=[host@2 as host, job@3 as job, ts@5 as ts, __tsid@4 as __tsid, CAST(greptime_value@1 < greptime_value@0 AS Float64) as tsid_binary_join_left.greptime_value > tsid_binary_join_right.greptime_value] REDACTED
|_|_|_HashJoinExec: mode=Partitioned, join_type=Inner, on=[(__tsid@1, __tsid@3), (ts@2, ts@4)], projection=[greptime_value@0, greptime_value@3, host@4, job@5, __tsid@6, ts@7], NullsEqual: true REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@1, ts@2], 32), input_partitions=32 REDACTED
|_|_|_ProjectionExec: expr=[greptime_value@0 as greptime_value, __tsid@3 as __tsid, ts@4 as ts] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_RepartitionExec: partitioning=Hash([__tsid@3, ts@4], 32), input_partitions=32 REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..5000], lookback=[300000], interval=[5000], time index=[ts] REDACTED

View File

@@ -1,6 +1,6 @@
-- Regression test for TSID-backed PromQL binary joins on metric-engine tables.
-- Default arithmetic joins should use `__tsid`, while label modifiers and comparison
-- filters must continue to use label-based matching.
-- Default arithmetic and comparison joins should use `__tsid` when matching is the
-- default one-to-one case. Label modifiers still have to stay label-based.
CREATE TABLE tsid_binary_join_physical (
ts TIMESTAMP(3) TIME INDEX,
@@ -63,7 +63,8 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join_right;
-- Comparison filters must keep label-based matching semantics.
-- Comparison filters can join on `__tsid`, but the filtered result must still behave like
-- a regular derived vector downstream.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
@@ -72,6 +73,15 @@ TQL ANALYZE (0, 5, '5s') tsid_binary_join_left / ignoring(host) tsid_binary_join
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > tsid_binary_join_right;
-- `bool` comparison should follow the same TSID-backed matching path.
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE (0, 5, '5s') tsid_binary_join_left > bool tsid_binary_join_right;
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 5, '5s') tsid_binary_join_left / tsid_binary_join_right;