From aac3ede261b36ca432376be69103c8ca5c4fd73b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 5 Nov 2025 14:37:17 +0800 Subject: [PATCH] feat: allow creating logical tabel with same partition rule with physical table's (#7177) * feat: allow creating logical tabel with same partition rule with physical table's Signed-off-by: Ruihang Xia * fix errors Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/operator/src/statement/ddl.rs | 124 +++++++++++++++++- .../create/metric_engine_partition.result | 31 ++++- .../common/create/metric_engine_partition.sql | 23 ++++ 3 files changed, 171 insertions(+), 7 deletions(-) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 295e33e43e..89ca7f2b78 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -216,13 +216,13 @@ impl StatementExecutor { .table_options .contains_key(LOGICAL_TABLE_METADATA_KEY) { + if let Some(partitions) = partitions.as_ref() + && !partitions.exprs.is_empty() + { + self.validate_logical_table_partition_rule(create_table, partitions, &query_ctx) + .await?; + } // Create logical tables - ensure!( - partitions.is_none(), - InvalidPartitionRuleSnafu { - reason: "logical table in metric engine should not have partition rule, it will be inherited from physical table", - } - ); self.create_logical_tables(std::slice::from_ref(create_table), query_ctx) .await? .into_iter() @@ -399,6 +399,73 @@ impl StatementExecutor { .collect()) } + async fn validate_logical_table_partition_rule( + &self, + create_table: &CreateTableExpr, + partitions: &Partitions, + query_ctx: &QueryContextRef, + ) -> Result<()> { + let (_, mut logical_partition_exprs) = + parse_partitions_for_logical_validation(create_table, partitions, query_ctx)?; + + let physical_table_name = create_table + .table_options + .get(LOGICAL_TABLE_METADATA_KEY) + .with_context(|| CreateLogicalTablesSnafu { + reason: format!( + "expect `{LOGICAL_TABLE_METADATA_KEY}` option on creating logical table" + ), + })?; + + let physical_table = self + .catalog_manager + .table( + &create_table.catalog_name, + &create_table.schema_name, + physical_table_name, + Some(query_ctx), + ) + .await + .context(CatalogSnafu)? + .context(TableNotFoundSnafu { + table_name: physical_table_name.clone(), + })?; + + let physical_table_info = physical_table.table_info(); + let partition_rule = self + .partition_manager + .find_table_partition_rule(&physical_table_info) + .await + .context(error::FindTablePartitionRuleSnafu { + table_name: physical_table_name.clone(), + })?; + + let multi_dim_rule = partition_rule + .as_ref() + .as_any() + .downcast_ref::() + .context(InvalidPartitionRuleSnafu { + reason: "physical table partition rule is not range-based", + })?; + + // TODO(ruihang): project physical partition exprs to logical partition column + let mut physical_partition_exprs = multi_dim_rule.exprs().to_vec(); + logical_partition_exprs.sort_unstable(); + physical_partition_exprs.sort_unstable(); + + ensure!( + physical_partition_exprs == logical_partition_exprs, + InvalidPartitionRuleSnafu { + reason: format!( + "logical table partition rule must match the corresponding physical table's\n logical table partition exprs:\t\t {:?}\n physical table partition exprs:\t {:?}", + logical_partition_exprs, physical_partition_exprs + ), + } + ); + + Ok(()) + } + #[cfg(feature = "enterprise")] #[tracing::instrument(skip_all)] pub async fn create_trigger( @@ -1610,6 +1677,51 @@ pub fn parse_partitions( Ok((partition_exprs, partition_columns)) } +fn parse_partitions_for_logical_validation( + create_table: &CreateTableExpr, + partitions: &Partitions, + query_ctx: &QueryContextRef, +) -> Result<(Vec, Vec)> { + let partition_columns = partitions + .column_list + .iter() + .map(|ident| ident.value.clone()) + .collect::>(); + + let column_name_and_type = partition_columns + .iter() + .map(|pc| { + let column = create_table + .column_defs + .iter() + .find(|c| &c.name == pc) + .context(ColumnNotFoundSnafu { msg: pc.clone() })?; + let column_name = &column.name; + let data_type = ConcreteDataType::from( + ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone()) + .context(ColumnDataTypeSnafu)?, + ); + Ok((column_name, data_type)) + }) + .collect::>>()?; + + let mut partition_exprs = Vec::with_capacity(partitions.exprs.len()); + for expr in &partitions.exprs { + let partition_expr = convert_one_expr(expr, &column_name_and_type, &query_ctx.timezone())?; + partition_exprs.push(partition_expr); + } + + MultiDimPartitionRule::try_new( + partition_columns.clone(), + vec![], + partition_exprs.clone(), + true, + ) + .context(InvalidPartitionSnafu)?; + + Ok((partition_columns, partition_exprs)) +} + /// Verifies an alter and returns whether it is necessary to perform the alter. /// /// # Returns diff --git a/tests/cases/standalone/common/create/metric_engine_partition.result b/tests/cases/standalone/common/create/metric_engine_partition.result index 00995a6cf6..293c0a9bb8 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.result +++ b/tests/cases/standalone/common/create/metric_engine_partition.result @@ -37,13 +37,38 @@ with ( on_physical_table = "metric_engine_partition", ); -Error: 1004(InvalidArguments), Invalid partition rule: logical table in metric engine should not have partition rule, it will be inherited from physical table +Affected Rows: 0 + +create table invalid_logical_partition ( + ts timestamp time index, + host string primary key, + cpu double, +) +partition on columns (host) ( + host <= 'host1', + host > 'host1' and host <= 'host2', + host > 'host2' and host <= 'host3', + host > 'host3' +) +engine = metric +with ( + on_physical_table = "metric_engine_partition", +); + +Error: 1004(InvalidArguments), Invalid partition rule: logical table partition rule must match the corresponding physical table's + logical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host3"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host3"))) }) }] + physical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }] create table logical_table_2 ( ts timestamp time index, host string primary key, cpu double, ) +partition on columns (host) ( + host <= 'host1', + host > 'host1' and host <= 'host2', + host > 'host2' +) engine = metric with ( on_physical_table = "metric_engine_partition", @@ -415,6 +440,10 @@ EXPLAIN select * from logical_table_4; |_|_| +-+-+ +drop table logical_table_1; + +Affected Rows: 0 + drop table logical_table_2; Affected Rows: 0 diff --git a/tests/cases/standalone/common/create/metric_engine_partition.sql b/tests/cases/standalone/common/create/metric_engine_partition.sql index 7c0e539662..de995468dc 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.sql +++ b/tests/cases/standalone/common/create/metric_engine_partition.sql @@ -29,11 +29,32 @@ with ( on_physical_table = "metric_engine_partition", ); +create table invalid_logical_partition ( + ts timestamp time index, + host string primary key, + cpu double, +) +partition on columns (host) ( + host <= 'host1', + host > 'host1' and host <= 'host2', + host > 'host2' and host <= 'host3', + host > 'host3' +) +engine = metric +with ( + on_physical_table = "metric_engine_partition", +); + create table logical_table_2 ( ts timestamp time index, host string primary key, cpu double, ) +partition on columns (host) ( + host <= 'host1', + host > 'host1' and host <= 'host2', + host > 'host2' +) engine = metric with ( on_physical_table = "metric_engine_partition", @@ -149,6 +170,8 @@ select * from logical_table_4; -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select * from logical_table_4; +drop table logical_table_1; + drop table logical_table_2; drop table logical_table_3;