feat: allow creating logical tabel with same partition rule with physical table's (#7177)

* feat: allow creating logical tabel with same partition rule with physical table's

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-11-05 14:37:17 +08:00
committed by GitHub
parent 3001c2d719
commit aac3ede261
3 changed files with 171 additions and 7 deletions

View File

@@ -216,13 +216,13 @@ impl StatementExecutor {
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
if let Some(partitions) = partitions.as_ref()
&& !partitions.exprs.is_empty()
{
self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx)
.await?;
}
// Create logical tables
ensure!(
partitions.is_none(),
InvalidPartitionRuleSnafu {
reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table",
}
);
self.create_logical_tables(std::slice::from_ref(create_table), query_ctx)
.await?
.into_iter()
@@ -399,6 +399,73 @@ impl StatementExecutor {
.collect())
}
async fn validate_logical_table_partition_rule(
&self,
create_table: &CreateTableExpr,
partitions: &Partitions,
query_ctx: &QueryContextRef,
) -> Result<()> {
let (_, mut logical_partition_exprs) =
parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?;
let physical_table_name = create_table
.table_options
.get(LOGICAL_TABLE_METADATA_KEY)
.with_context(|| CreateLogicalTablesSnafu {
reason: format!(
"expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table"
),
})?;
let physical_table = self
.catalog_manager
.table(
&create_table.catalog_name,
&create_table.schema_name,
physical_table_name,
Some(query_ctx),
)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu {
table_name: physical_table_name.clone(),
})?;
let physical_table_info = physical_table.table_info();
let partition_rule = self
.partition_manager
.find_table_partition_rule(&physical_table_info)
.await
.context(error::FindTablePartitionRuleSnafu {
table_name: physical_table_name.clone(),
})?;
let multi_dim_rule = partition_rule
.as_ref()
.as_any()
.downcast_ref::<MultiDimPartitionRule>()
.context(InvalidPartitionRuleSnafu {
reason: "physical table partition rule is not range-based",
})?;
// TODO(ruihang): project physical partition exprs to logical partition column
let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec();
logical_partition_exprs.sort_unstable();
physical_partition_exprs.sort_unstable();
ensure!(
physical_partition_exprs == logical_partition_exprs,
InvalidPartitionRuleSnafu {
reason: format!(
"logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}",
logical_partition_exprs, physical_partition_exprs
),
}
);
Ok(())
}
#[cfg(feature = "enterprise")]
#[tracing::instrument(skip_all)]
pub async fn create_trigger(
@@ -1610,6 +1677,51 @@ pub fn parse_partitions(
Ok((partition_exprs, partition_columns))
}
fn parse_partitions_for_logical_validation(
create_table: &CreateTableExpr,
partitions: &Partitions,
query_ctx: &QueryContextRef,
) -> Result<(Vec<String>, Vec<PartitionExpr>)> {
let partition_columns = partitions
.column_list
.iter()
.map(|ident| ident.value.clone())
.collect::<Vec<_>>();
let column_name_and_type = partition_columns
.iter()
.map(|pc| {
let column = create_table
.column_defs
.iter()
.find(|c| &c.name == pc)
.context(ColumnNotFoundSnafu { msg: pc.clone() })?;
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
.context(ColumnDataTypeSnafu)?,
);
Ok((column_name, data_type))
})
.collect::<Result<HashMap<_, _>>>()?;
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
for expr in &partitions.exprs {
let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(partition_expr);
}
MultiDimPartitionRule::try_new(
partition_columns.clone(),
vec![],
partition_exprs.clone(),
true,
)
.context(InvalidPartitionSnafu)?;
Ok((partition_columns, partition_exprs))
}
/// Verifies an alter and returns whether it is necessary to perform the alter.
///
/// # Returns

View File

@@ -37,13 +37,38 @@ with (
on_physical_table = "metric_engine_partition",
);
Error: 1004(InvalidArguments), Invalid partition rule: logical table in metric engine should not have partition rule, it will be inherited from physical table
Affected Rows: 0
create table invalid_logical_partition (
ts timestamp time index,
host string primary key,
cpu double,
)
partition on columns (host) (
host <= 'host1',
host > 'host1' and host <= 'host2',
host > 'host2' and host <= 'host3',
host > 'host3'
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
Error: 1004(InvalidArguments), Invalid partition rule: logical table partition rule must match the corresponding physical table's
logical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host3"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host3"))) }) }]
physical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }]
create table logical_table_2 (
ts timestamp time index,
host string primary key,
cpu double,
)
partition on columns (host) (
host <= 'host1',
host > 'host1' and host <= 'host2',
host > 'host2'
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
@@ -415,6 +440,10 @@ EXPLAIN select * from logical_table_4;
|_|_|
+-+-+
drop table logical_table_1;
Affected Rows: 0
drop table logical_table_2;
Affected Rows: 0

View File

@@ -29,11 +29,32 @@ with (
on_physical_table = "metric_engine_partition",
);
create table invalid_logical_partition (
ts timestamp time index,
host string primary key,
cpu double,
)
partition on columns (host) (
host <= 'host1',
host > 'host1' and host <= 'host2',
host > 'host2' and host <= 'host3',
host > 'host3'
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
create table logical_table_2 (
ts timestamp time index,
host string primary key,
cpu double,
)
partition on columns (host) (
host <= 'host1',
host > 'host1' and host <= 'host2',
host > 'host2'
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
@@ -149,6 +170,8 @@ select * from logical_table_4;
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
EXPLAIN select * from logical_table_4;
drop table logical_table_1;
drop table logical_table_2;
drop table logical_table_3;