diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index c52d105bb5..3ff9d0e11d 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -36,6 +36,7 @@ use query::QueryEngineRef; use session::context::QueryContext; use snafu::{OptionExt, ResultExt, ensure}; use sql::parsers::utils::is_tql; +use store_api::metric_engine_consts::is_metric_engine_internal_column; use store_api::storage::{RegionId, TableId}; use table::table_reference::TableReference; use tokio::sync::{RwLock, oneshot}; @@ -607,6 +608,9 @@ impl BatchingEngine { .iter() .enumerate() { + if is_metric_engine_internal_column(&col.name) { + continue; + } // three cases: // 1. val column // 2. timestamp column diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 91deb507ce..bfcae8e80f 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1590,7 +1590,7 @@ impl PromPlanner { let logical_table = self.table_from_source(&provider)?; - let scan_table_ref = table_ref.clone(); + let mut scan_table_ref = table_ref.clone(); let mut scan_provider = provider; let mut table_id_filter: Option = None; @@ -1639,6 +1639,7 @@ impl PromPlanner { if has_table_id && has_tsid { scan_provider = physical_provider; + scan_table_ref = physical_table_ref; table_id_filter = Some(logical_table.table_info().ident.table_id); } } @@ -1726,6 +1727,8 @@ impl PromPlanner { .eq(lit(table_id)), ) .context(DataFusionPlanningSnafu)? + .alias(table_ref) // rename the relation back to logical table's name after filtering + .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu)?; } @@ -4574,8 +4577,10 @@ mod test { .unwrap(); let plan_str = plan.display_indent_schema().to_string(); - assert!(plan_str.contains("TableScan: some_metric")); - assert!(!plan_str.contains("TableScan: phy")); + assert!(plan_str.contains("TableScan: phy"), "{plan}"); + assert!(plan_str.contains("SubqueryAlias: some_metric")); + assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)")); + assert!(!plan_str.contains("TableScan: some_metric")); } #[tokio::test] diff --git a/tests/cases/distributed/flow-tql/tsid_on_phy.result b/tests/cases/distributed/flow-tql/tsid_on_phy.result new file mode 100644 index 0000000000..2ce9523b86 --- /dev/null +++ b/tests/cases/distributed/flow-tql/tsid_on_phy.result @@ -0,0 +1,187 @@ +-- Simplified schema and queries for TSID on physical table +CREATE TABLE phy ( + ts TIMESTAMP(3) TIME INDEX, + v DOUBLE NULL, + tag1 STRING, + tag2 STRING, + le STRING, + tag4 STRING, + tag5 STRING, + tag6 STRING NULL, + tag7 STRING NULL, + tag8 STRING NULL, + PRIMARY KEY ( + tag1, + tag2, + le, + tag4, + tag5, + tag6, + tag7, + tag8 + ) +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE IF NOT EXISTS test_tsid ( + tag1 STRING, + tag2 STRING, + ts TIMESTAMP(3) NOT NULL, + v DOUBLE NULL, + le STRING, + tag4 STRING, + tag5 STRING, + tag6 STRING NULL, + tag7 STRING NULL, + tag8 STRING NULL, + TIME INDEX (ts), + PRIMARY KEY ( + tag1, + tag2, + le, + tag4, + tag5, + tag6, + tag7, + tag8 + ) +) ENGINE=metric WITH( + on_physical_table = 'phy' +); + +Affected Rows: 0 + +INSERT INTO test_tsid + (ts, v, tag1, tag2, le, tag4, tag5, tag8, tag6, tag7) +VALUES + ('2026-01-23T03:40:00Z', 2.0, 'istio-ingressgateway', 'outbound', '0.5', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'), + ('2026-01-23T03:41:00Z', 5.0, 'istio-ingressgateway', 'outbound', '0.9', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'), + ('2026-01-23T03:41:30Z', 10.0, 'istio-ingressgateway', 'outbound', '+Inf', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'); + +Affected Rows: 3 + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL ( + timestamp '2026-01-23 03:30:00+00' + (now() - now()), + timestamp '2026-01-23 03:45:00+00' + (now() - now()), + '1m' +) histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + ++-------+------+---------------------+-------------------------------------+ +| tag4 | tag5 | ts | sum(prom_avg_over_time(ts_range,v)) | ++-------+------+---------------------+-------------------------------------+ +| svc-a | prod | 2026-01-23T03:40:00 | NaN | +| svc-a | prod | 2026-01-23T03:41:00 | NaN | +| svc-a | prod | 2026-01-23T03:42:00 | 0.9 | +| svc-a | prod | 2026-01-23T03:43:00 | 0.9 | +| svc-a | prod | 2026-01-23T03:44:00 | 0.9 | +| svc-a | prod | 2026-01-23T03:45:00 | 0.9 | ++-------+------+---------------------+-------------------------------------+ + +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:30:00+00' + (now() - now()), + timestamp '2026-01-23 03:45:00+00' + (now() - now()), + '1m' +) histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 | +| | MergeScan [is_placeholder=false, remote_input=[ | +| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST | +| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] | +| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL | +| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 | +| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[1800000], time index=[ts], values=["v"] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] | +| | PromSeriesDivide: tags=["__tsid"] | +| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST | +| | Filter: test_tsid.ts >= TimestampMillisecond(-2100000, None) AND test_tsid.ts <= TimestampMillisecond(300000, None) | +| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts | +| | SubqueryAlias: test_tsid | +| | Filter: phy.__table_id=UInt32(REDACTED) | +| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] | +| | ]] | +| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 | +| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | RepartitionExec: REDACTED +| | CooperativeExec | +| | MergeScanExec: REDACTED +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +CREATE FLOW IF NOT EXISTS test_tsid +SINK TO 'test_tsid_output' +EVAL INTERVAL '3600 s' +AS +TQL EVAL ( + timestamp '2026-01-23 03:10:00+00' + (now() - now()), + timestamp '2026-01-23 03:50:00+00' + (now() - now()), + '1m' +) +histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_tsid'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('test_tsid') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT * FROM "test_tsid_output" +ORDER BY ts +LIMIT 5; + ++-------+------+---------------------+-------------------------------------+ +| tag4 | tag5 | ts | sum(prom_avg_over_time(ts_range,v)) | ++-------+------+---------------------+-------------------------------------+ +| svc-a | prod | 2026-01-23T03:40:00 | NaN | +| svc-a | prod | 2026-01-23T03:41:00 | NaN | +| svc-a | prod | 2026-01-23T03:42:00 | 0.9 | +| svc-a | prod | 2026-01-23T03:43:00 | 0.9 | +| svc-a | prod | 2026-01-23T03:44:00 | 0.9 | ++-------+------+---------------------+-------------------------------------+ + +DROP FLOW test_tsid; + +Affected Rows: 0 + +DROP TABLE IF EXISTS "test_tsid_output"; + +Affected Rows: 0 + +DROP TABLE test_tsid; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/flow-tql/tsid_on_phy.sql b/tests/cases/distributed/flow-tql/tsid_on_phy.sql new file mode 100644 index 0000000000..f4f94649f8 --- /dev/null +++ b/tests/cases/distributed/flow-tql/tsid_on_phy.sql @@ -0,0 +1,113 @@ +-- Simplified schema and queries for TSID on physical table + +CREATE TABLE phy ( + ts TIMESTAMP(3) TIME INDEX, + v DOUBLE NULL, + tag1 STRING, + tag2 STRING, + le STRING, + tag4 STRING, + tag5 STRING, + tag6 STRING NULL, + tag7 STRING NULL, + tag8 STRING NULL, + PRIMARY KEY ( + tag1, + tag2, + le, + tag4, + tag5, + tag6, + tag7, + tag8 + ) +) ENGINE = metric WITH ("physical_metric_table" = ""); + +CREATE TABLE IF NOT EXISTS test_tsid ( + tag1 STRING, + tag2 STRING, + ts TIMESTAMP(3) NOT NULL, + v DOUBLE NULL, + le STRING, + tag4 STRING, + tag5 STRING, + tag6 STRING NULL, + tag7 STRING NULL, + tag8 STRING NULL, + TIME INDEX (ts), + PRIMARY KEY ( + tag1, + tag2, + le, + tag4, + tag5, + tag6, + tag7, + tag8 + ) +) ENGINE=metric WITH( + on_physical_table = 'phy' +); + +INSERT INTO test_tsid + (ts, v, tag1, tag2, le, tag4, tag5, tag8, tag6, tag7) +VALUES + ('2026-01-23T03:40:00Z', 2.0, 'istio-ingressgateway', 'outbound', '0.5', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'), + ('2026-01-23T03:41:00Z', 5.0, 'istio-ingressgateway', 'outbound', '0.9', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'), + ('2026-01-23T03:41:30Z', 10.0, 'istio-ingressgateway', 'outbound', '+Inf', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL ( + timestamp '2026-01-23 03:30:00+00' + (now() - now()), + timestamp '2026-01-23 03:45:00+00' + (now() - now()), + '1m' +) histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED +-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED) +TQL EXPLAIN ( + timestamp '2026-01-23 03:30:00+00' + (now() - now()), + timestamp '2026-01-23 03:45:00+00' + (now() - now()), + '1m' +) histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + +CREATE FLOW IF NOT EXISTS test_tsid +SINK TO 'test_tsid_output' +EVAL INTERVAL '3600 s' +AS +TQL EVAL ( + timestamp '2026-01-23 03:10:00+00' + (now() - now()), + timestamp '2026-01-23 03:50:00+00' + (now() - now()), + '1m' +) +histogram_quantile( + 0.50, + sum by (le, tag4, tag5) ( + avg_over_time(test_tsid[30m]) + ) +); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_tsid'); + +SELECT * FROM "test_tsid_output" +ORDER BY ts +LIMIT 5; + +DROP FLOW test_tsid; +DROP TABLE IF EXISTS "test_tsid_output"; +DROP TABLE test_tsid; +DROP TABLE phy;