fix: query push-down for metric tables without physical partition columns (#6694)

* fix: metrics no part cols

Signed-off-by: discord9 <discord9@163.com>

* chore: typos

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: rename stuff

Signed-off-by: discord9 <discord9@163.com>

* refactor: put partition rules in table

Signed-off-by: discord9 <discord9@163.com>

* more tests

Signed-off-by: discord9 <discord9@163.com>

* test: redact more

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
discord9
2025-08-12 10:51:38 +08:00
committed by GitHub
parent e4454e0c7d
commit f159fcf599
6 changed files with 451 additions and 21 deletions

View File

@@ -44,6 +44,7 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table::PartitionRules;
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
@@ -132,6 +133,8 @@ impl KvBackendCatalogManager {
{
let mut new_table_info = (*table.table_info()).clone();
let mut phy_part_cols_not_in_logical_table = vec![];
// Remap partition key indices from physical table to logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
@@ -148,15 +151,30 @@ impl KvBackendCatalogManager {
.get(physical_index)
.and_then(|physical_column| {
// Find the corresponding index in the logical table schema
new_table_info
let idx = new_table_info
.meta
.schema
.column_index_by_name(physical_column.name.as_str())
.column_index_by_name(physical_column.name.as_str());
if idx.is_none() {
// this physical partition column is not present in the logical table
phy_part_cols_not_in_logical_table
.push(physical_column.name.clone());
}
idx
})
})
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
Some(PartitionRules {
extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
})
} else {
None
};
let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);
return Ok(new_table);
}
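
The remapping above is easier to see in isolation. Below is a minimal, self-contained sketch (the function and column names are illustrative, not from the commit): it splits the physical partition columns into indices that resolve in the logical schema and names that do not.

fn remap_partition_key_indices(
    phy_part_cols: &[&str],
    logical_cols: &[&str],
) -> (Vec<usize>, Vec<String>) {
    let mut logical_indices = Vec::new();
    let mut hidden = Vec::new();
    for name in phy_part_cols {
        match logical_cols.iter().position(|col| col == name) {
            // The physical partition column also exists in the logical table.
            Some(idx) => logical_indices.push(idx),
            // The physical partition column is hidden from the logical table.
            None => hidden.push((*name).to_string()),
        }
    }
    (logical_indices, hidden)
}

fn main() {
    // The logical table exposes `host` but not the physical partition column `shard`.
    let (indices, hidden) =
        remap_partition_key_indices(&["host", "shard"], &["ts", "host", "cpu"]);
    assert_eq!(indices, vec![1]);
    assert_eq!(hidden, vec!["shard".to_string()]);
}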

View File

@@ -482,14 +482,32 @@ impl PlanRewriter {
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
let table = provider.table();
if table.table_type() == TableType::Base {
let info = table.table_info();
let partition_key_indices = info.meta.partition_key_indices.clone();
let schema = info.meta.schema.clone();
let partition_cols = partition_key_indices
let mut partition_cols = partition_key_indices
.into_iter()
.map(|index| schema.column_name_by_index(index).to_string())
.collect::<Vec<String>>();
let partition_rules = table.partition_rules();
let has_phy_part_cols_not_in_logical_table = partition_rules
.map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())
.unwrap_or(false);
if has_phy_part_cols_not_in_logical_table && partition_cols.is_empty() {
// The physical table has partition columns that the logical table does not
// expose, and the logical partition columns are empty, so add a placeholder
// to keep the partition columns the optimizer sees non-empty and thereby
// prevent certain optimizations. If `partition_cols` is already non-empty,
// no placeholder is needed: a subset of the physical partition columns can
// still drive those optimizations, behaving as if the hidden columns were
// always null. The placeholder distinguishes a truly non-partitioned table
// from a partitioned table whose physical partition columns are all hidden
// from the logical table.
partition_cols
.push("__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__".to_string());
}
self.partition_cols = Some(partition_cols);
}
}
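
The placeholder rule itself is a pure function of the logical partition columns and a hidden-columns flag. A minimal sketch with the table plumbing stripped out (the constant value is copied from the diff above; the function name is illustrative):

const OTHER_PHY_PART_COLS_PLACEHOLDER: &str = "__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__";

fn effective_partition_cols(
    mut partition_cols: Vec<String>,
    has_hidden_phy_part_cols: bool,
) -> Vec<String> {
    if has_hidden_phy_part_cols && partition_cols.is_empty() {
        // Keep the optimizer from treating the table as non-partitioned.
        partition_cols.push(OTHER_PHY_PART_COLS_PLACEHOLDER.to_string());
    }
    partition_cols
}

fn main() {
    // All physical partition columns are hidden: a placeholder is added.
    assert_eq!(
        effective_partition_cols(vec![], true),
        vec![OTHER_PHY_PART_COLS_PLACEHOLDER.to_string()]
    );
    // A non-empty subset of the physical partition columns is kept as-is.
    assert_eq!(
        effective_partition_cols(vec!["host".to_string()], true),
        vec!["host".to_string()]
    );
}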

View File

@@ -21,6 +21,7 @@ use store_api::storage::ScanRequest;
use crate::error::UnsupportedSnafu;
use crate::metadata::{FilterPushDownType, TableInfoRef};
use crate::table::PartitionRules;
use crate::{Table, TableRef};
#[derive(Clone)]
@@ -32,6 +33,20 @@ impl DistTable {
let table = Table::new(table_info, FilterPushDownType::Inexact, data_source);
Arc::new(table)
}
pub fn table_partitioned(
table_info: TableInfoRef,
partition_rules: Option<PartitionRules>,
) -> TableRef {
let data_source = Arc::new(DummyDataSource);
let table = Table::new_partitioned(
table_info,
FilterPushDownType::Inexact,
data_source,
partition_rules,
);
Arc::new(table)
}
}
pub struct DummyDataSource;

View File

@@ -52,6 +52,16 @@ lazy_static! {
};
}
/// Defines partition rules for a table.
/// TODO(discord9): add the full partition expression rules here later
pub struct PartitionRules {
/// The physical partition columns that are not present in the logical table.
/// Only set by the kvbackend catalog manager for logical tables; if the table
/// is not a logical table, this should be empty. The planner uses it so that
/// physical partition columns hidden from the logical table still prevent
/// certain optimizations.
pub extra_phy_cols_not_in_logical_table: Vec<String>,
}
pub type TableRef = Arc<Table>;
/// Table handle.
@@ -61,6 +71,7 @@ pub struct Table {
data_source: DataSourceRef,
/// Columns default [`Expr`]
column_defaults: HashMap<String, Expr>,
partition_rules: Option<PartitionRules>,
}
impl Table {
@@ -74,6 +85,22 @@ impl Table {
table_info,
filter_pushdown,
data_source,
partition_rules: None,
}
}
pub fn new_partitioned(
table_info: TableInfoRef,
filter_pushdown: FilterPushDownType,
data_source: DataSourceRef,
partition_rules: Option<PartitionRules>,
) -> Self {
Self {
column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
table_info,
filter_pushdown,
data_source,
partition_rules,
}
}
@@ -101,6 +128,10 @@ impl Table {
self.table_info.table_type
}
pub fn partition_rules(&self) -> Option<&PartitionRules> {
self.partition_rules.as_ref()
}
pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
self.data_source
.get_stream(request)
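
Putting this file's pieces together, a self-contained sketch of how a caller checks for hidden physical partition columns (TableLike is a stand-in for the relevant slice of Table; the helper mirrors the check in PlanRewriter above):

pub struct PartitionRules {
    pub extra_phy_cols_not_in_logical_table: Vec<String>,
}

// Stand-in for the relevant fields of `Table` (illustrative only).
struct TableLike {
    partition_rules: Option<PartitionRules>,
}

impl TableLike {
    fn partition_rules(&self) -> Option<&PartitionRules> {
        self.partition_rules.as_ref()
    }
}

// True when the physical table is partitioned on columns the logical table
// does not expose.
fn has_hidden_phy_part_cols(table: &TableLike) -> bool {
    table
        .partition_rules()
        .map(|rules| !rules.extra_phy_cols_not_in_logical_table.is_empty())
        .unwrap_or(false)
}

fn main() {
    let hidden = TableLike {
        partition_rules: Some(PartitionRules {
            extra_phy_cols_not_in_logical_table: vec!["shard".to_string()],
        }),
    };
    assert!(has_hidden_phy_part_cols(&hidden));
    assert!(!has_hidden_phy_part_cols(&TableLike { partition_rules: None }));
}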

View File

@@ -51,19 +51,12 @@ with (
Affected Rows: 0
create table logical_table_3 (
ts timestamp time index,
a string,
z string,
cpu double,
primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
insert into logical_table_2(ts, host, cpu) values
('2023-01-01 00:00:00', 'host1', 1.0),
('2023-01-01 00:00:01', 'host2', 2.0),
('2023-01-01 00:00:02', 'host3', 3.0);
Affected Rows: 0
Affected Rows: 3
show create table logical_table_2;
@@ -93,9 +86,273 @@ select count(*) from logical_table_2;
+----------+
| count(*) |
+----------+
| 0 |
| 3 |
+----------+
-- check if part col aggr push down works with only subset of phy part cols
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+-------+----------+
| host | count(*) |
+-------+----------+
| host1 | 1 |
| host2 | 1 |
| host3 | 1 |
+-------+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| MergeSort: logical_table_2.host ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Sort: logical_table_2.host ASC NULLS LAST_|
|_|_Projection: logical_table_2.host, count(*)_|
|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[count(logical_table_2.ts) AS count(*)]]_|
|_|_TableScan: logical_table_2_|
|_| ]]_|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
-- check if step aggr push down works with non-part col
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+---------------------+----------+
| ts | count(*) |
+---------------------+----------+
| 2023-01-01T00:00:00 | 1 |
| 2023-01-01T00:00:01 | 1 |
| 2023-01-01T00:00:02 | 1 |
+---------------------+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: logical_table_2.ts ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(*)]] |
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_state(logical_table_2.ts)]]_|
|_|_TableScan: logical_table_2_|
|_| ]]_|
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(*)]_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: REDACTED
|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(*)]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
create table logical_table_3 (
ts timestamp time index,
a string,
z string,
cpu double,
primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
Affected Rows: 0
show create table logical_table_3;
+-----------------+-------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------+
| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( |
| | "a" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "z" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a", "z") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------+
insert into logical_table_3(ts, a, z, cpu) values
('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
('2023-01-01 00:00:01', 'a2', 'z2', 2.0),
('2023-01-01 00:00:02', 'a3', 'z3', 3.0);
Affected Rows: 3
select count(*) from logical_table_3;
+----------+
| count(*) |
+----------+
| 3 |
+----------+
-- check if step aggr push down works with non-part col
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+----+----------+
| a | count(*) |
+----+----------+
| a1 | 1 |
| a2 | 1 |
| a3 | 1 |
+----+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: logical_table_3.a ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_merge(__count_state(logical_table_3.ts)) AS count(*)]] |
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_state(logical_table_3.ts)]]_|
|_|_TableScan: logical_table_3_|
|_| ]]_|
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: REDACTED
|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
-- create a logical table without partition columns on physical table
create table logical_table_4 (
ts timestamp time index,
cpu double,
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
Affected Rows: 0
show create table logical_table_4;
+-----------------+-------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------+
| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( |
| | "cpu" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------+
insert into logical_table_4(ts, cpu) values
('2023-01-01 00:00:00', 1.0),
('2023-01-01 00:00:01', 2.0),
('2023-01-01 00:00:02', 3.0);
Affected Rows: 3
-- this should only return one row
select count(*) from logical_table_4;
+----------+
| count(*) |
+----------+
| 3 |
+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN select count(*) from logical_table_4;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(*)]]_|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[]], aggr=[[__count_state(logical_table_4.ts)]]_|
|_|_TableScan: logical_table_4_|
|_| ]]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(*)]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[count(*)]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
-- check if step aggr push down works with non-part col
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
+---------------------+----------+
| ts | count(*) |
+---------------------+----------+
| 2023-01-01T00:00:00 | 1 |
| 2023-01-01T00:00:01 | 1 |
| 2023-01-01T00:00:02 | 1 |
+---------------------+----------+
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: logical_table_4.ts ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(*)]] |
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_state(logical_table_4.ts)]]_|
|_|_TableScan: logical_table_4_|
|_| ]]_|
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(*)]_|
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: REDACTED
|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(*)]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
drop table logical_table_2;
Affected Rows: 0
@@ -104,6 +361,10 @@ drop table logical_table_3;
Affected Rows: 0
drop table logical_table_4;
Affected Rows: 0
drop table metric_engine_partition;
Affected Rows: 0

View File

@@ -39,6 +39,37 @@ with (
on_physical_table = "metric_engine_partition",
);
insert into logical_table_2(ts, host, cpu) values
('2023-01-01 00:00:00', 'host1', 1.0),
('2023-01-01 00:00:01', 'host2', 2.0),
('2023-01-01 00:00:02', 'host3', 3.0);
show create table logical_table_2;
select count(*) from logical_table_2;
-- check if part col aggr push down works with only subset of phy part cols
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
-- check if step aggr push down works with non-part col
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
create table logical_table_3 (
ts timestamp time index,
a string,
@@ -51,12 +82,68 @@ with (
on_physical_table = "metric_engine_partition",
);
show create table logical_table_2;
show create table logical_table_3;
select count(*) from logical_table_2;
insert into logical_table_3(ts, a, z, cpu) values
('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
('2023-01-01 00:00:01', 'a2', 'z2', 2.0),
('2023-01-01 00:00:02', 'a3', 'z3', 3.0);
select count(*) from logical_table_3;
-- check if step aggr push down works with non-part col
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
-- create a logical table without partition columns on physical table
create table logical_table_4 (
ts timestamp time index,
cpu double,
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
show create table logical_table_4;
insert into logical_table_4(ts, cpu) values
('2023-01-01 00:00:00', 1.0),
('2023-01-01 00:00:01', 2.0),
('2023-01-01 00:00:02', 3.0);
-- this should only return one row
select count(*) from logical_table_4;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN select count(*) from logical_table_4;
-- check if step aggr push down works with non-part col
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
drop table logical_table_2;
drop table logical_table_3;
drop table logical_table_4;
drop table metric_engine_partition;