feat(repartition): implement validation logic for repartition table (#7538)

* feat(repartition): implement validation logic for repartition_table

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-08 20:18:39 +08:00
committed by GitHub
parent f3e2d333e4
commit aadfcd7821
5 changed files with 473 additions and 12 deletions

View File

@@ -736,6 +736,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to deserialize partition expression: {}", source))]
DeserializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid configuration value."))]
InvalidConfigValue {
source: session::session_config::Error,
@@ -973,7 +981,9 @@ impl ErrorExt for Error {
Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::MissingInsertBody { .. } => StatusCode::Internal,
Error::ExecuteAdminFunction { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected,
Error::ExecuteAdminFunction { .. }
| Error::EncodeJson { .. }
| Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected,
Error::ViewNotFound { .. }
| Error::ViewInfoNotFound { .. }
| Error::TableNotFound { .. } => StatusCode::TableNotFound,

View File

@@ -79,18 +79,19 @@ use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::{
self, AlterExprToRequestSnafu, BuildDfLogicalPlanSnafu, CatalogSnafu, ColumnDataTypeSnafu,
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu,
InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
DeserializePartitionExprSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;
@@ -1262,17 +1263,167 @@ impl StatementExecutor {
alter_table.alter_operation(),
AlterTableOperation::Repartition { .. }
) {
let _request = expr_helper::to_repartition_request(alter_table, &query_context)?;
return NotSupportedSnafu {
feat: "ALTER TABLE REPARTITION",
}
.fail();
let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
return self.repartition_table(request, &query_context).await;
}
let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?;
self.alter_table_inner(expr, query_context).await
}
#[tracing::instrument(skip_all)]
pub async fn repartition_table(
&self,
request: RepartitionRequest,
query_context: &QueryContextRef,
) -> Result<Output> {
// Check if the schema is read-only.
ensure!(
!is_readonly_schema(&request.schema_name),
SchemaReadOnlySnafu {
name: request.schema_name.clone()
}
);
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
// Get the table from the catalog.
let table = self
.catalog_manager
.table(
&request.catalog_name,
&request.schema_name,
&request.table_name,
Some(query_context),
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_id = table.table_info().ident.table_id;
// Get existing partition expressions from the table route.
let (physical_table_id, physical_table_route) = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.context(TableMetadataManagerSnafu)?;
ensure!(
physical_table_id == table_id,
NotSupportedSnafu {
feat: "REPARTITION on logical tables"
}
);
let table_info = table.table_info();
// Get partition column names from the table metadata.
let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
// Repartition requires the table to have partition columns.
ensure!(
!existing_partition_columns.is_empty(),
InvalidPartitionRuleSnafu {
reason: format!(
"table {} does not have partition columns, cannot repartition",
table_ref
)
}
);
// Repartition operations involving columns outside the existing partition columns are not supported.
// This restriction ensures repartition only applies to current partition columns.
let column_name_and_type = existing_partition_columns
.iter()
.map(|column| (&column.name, column.data_type.clone()))
.collect();
let timezone = query_context.timezone();
// Convert SQL Exprs to PartitionExprs.
let from_partition_exprs = request
.from_exprs
.iter()
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
.collect::<Result<Vec<_>>>()?;
let into_partition_exprs = request
.into_exprs
.iter()
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
.collect::<Result<Vec<_>>>()?;
// Parse existing partition expressions from region routes.
let mut existing_partition_exprs =
Vec::with_capacity(physical_table_route.region_routes.len());
for route in &physical_table_route.region_routes {
let expr_json = route.region.partition_expr();
if !expr_json.is_empty() {
match PartitionExpr::from_json_str(&expr_json) {
Ok(Some(expr)) => existing_partition_exprs.push(expr),
Ok(None) => {
// Empty
}
Err(e) => {
return Err(e).context(DeserializePartitionExprSnafu);
}
}
}
}
// Validate that from_partition_exprs are a subset of existing partition exprs.
// We compare PartitionExpr directly since it implements Eq.
for from_expr in &from_partition_exprs {
ensure!(
existing_partition_exprs.contains(from_expr),
InvalidPartitionRuleSnafu {
reason: format!(
"partition expression '{}' does not exist in table {}",
from_expr, table_ref
)
}
);
}
// Build the new partition expressions:
// new_exprs = existing_exprs - from_exprs + into_exprs
let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
.into_iter()
.filter(|expr| !from_partition_exprs.contains(expr))
.chain(into_partition_exprs.clone().into_iter())
.collect();
let new_partition_exprs_len = new_partition_exprs.len();
// Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
let _ = MultiDimPartitionRule::try_new(
existing_partition_columns
.iter()
.map(|c| c.name.clone())
.collect(),
vec![],
new_partition_exprs,
true,
)
.context(InvalidPartitionSnafu)?;
info!(
"Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}",
table_ref,
table_id,
from_partition_exprs,
into_partition_exprs,
new_partition_exprs_len
);
// TODO(weny): Implement the repartition procedure submission.
// The repartition procedure infrastructure is not yet fully integrated with the DDL task system.
NotSupportedSnafu {
feat: "ALTER TABLE REPARTITION",
}
.fail()
}
#[tracing::instrument(skip_all)]
pub async fn alter_table_inner(
&self,

View File

@@ -239,6 +239,12 @@ impl TableMeta {
.map(|idx| &columns_schemas[*idx].name)
}
pub fn partition_columns(&self) -> impl Iterator<Item = &ColumnSchema> {
self.partition_key_indices
.iter()
.map(|idx| &self.schema.column_schemas()[*idx])
}
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
///
/// The returned builder would derive the next column id of this meta.

View File

@@ -0,0 +1,163 @@
-- Test split partition and merge partition error cases
-- Setup: Create a physical table with partitions
CREATE TABLE repartition_test_table(
device_id INT,
area STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
Affected Rows: 0
-- Setup: Create a logical table (metric engine)
CREATE TABLE physical_metric_table(
ts TIMESTAMP TIME INDEX,
val DOUBLE
) ENGINE = metric WITH ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE logical_metric_table(
ts TIMESTAMP TIME INDEX,
val DOUBLE,
device_id STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "physical_metric_table");
Affected Rows: 0
-- Test 0: Logical table cannot be repartitioned
ALTER TABLE logical_metric_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100
);
Error: 1001(Unsupported), Not supported: REPARTITION on logical tables
ALTER TABLE logical_metric_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100
);
Error: 1001(Unsupported), Not supported: REPARTITION on logical tables
ALTER TABLE logical_metric_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);
Error: 1001(Unsupported), Not supported: REPARTITION on logical tables
-- Test 1: New partition rule contains non-partition column (ts is not a partition column)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100 AND ts < 1000
);
Error: 1004(InvalidArguments), Cannot find column by name: ts
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100 AND ts < 1000
);
Error: 1004(InvalidArguments), Cannot find column by name: ts
-- Test 2: From partition expr does not exist in existing partition exprs
-- device_id < 50 is not in the existing partitions (which are < 100, >= 100 AND < 200, >= 200)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 50
) INTO (
device_id < 25,
device_id >= 25 AND device_id < 50
);
Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 50
) INTO (
device_id < 25,
device_id >= 25 AND device_id < 50
);
Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table
ALTER TABLE repartition_test_table MERGE PARTITION (
device_id < 50,
device_id >= 50 AND device_id < 75
);
Error: 1004(InvalidArguments), Invalid partition rule: partition expression 'device_id < 50' does not exist in table greptime.public.repartition_test_table
-- Test 3: New partition rule is incomplete (cannot pass checker)
-- This creates a gap: device_id < 50 and device_id >= 100, missing [50, 100)
-- The existing partitions are: device_id < 100, device_id >= 100 AND device_id < 200, device_id >= 200
-- After removing device_id < 100 and adding device_id < 50 and device_id >= 100, we get:
-- device_id < 50, device_id >= 100 AND device_id < 200, device_id >= 200
-- This leaves a gap [50, 100)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 100
);
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is not covered
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 100
);
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is not covered
-- Test 4: New partition rule has overlapping partitions
-- This creates overlapping ranges: device_id < 100 and device_id >= 50 AND device_id < 150
-- After removing device_id < 100, we have: device_id >= 100 AND device_id < 200, device_id >= 200
-- Adding the new ones: device_id < 100, device_id >= 50 AND device_id < 150
-- This overlaps: [0, 100) and [50, 150) overlap in [50, 100)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 100,
device_id >= 50 AND device_id < 150
);
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is overlapped
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 100,
device_id >= 50 AND device_id < 150
);
Error: 1004(InvalidArguments), Checkpoint `device_id=50` is overlapped
-- Cleanup
DROP TABLE repartition_test_table;
Affected Rows: 0
DROP TABLE logical_metric_table;
Affected Rows: 0
DROP TABLE physical_metric_table;
Affected Rows: 0

View File

@@ -0,0 +1,131 @@
-- Test split partition and merge partition error cases
-- Setup: Create a physical table with partitions
CREATE TABLE repartition_test_table(
device_id INT,
area STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
-- Setup: Create a logical table (metric engine)
CREATE TABLE physical_metric_table(
ts TIMESTAMP TIME INDEX,
val DOUBLE
) ENGINE = metric WITH ("physical_metric_table" = "");
CREATE TABLE logical_metric_table(
ts TIMESTAMP TIME INDEX,
val DOUBLE,
device_id STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "physical_metric_table");
-- Test 0: Logical table cannot be repartitioned
ALTER TABLE logical_metric_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100
);
ALTER TABLE logical_metric_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100
);
ALTER TABLE logical_metric_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);
-- Test 1: New partition rule contains non-partition column (ts is not a partition column)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100 AND ts < 1000
);
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 50 AND device_id < 100 AND ts < 1000
);
-- Test 2: From partition expr does not exist in existing partition exprs
-- device_id < 50 is not in the existing partitions (which are < 100, >= 100 AND < 200, >= 200)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 50
) INTO (
device_id < 25,
device_id >= 25 AND device_id < 50
);
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 50
) INTO (
device_id < 25,
device_id >= 25 AND device_id < 50
);
ALTER TABLE repartition_test_table MERGE PARTITION (
device_id < 50,
device_id >= 50 AND device_id < 75
);
-- Test 3: New partition rule is incomplete (cannot pass checker)
-- This creates a gap: device_id < 50 and device_id >= 100, missing [50, 100)
-- The existing partitions are: device_id < 100, device_id >= 100 AND device_id < 200, device_id >= 200
-- After removing device_id < 100 and adding device_id < 50 and device_id >= 100, we get:
-- device_id < 50, device_id >= 100 AND device_id < 200, device_id >= 200
-- This leaves a gap [50, 100)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 100
);
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 50,
device_id >= 100
);
-- Test 4: New partition rule has overlapping partitions
-- This creates overlapping ranges: device_id < 100 and device_id >= 50 AND device_id < 150
-- After removing device_id < 100, we have: device_id >= 100 AND device_id < 200, device_id >= 200
-- Adding the new ones: device_id < 100, device_id >= 50 AND device_id < 150
-- This overlaps: [0, 100) and [50, 150) overlap in [50, 100)
ALTER TABLE repartition_test_table REPARTITION (
device_id < 100
) INTO (
device_id < 100,
device_id >= 50 AND device_id < 150
);
ALTER TABLE repartition_test_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 100,
device_id >= 50 AND device_id < 150
);
-- Cleanup
DROP TABLE repartition_test_table;
DROP TABLE logical_metric_table;
DROP TABLE physical_metric_table;