From aadfcd7821f02f8de7f8bae43684399a478767a4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 8 Jan 2026 20:18:39 +0800 Subject: [PATCH] feat(repartition): implement validation logic for repartition table (#7538) * feat(repartition): implement validation logic for repartition_table Signed-off-by: WenyXu * refactor: minor refactor Signed-off-by: WenyXu * test: update sqlness Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/operator/src/error.rs | 12 +- src/operator/src/statement/ddl.rs | 173 ++++++++++++++++-- src/table/src/metadata.rs | 6 + .../common/alter/repartition_error.result | 163 +++++++++++++++++ .../common/alter/repartition_error.sql | 131 +++++++++++++ 5 files changed, 473 insertions(+), 12 deletions(-) create mode 100644 tests/cases/standalone/common/alter/repartition_error.result create mode 100644 tests/cases/standalone/common/alter/repartition_error.sql diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 68576db582..dffcf54573 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -736,6 +736,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to deserialize partition expression: {}", source))] + DeserializePartitionExpr { + #[snafu(source)] + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid configuration value."))] InvalidConfigValue { source: session::session_config::Error, @@ -973,7 +981,9 @@ impl ErrorExt for Error { Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } | Error::MissingInsertBody { .. } => StatusCode::Internal, - Error::ExecuteAdminFunction { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected, + Error::ExecuteAdminFunction { .. } + | Error::EncodeJson { .. } + | Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected, Error::ViewNotFound { .. } | Error::ViewInfoNotFound { .. } | Error::TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index ed7b56f976..d1c380306b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -79,18 +79,19 @@ use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions}; use table::table_name::TableName; +use table::table_reference::TableReference; use crate::error::{ self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, - EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, - InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, - InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result, - SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, - TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, + DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, + FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, + InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, + PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, + SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, }; -use crate::expr_helper; +use crate::expr_helper::{self, RepartitionRequest}; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; @@ -1262,17 +1263,167 @@ impl StatementExecutor { alter_table.alter_operation(), AlterTableOperation::Repartition { .. } ) { - let _request = expr_helper::to_repartition_request(alter_table, &query_context)?; - return NotSupportedSnafu { - feat: "ALTER TABLE REPARTITION", - } - .fail(); + let request = expr_helper::to_repartition_request(alter_table, &query_context)?; + return self.repartition_table(request, &query_context).await; } let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?; self.alter_table_inner(expr, query_context).await } + #[tracing::instrument(skip_all)] + pub async fn repartition_table( + &self, + request: RepartitionRequest, + query_context: &QueryContextRef, + ) -> Result { + // Check if the schema is read-only. + ensure!( + !is_readonly_schema(&request.schema_name), + SchemaReadOnlySnafu { + name: request.schema_name.clone() + } + ); + + let table_ref = TableReference::full( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + // Get the table from the catalog. + let table = self + .catalog_manager + .table( + &request.catalog_name, + &request.schema_name, + &request.table_name, + Some(query_context), + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + let table_id = table.table_info().ident.table_id; + // Get existing partition expressions from the table route. + let (physical_table_id, physical_table_route) = self + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .context(TableMetadataManagerSnafu)?; + + ensure!( + physical_table_id == table_id, + NotSupportedSnafu { + feat: "REPARTITION on logical tables" + } + ); + + let table_info = table.table_info(); + // Get partition column names from the table metadata. + let existing_partition_columns = table_info.meta.partition_columns().collect::>(); + // Repartition requires the table to have partition columns. + ensure!( + !existing_partition_columns.is_empty(), + InvalidPartitionRuleSnafu { + reason: format!( + "table {} does not have partition columns, cannot repartition", + table_ref + ) + } + ); + + // Repartition operations involving columns outside the existing partition columns are not supported. + // This restriction ensures repartition only applies to current partition columns. + let column_name_and_type = existing_partition_columns + .iter() + .map(|column| (&column.name, column.data_type.clone())) + .collect(); + let timezone = query_context.timezone(); + // Convert SQL Exprs to PartitionExprs. + let from_partition_exprs = request + .from_exprs + .iter() + .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone)) + .collect::>>()?; + + let into_partition_exprs = request + .into_exprs + .iter() + .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone)) + .collect::>>()?; + + // Parse existing partition expressions from region routes. + let mut existing_partition_exprs = + Vec::with_capacity(physical_table_route.region_routes.len()); + for route in &physical_table_route.region_routes { + let expr_json = route.region.partition_expr(); + if !expr_json.is_empty() { + match PartitionExpr::from_json_str(&expr_json) { + Ok(Some(expr)) => existing_partition_exprs.push(expr), + Ok(None) => { + // Empty + } + Err(e) => { + return Err(e).context(DeserializePartitionExprSnafu); + } + } + } + } + + // Validate that from_partition_exprs are a subset of existing partition exprs. + // We compare PartitionExpr directly since it implements Eq. + for from_expr in &from_partition_exprs { + ensure!( + existing_partition_exprs.contains(from_expr), + InvalidPartitionRuleSnafu { + reason: format!( + "partition expression '{}' does not exist in table {}", + from_expr, table_ref + ) + } + ); + } + + // Build the new partition expressions: + // new_exprs = existing_exprs - from_exprs + into_exprs + let new_partition_exprs: Vec = existing_partition_exprs + .into_iter() + .filter(|expr| !from_partition_exprs.contains(expr)) + .chain(into_partition_exprs.clone().into_iter()) + .collect(); + let new_partition_exprs_len = new_partition_exprs.len(); + + // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker. + let _ = MultiDimPartitionRule::try_new( + existing_partition_columns + .iter() + .map(|c| c.name.clone()) + .collect(), + vec![], + new_partition_exprs, + true, + ) + .context(InvalidPartitionSnafu)?; + + info!( + "Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}", + table_ref, + table_id, + from_partition_exprs, + into_partition_exprs, + new_partition_exprs_len + ); + + // TODO(weny): Implement the repartition procedure submission. + // The repartition procedure infrastructure is not yet fully integrated with the DDL task system. + NotSupportedSnafu { + feat: "ALTER TABLE REPARTITION", + } + .fail() + } + #[tracing::instrument(skip_all)] pub async fn alter_table_inner( &self, diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 145f9308df..5e40a0e845 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -239,6 +239,12 @@ impl TableMeta { .map(|idx| &columns_schemas[*idx].name) } + pub fn partition_columns(&self) -> impl Iterator { + self.partition_key_indices + .iter() + .map(|idx| &self.schema.column_schemas()[*idx]) + } + /// Returns the new [TableMetaBuilder] after applying given `alter_kind`. /// /// The returned builder would derive the next column id of this meta. diff --git a/tests/cases/standalone/common/alter/repartition_error.result b/tests/cases/standalone/common/alter/repartition_error.result new file mode 100644 index 0000000000..95fd00a6f8 --- /dev/null +++ b/tests/cases/standalone/common/alter/repartition_error.result @@ -0,0 +1,163 @@ +-- Test split partition and merge partition error cases +-- Setup: Create a physical table with partitions +CREATE TABLE repartition_test_table( + device_id INT, + area STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +Affected Rows: 0 + +-- Setup: Create a logical table (metric engine) +CREATE TABLE physical_metric_table( + ts TIMESTAMP TIME INDEX, + val DOUBLE +) ENGINE = metric WITH ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE logical_metric_table( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + device_id STRING PRIMARY KEY +) ENGINE = metric WITH ("on_physical_table" = "physical_metric_table"); + +Affected Rows: 0 + +-- Test 0: Logical table cannot be repartitioned +ALTER TABLE logical_metric_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 +); + +Error: 1001(Unsupported), Not supported: REPARTITION on logical tables + +ALTER TABLE logical_metric_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 +); + +Error: 1001(Unsupported), Not supported: REPARTITION on logical tables + +ALTER TABLE logical_metric_table MERGE PARTITION ( + device_id < 100, + device_id >= 100 AND device_id < 200 +); + +Error: 1001(Unsupported), Not supported: REPARTITION on logical tables + +-- Test 1: New partition rule contains non-partition column (ts is not a partition column) +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 AND ts < 1000 +); + +Error: 1004(InvalidArguments), Cannot find column by name: ts + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 AND ts < 1000 +); + +Error: 1004(InvalidArguments), Cannot find column by name: ts + +-- Test 2: From partition expr does not exist in existing partition exprs +-- device_id < 50 is not in the existing partitions (which are < 100, >= 100 AND < 200, >= 200) +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 50 +) INTO ( + device_id < 25, + device_id >= 25 AND device_id < 50 +); + +Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 50 +) INTO ( + device_id < 25, + device_id >= 25 AND device_id < 50 +); + +Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table + +ALTER TABLE repartition_test_table MERGE PARTITION ( + device_id < 50, + device_id >= 50 AND device_id < 75 +); + +Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table + +-- Test 3: New partition rule is incomplete (cannot pass checker) +-- This creates a gap: device_id < 50 and device_id >= 100, missing [50, 100) +-- The existing partitions are: device_id < 100, device_id >= 100 AND device_id < 200, device_id >= 200 +-- After removing device_id < 100 and adding device_id < 50 and device_id >= 100, we get: +-- device_id < 50, device_id >= 100 AND device_id < 200, device_id >= 200 +-- This leaves a gap [50, 100) +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 100 +); + +Error: 1004(InvalidArguments), Checkpoint `device_id=50` is not covered + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 100 +); + +Error: 1004(InvalidArguments), Checkpoint `device_id=50` is not covered + +-- Test 4: New partition rule has overlapping partitions +-- This creates overlapping ranges: device_id < 100 and device_id >= 50 AND device_id < 150 +-- After removing device_id < 100, we have: device_id >= 100 AND device_id < 200, device_id >= 200 +-- Adding the new ones: device_id < 100, device_id >= 50 AND device_id < 150 +-- This overlaps: [0, 100) and [50, 150) overlap in [50, 100) +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100, + device_id >= 50 AND device_id < 150 +); + +Error: 1004(InvalidArguments), Checkpoint `device_id=50` is overlapped + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 100, + device_id >= 50 AND device_id < 150 +); + +Error: 1004(InvalidArguments), Checkpoint `device_id=50` is overlapped + +-- Cleanup +DROP TABLE repartition_test_table; + +Affected Rows: 0 + +DROP TABLE logical_metric_table; + +Affected Rows: 0 + +DROP TABLE physical_metric_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/repartition_error.sql b/tests/cases/standalone/common/alter/repartition_error.sql new file mode 100644 index 0000000000..58986cd3d8 --- /dev/null +++ b/tests/cases/standalone/common/alter/repartition_error.sql @@ -0,0 +1,131 @@ +-- Test split partition and merge partition error cases + +-- Setup: Create a physical table with partitions +CREATE TABLE repartition_test_table( + device_id INT, + area STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +-- Setup: Create a logical table (metric engine) +CREATE TABLE physical_metric_table( + ts TIMESTAMP TIME INDEX, + val DOUBLE +) ENGINE = metric WITH ("physical_metric_table" = ""); + +CREATE TABLE logical_metric_table( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + device_id STRING PRIMARY KEY +) ENGINE = metric WITH ("on_physical_table" = "physical_metric_table"); + +-- Test 0: Logical table cannot be repartitioned + +ALTER TABLE logical_metric_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 +); + +ALTER TABLE logical_metric_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 +); + +ALTER TABLE logical_metric_table MERGE PARTITION ( + device_id < 100, + device_id >= 100 AND device_id < 200 +); + +-- Test 1: New partition rule contains non-partition column (ts is not a partition column) + +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 AND ts < 1000 +); + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 50 AND device_id < 100 AND ts < 1000 +); + + +-- Test 2: From partition expr does not exist in existing partition exprs +-- device_id < 50 is not in the existing partitions (which are < 100, >= 100 AND < 200, >= 200) + +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 50 +) INTO ( + device_id < 25, + device_id >= 25 AND device_id < 50 +); + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 50 +) INTO ( + device_id < 25, + device_id >= 25 AND device_id < 50 +); + +ALTER TABLE repartition_test_table MERGE PARTITION ( + device_id < 50, + device_id >= 50 AND device_id < 75 +); + +-- Test 3: New partition rule is incomplete (cannot pass checker) +-- This creates a gap: device_id < 50 and device_id >= 100, missing [50, 100) +-- The existing partitions are: device_id < 100, device_id >= 100 AND device_id < 200, device_id >= 200 +-- After removing device_id < 100 and adding device_id < 50 and device_id >= 100, we get: +-- device_id < 50, device_id >= 100 AND device_id < 200, device_id >= 200 +-- This leaves a gap [50, 100) + +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 100 +); + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 50, + device_id >= 100 +); + +-- Test 4: New partition rule has overlapping partitions +-- This creates overlapping ranges: device_id < 100 and device_id >= 50 AND device_id < 150 +-- After removing device_id < 100, we have: device_id >= 100 AND device_id < 200, device_id >= 200 +-- Adding the new ones: device_id < 100, device_id >= 50 AND device_id < 150 +-- This overlaps: [0, 100) and [50, 150) overlap in [50, 100) + +ALTER TABLE repartition_test_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100, + device_id >= 50 AND device_id < 150 +); + +ALTER TABLE repartition_test_table SPLIT PARTITION ( + device_id < 100 +) INTO ( + device_id < 100, + device_id >= 50 AND device_id < 150 +); + +-- Cleanup +DROP TABLE repartition_test_table; +DROP TABLE logical_metric_table; +DROP TABLE physical_metric_table;