feat: use tsid on promql planner (#7590)

* expose tsid on logical table's schema and use it on planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* detect table type on planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out internal columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reset tsid flag

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-21 14:25:36 +08:00
committed by GitHub
parent 576eb0aadc
commit 966ade7565
3 changed files with 1031 additions and 40 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,107 @@
CREATE TABLE tsid_physical (
ts TIMESTAMP(3) TIME INDEX,
val DOUBLE,
) ENGINE = metric WITH ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE tsid_metric (
job STRING NULL,
instance STRING NULL,
ts TIMESTAMP(3) NOT NULL,
val DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY(job, instance),
)
ENGINE = metric
WITH(
on_physical_table = 'tsid_physical'
);
Affected Rows: 0
INSERT INTO tsid_metric VALUES
('job1', 'instance1', 0, 1),
('job1', 'instance2', 0, 2),
('job1', 'instance1', 5000, 3),
('job1', 'instance2', 5000, 4),
('job1', 'instance1', 10000, 5),
('job1', 'instance2', 10000, 6);
Affected Rows: 6
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [ts@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[ts@1 as ts], aggr=[sum(tsid_metric.val)] REDACTED
|_|_|_ProjectionExec: expr=[val@0 as val, ts@2 as ts] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_SortExec: expr=[__tsid@1 ASC, ts@2 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 3_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[job@0 ASC NULLS LAST, instance@1 ASC NULLS LAST, ts@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[job@0 as job, instance@1 as instance, ts@2 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[job@2 as job, instance@1 as instance, ts@4 as ts], aggr=[sum(tsid_metric.val), __tsid] REDACTED
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[ts] REDACTED
|_|_|_PromSeriesDivideExec: tags=["__tsid"] REDACTED
|_|_|_SortExec: expr=[__tsid@3 ASC, ts@4 ASC], preserve_partitioning=[true] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_ProjectionExec: expr=[val@1 as val, instance@3 as instance, job@4 as job, __tsid@2 as __tsid, ts@0 as ts] REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 6_|
+-+-+-+
DROP TABLE tsid_metric;
Affected Rows: 0
DROP TABLE tsid_physical;
Affected Rows: 0

View File

@@ -0,0 +1,47 @@
CREATE TABLE tsid_physical (
ts TIMESTAMP(3) TIME INDEX,
val DOUBLE,
) ENGINE = metric WITH ("physical_metric_table" = "");
CREATE TABLE tsid_metric (
job STRING NULL,
instance STRING NULL,
ts TIMESTAMP(3) NOT NULL,
val DOUBLE NULL,
TIME INDEX (ts),
PRIMARY KEY(job, instance),
)
ENGINE = metric
WITH(
on_physical_table = 'tsid_physical'
);
INSERT INTO tsid_metric VALUES
('job1', 'instance1', 0, 1),
('job1', 'instance2', 0, 2),
('job1', 'instance1', 5000, 3),
('job1', 'instance2', 5000, 4),
('job1', 'instance1', 10000, 5),
('job1', 'instance2', 10000, 6);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum(tsid_metric);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE (0, 10, '5s') sum by (job, instance) (tsid_metric);
DROP TABLE tsid_metric;
DROP TABLE tsid_physical;