fix: use phy table when need fitler by tsid/table id (#7609)

* fix: use phy table when need fitler by tsid/table id

Signed-off-by: discord9 <discord9@163.com>

* feat: add subquery alias

Signed-off-by: discord9 <discord9@163.com>

* test: update after alias

Signed-off-by: discord9 <discord9@163.com>

* test: sort&redact

Signed-off-by: discord9 <discord9@163.com>

* test: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
discord9
2026-01-26 14:11:16 +08:00
committed by GitHub
parent 2f82e7525f
commit 085e9dfc7b
4 changed files with 312 additions and 3 deletions

View File

@@ -36,6 +36,7 @@ use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parsers::utils::is_tql;
use store_api::metric_engine_consts::is_metric_engine_internal_column;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{RwLock, oneshot};
@@ -607,6 +608,9 @@ impl BatchingEngine {
.iter()
.enumerate()
{
if is_metric_engine_internal_column(&col.name) {
continue;
}
// three cases:
// 1. val column
// 2. timestamp column

View File

@@ -1590,7 +1590,7 @@ impl PromPlanner {
let logical_table = self.table_from_source(&provider)?;
let scan_table_ref = table_ref.clone();
let mut scan_table_ref = table_ref.clone();
let mut scan_provider = provider;
let mut table_id_filter: Option<u32> = None;
@@ -1639,6 +1639,7 @@ impl PromPlanner {
if has_table_id && has_tsid {
scan_provider = physical_provider;
scan_table_ref = physical_table_ref;
table_id_filter = Some(logical_table.table_info().ident.table_id);
}
}
@@ -1726,6 +1727,8 @@ impl PromPlanner {
.eq(lit(table_id)),
)
.context(DataFusionPlanningSnafu)?
.alias(table_ref) // rename the relation back to logical table's name after filtering
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}
@@ -4574,8 +4577,10 @@ mod test {
.unwrap();
let plan_str = plan.display_indent_schema().to_string();
assert!(plan_str.contains("TableScan: some_metric"));
assert!(!plan_str.contains("TableScan: phy"));
assert!(plan_str.contains("TableScan: phy"), "{plan}");
assert!(plan_str.contains("SubqueryAlias: some_metric"));
assert!(plan_str.contains("Filter: phy.__table_id = UInt32(1024)"));
assert!(!plan_str.contains("TableScan: some_metric"));
}
#[tokio::test]

View File

@@ -0,0 +1,187 @@
-- Simplified schema and queries for TSID on physical table
CREATE TABLE phy (
ts TIMESTAMP(3) TIME INDEX,
v DOUBLE NULL,
tag1 STRING,
tag2 STRING,
le STRING,
tag4 STRING,
tag5 STRING,
tag6 STRING NULL,
tag7 STRING NULL,
tag8 STRING NULL,
PRIMARY KEY (
tag1,
tag2,
le,
tag4,
tag5,
tag6,
tag7,
tag8
)
) ENGINE = metric WITH ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE IF NOT EXISTS test_tsid (
tag1 STRING,
tag2 STRING,
ts TIMESTAMP(3) NOT NULL,
v DOUBLE NULL,
le STRING,
tag4 STRING,
tag5 STRING,
tag6 STRING NULL,
tag7 STRING NULL,
tag8 STRING NULL,
TIME INDEX (ts),
PRIMARY KEY (
tag1,
tag2,
le,
tag4,
tag5,
tag6,
tag7,
tag8
)
) ENGINE=metric WITH(
on_physical_table = 'phy'
);
Affected Rows: 0
INSERT INTO test_tsid
(ts, v, tag1, tag2, le, tag4, tag5, tag8, tag6, tag7)
VALUES
('2026-01-23T03:40:00Z', 2.0, 'istio-ingressgateway', 'outbound', '0.5', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'),
('2026-01-23T03:41:00Z', 5.0, 'istio-ingressgateway', 'outbound', '0.9', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'),
('2026-01-23T03:41:30Z', 10.0, 'istio-ingressgateway', 'outbound', '+Inf', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod');
Affected Rows: 3
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (
timestamp '2026-01-23 03:30:00+00' + (now() - now()),
timestamp '2026-01-23 03:45:00+00' + (now() - now()),
'1m'
) histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
+-------+------+---------------------+-------------------------------------+
| tag4 | tag5 | ts | sum(prom_avg_over_time(ts_range,v)) |
+-------+------+---------------------+-------------------------------------+
| svc-a | prod | 2026-01-23T03:40:00 | NaN |
| svc-a | prod | 2026-01-23T03:41:00 | NaN |
| svc-a | prod | 2026-01-23T03:42:00 | 0.9 |
| svc-a | prod | 2026-01-23T03:43:00 | 0.9 |
| svc-a | prod | 2026-01-23T03:44:00 | 0.9 |
| svc-a | prod | 2026-01-23T03:45:00 | 0.9 |
+-------+------+---------------------+-------------------------------------+
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:30:00+00' + (now() - now()),
timestamp '2026-01-23 03:45:00+00' + (now() - now()),
'1m'
) histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | HistogramFold: le=le, field=sum(prom_avg_over_time(ts_range,v)), quantile=0.5 |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: test_tsid.le ASC NULLS LAST, test_tsid.tag4 ASC NULLS LAST, test_tsid.tag5 ASC NULLS LAST, test_tsid.ts ASC NULLS LAST |
| | Aggregate: groupBy=[[test_tsid.le, test_tsid.tag4, test_tsid.tag5, test_tsid.ts]], aggr=[[sum(prom_avg_over_time(ts_range,v))]] |
| | Filter: prom_avg_over_time(ts_range,v) IS NOT NULL |
| | Projection: test_tsid.ts, prom_avg_over_time(ts_range, v) AS prom_avg_over_time(ts_range,v), test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8 |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[1800000], time index=[ts], values=["v"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["__tsid"] |
| | Sort: test_tsid.__tsid ASC NULLS FIRST, test_tsid.ts ASC NULLS FIRST |
| | Filter: test_tsid.ts >= TimestampMillisecond(-2100000, None) AND test_tsid.ts <= TimestampMillisecond(300000, None) |
| | Projection: test_tsid.v, test_tsid.le, test_tsid.tag1, test_tsid.tag2, test_tsid.tag4, test_tsid.tag5, test_tsid.tag6, test_tsid.tag7, test_tsid.tag8, test_tsid.__tsid, test_tsid.ts |
| | SubqueryAlias: test_tsid |
| | Filter: phy.__table_id=UInt32(REDACTED) |
| | TableScan: phy projection=[ts, v, tag1, tag2, le, tag4, tag5, tag6, tag7, tag8, __table_id, __tsid] |
| | ]] |
| physical_plan | HistogramFoldExec: le=@0, field=@4, quantile=0.5 |
| | SortExec: expr=[tag4@1 ASC NULLS LAST, tag5@2 ASC NULLS LAST, ts@3 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CREATE FLOW IF NOT EXISTS test_tsid
SINK TO 'test_tsid_output'
EVAL INTERVAL '3600 s'
AS
TQL EVAL (
timestamp '2026-01-23 03:10:00+00' + (now() - now()),
timestamp '2026-01-23 03:50:00+00' + (now() - now()),
'1m'
)
histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
Affected Rows: 0
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_tsid');
+-------------------------------+
| ADMIN FLUSH_FLOW('test_tsid') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT * FROM "test_tsid_output"
ORDER BY ts
LIMIT 5;
+-------+------+---------------------+-------------------------------------+
| tag4 | tag5 | ts | sum(prom_avg_over_time(ts_range,v)) |
+-------+------+---------------------+-------------------------------------+
| svc-a | prod | 2026-01-23T03:40:00 | NaN |
| svc-a | prod | 2026-01-23T03:41:00 | NaN |
| svc-a | prod | 2026-01-23T03:42:00 | 0.9 |
| svc-a | prod | 2026-01-23T03:43:00 | 0.9 |
| svc-a | prod | 2026-01-23T03:44:00 | 0.9 |
+-------+------+---------------------+-------------------------------------+
DROP FLOW test_tsid;
Affected Rows: 0
DROP TABLE IF EXISTS "test_tsid_output";
Affected Rows: 0
DROP TABLE test_tsid;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -0,0 +1,113 @@
-- Simplified schema and queries for TSID on physical table
CREATE TABLE phy (
ts TIMESTAMP(3) TIME INDEX,
v DOUBLE NULL,
tag1 STRING,
tag2 STRING,
le STRING,
tag4 STRING,
tag5 STRING,
tag6 STRING NULL,
tag7 STRING NULL,
tag8 STRING NULL,
PRIMARY KEY (
tag1,
tag2,
le,
tag4,
tag5,
tag6,
tag7,
tag8
)
) ENGINE = metric WITH ("physical_metric_table" = "");
CREATE TABLE IF NOT EXISTS test_tsid (
tag1 STRING,
tag2 STRING,
ts TIMESTAMP(3) NOT NULL,
v DOUBLE NULL,
le STRING,
tag4 STRING,
tag5 STRING,
tag6 STRING NULL,
tag7 STRING NULL,
tag8 STRING NULL,
TIME INDEX (ts),
PRIMARY KEY (
tag1,
tag2,
le,
tag4,
tag5,
tag6,
tag7,
tag8
)
) ENGINE=metric WITH(
on_physical_table = 'phy'
);
INSERT INTO test_tsid
(ts, v, tag1, tag2, le, tag4, tag5, tag8, tag6, tag7)
VALUES
('2026-01-23T03:40:00Z', 2.0, 'istio-ingressgateway', 'outbound', '0.5', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'),
('2026-01-23T03:41:00Z', 5.0, 'istio-ingressgateway', 'outbound', '0.9', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod'),
('2026-01-23T03:41:30Z', 10.0, 'istio-ingressgateway', 'outbound', '+Inf', 'svc-a', 'prod', 'peer.example', 'svc-b', 'prod');
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (
timestamp '2026-01-23 03:30:00+00' + (now() - now()),
timestamp '2026-01-23 03:45:00+00' + (now() - now()),
'1m'
) histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
-- SQLNESS REPLACE phy.__table_id\s=\sUInt32\(\d+\) phy.__table_id=UInt32(REDACTED)
TQL EXPLAIN (
timestamp '2026-01-23 03:30:00+00' + (now() - now()),
timestamp '2026-01-23 03:45:00+00' + (now() - now()),
'1m'
) histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
CREATE FLOW IF NOT EXISTS test_tsid
SINK TO 'test_tsid_output'
EVAL INTERVAL '3600 s'
AS
TQL EVAL (
timestamp '2026-01-23 03:10:00+00' + (now() - now()),
timestamp '2026-01-23 03:50:00+00' + (now() - now()),
'1m'
)
histogram_quantile(
0.50,
sum by (le, tag4, tag5) (
avg_over_time(test_tsid[30m])
)
);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_tsid');
SELECT * FROM "test_tsid_output"
ORDER BY ts
LIMIT 5;
DROP FLOW test_tsid;
DROP TABLE IF EXISTS "test_tsid_output";
DROP TABLE test_tsid;
DROP TABLE phy;