From f513b77cccb76751e3932694572481f83eba546e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 26 May 2026 23:06:14 +0800 Subject: [PATCH] feat: support alter table partition syntax (#8177) * feat(sql): support alter table partition syntax Signed-off-by: WenyXu * feat: support repartition source proto Signed-off-by: WenyXu * chore: update greptime-proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/meta/src/ddl_manager.rs | 28 ++++-- src/operator/src/expr_helper.rs | 88 ++++++++++++++++-- src/operator/src/statement/ddl.rs | 133 ++++++++++++++++++--------- src/sql/src/parsers/alter_parser.rs | 112 +++++++++++++++++++++- src/sql/src/parsers/create_parser.rs | 8 +- src/sql/src/statements/alter.rs | 8 ++ 8 files changed, 316 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0494b4266..b801b342c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5826,7 +5826,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dfd2a6d7d3d9c718cb159fcf9abae144b74fc503#dfd2a6d7d3d9c718cb159fcf9abae144b74fc503" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7224c2ad6d11db612fbdb621c36135fc37ffce35#7224c2ad6d11db612fbdb621c36135fc37ffce35" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", diff --git a/Cargo.toml b/Cargo.toml index eeddc7099f..32407f31cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,7 @@ fs2 = "0.4" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dfd2a6d7d3d9c718cb159fcf9abae144b74fc503" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7224c2ad6d11db612fbdb621c36135fc37ffce35" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 52af4a36af..550887e315 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -15,8 +15,9 @@ use std::sync::Arc; use std::time::Duration; -use api::v1::Repartition; use api::v1::alter_table_expr::Kind; +use api::v1::repartition::Source; +use api::v1::{PartitionExprs, Repartition}; use common_error::ext::BoxedError; use common_procedure::{ BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, @@ -48,7 +49,7 @@ use crate::error::{ self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, - UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu, + UnexpectedLogicalRouteTableSnafu, UnexpectedSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; @@ -280,15 +281,30 @@ impl DdlManager { &self, table_id: TableId, table_name: TableName, - Repartition { - from_partition_exprs, - into_partition_exprs, - }: Repartition, + repartition: Repartition, wait: bool, timeout: Duration, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); + let into_partition_exprs = repartition.into_partition_exprs; + let source = repartition.source; + + let from_partition_exprs = match source { + Some(Source::PartitionExprs(PartitionExprs { exprs })) => exprs, + Some(Source::Unpartitioned(_)) => { + return UnexpectedSnafu { + err_msg: "Unpartitioned repartition source is not supported yet".to_string(), + } + .fail(); + } + None => { + // Reads the deprecated field for backward compatibility with old persisted DDL tasks. + #[allow(deprecated)] + repartition.from_partition_exprs + } + }; + let procedure = self .repartition_procedure_factory .create( diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index 378122030c..af6e7d1032 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -689,11 +689,17 @@ pub struct RepartitionRequest { pub catalog_name: String, pub schema_name: String, pub table_name: String, - pub from_exprs: Vec, + pub source: RepartitionSource, pub into_exprs: Vec, pub options: OptionMap, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RepartitionSource { + Partitions { from_exprs: Vec }, + Unpartitioned { partition_columns: Vec }, +} + pub(crate) fn to_repartition_request( alter_table: AlterTable, query_ctx: &QueryContextRef, @@ -708,19 +714,37 @@ pub(crate) fn to_repartition_request( .map_err(BoxedError::new) .context(ExternalSnafu)?; - let AlterTableOperation::Repartition { operation } = alter_operation else { - return InvalidSqlSnafu { - err_msg: "expected REPARTITION operation", + let (source, into_exprs) = match alter_operation { + AlterTableOperation::Repartition { operation } => ( + RepartitionSource::Partitions { + from_exprs: operation.from_exprs, + }, + operation.into_exprs, + ), + AlterTableOperation::Partition { partitions } => ( + RepartitionSource::Unpartitioned { + partition_columns: partitions + .column_list + .into_iter() + .map(|ident| ident.value) + .collect(), + }, + partitions.exprs, + ), + _ => { + return InvalidSqlSnafu { + err_msg: "expected REPARTITION or PARTITION operation", + } + .fail(); } - .fail(); }; Ok(RepartitionRequest { catalog_name, schema_name, table_name, - from_exprs: operation.from_exprs, - into_exprs: operation.into_exprs, + source, + into_exprs, options, }) } @@ -814,6 +838,12 @@ pub(crate) fn to_alter_table_expr( } .fail(); } + AlterTableOperation::Partition { .. } => { + return NotSupportedSnafu { + feat: "ALTER TABLE ... PARTITION ON COLUMNS", + } + .fail(); + } AlterTableOperation::SetIndex { options } => { let option = match options { sql::statements::alter::SetIndexOperation::Fulltext { @@ -1687,9 +1717,11 @@ ALTER TABLE metrics REPARTITION ( assert_eq!("greptime", request.catalog_name); assert_eq!("public", request.schema_name); assert_eq!("metrics", request.table_name); + let RepartitionSource::Partitions { from_exprs } = request.source else { + unreachable!() + }; assert_eq!( - request - .from_exprs + from_exprs .into_iter() .map(|x| x.to_string()) .collect::>(), @@ -1708,6 +1740,44 @@ ALTER TABLE metrics REPARTITION ( ); } + #[test] + fn test_to_repartition_request_with_unpartitioned_source() { + let sql = r#" +ALTER TABLE metrics PARTITION ON COLUMNS (device_id, area) ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +);"#; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::AlterTable(alter_table) = stmt else { + unreachable!() + }; + + let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap(); + assert_eq!("greptime", request.catalog_name); + assert_eq!("public", request.schema_name); + assert_eq!("metrics", request.table_name); + let RepartitionSource::Unpartitioned { partition_columns } = request.source else { + unreachable!() + }; + assert_eq!(partition_columns, vec!["device_id", "area"]); + assert_eq!( + request + .into_exprs + .into_iter() + .map(|x| x.to_string()) + .collect::>(), + vec![ + "device_id < 100 AND area < 'South'".to_string(), + "device_id < 100 AND area >= 'South'".to_string() + ] + ); + } + fn new_test_table_names() -> Vec { vec![ TableName { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 08d3206548..1fab624f46 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -19,9 +19,10 @@ use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::alter_table_expr::Kind; use api::v1::meta::CreateFlowTask as PbCreateFlowTask; +use api::v1::repartition::Source; use api::v1::{ AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, - Repartition, column_def, + PartitionExprs, Repartition, UnpartitionedSource, column_def, }; #[cfg(feature = "enterprise")] use api::v1::{ @@ -102,7 +103,7 @@ use crate::error::{ TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, }; -use crate::expr_helper::{self, RepartitionRequest}; +use crate::expr_helper::{self, RepartitionRequest, RepartitionSource}; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend}; @@ -1408,7 +1409,7 @@ impl StatementExecutor { ) -> Result { if matches!( alter_table.alter_operation(), - AlterTableOperation::Repartition { .. } + AlterTableOperation::Repartition { .. } | AlterTableOperation::Partition { .. } ) { let request = expr_helper::to_repartition_request(alter_table, &query_context)?; return self.repartition_table(request, &query_context).await; @@ -1468,32 +1469,59 @@ impl StatementExecutor { ); let table_info = table.table_info(); - // Get partition column names from the table metadata. let existing_partition_columns = table_info.meta.partition_columns().collect::>(); - // Repartition requires the table to have partition columns. - ensure!( - !existing_partition_columns.is_empty(), - InvalidPartitionRuleSnafu { - reason: format!( - "table {} does not have partition columns, cannot repartition", - table_ref - ) + let partition_columns = match &request.source { + RepartitionSource::Partitions { .. } => { + ensure!( + !existing_partition_columns.is_empty(), + InvalidPartitionRuleSnafu { + reason: format!( + "table {} does not have partition columns, cannot repartition", + table_ref + ) + } + ); + existing_partition_columns } - ); + RepartitionSource::Unpartitioned { partition_columns } => { + ensure!( + !partition_columns.is_empty(), + InvalidPartitionRuleSnafu { + reason: "PARTITION ON COLUMNS requires at least one partition column" + } + ); + ensure!( + existing_partition_columns.is_empty(), + InvalidPartitionRuleSnafu { + reason: format!("table {} already has partition columns", table_ref) + } + ); + let column_schemas = table_info.meta.schema.column_schemas(); + partition_columns + .iter() + .map(|column_name| { + column_schemas + .iter() + .find(|column| &column.name == column_name) + .with_context(|| ColumnNotFoundSnafu { msg: column_name }) + }) + .collect::>>()? + } + }; - // Repartition operations involving columns outside the existing partition columns are not supported. - // This restriction ensures repartition only applies to current partition columns. - let column_name_and_type = existing_partition_columns + let column_name_and_type = partition_columns .iter() .map(|column| (&column.name, column.data_type.clone())) .collect(); let timezone = query_context.timezone(); // Convert SQL Exprs to PartitionExprs. - let from_partition_exprs = request - .from_exprs - .iter() - .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone)) - .collect::>>()?; + let from_partition_exprs = match &request.source { + RepartitionSource::Partitions { from_exprs } => from_exprs + .iter() + .map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone)) + .collect::>>()?, + RepartitionSource::Unpartitioned { .. } => vec![], + }; let mut into_partition_exprs = request .into_exprs @@ -1503,7 +1531,8 @@ impl StatementExecutor { // `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from // multiple source partitions; try to simplify it for better readability and stability. - if from_partition_exprs.len() > 1 + if matches!(&request.source, RepartitionSource::Partitions { .. }) + && from_partition_exprs.len() > 1 && into_partition_exprs.len() == 1 && let Some(expr) = into_partition_exprs.pop() { @@ -1530,34 +1559,36 @@ impl StatementExecutor { // Validate that from_partition_exprs are a subset of existing partition exprs. // We compare PartitionExpr directly since it implements Eq. - for from_expr in &from_partition_exprs { - ensure!( - existing_partition_exprs.contains(from_expr), - InvalidPartitionRuleSnafu { - reason: format!( - "partition expression '{}' does not exist in table {}", - from_expr, table_ref - ) - } - ); + if matches!(&request.source, RepartitionSource::Partitions { .. }) { + for from_expr in &from_partition_exprs { + ensure!( + existing_partition_exprs.contains(from_expr), + InvalidPartitionRuleSnafu { + reason: format!( + "partition expression '{}' does not exist in table {}", + from_expr, table_ref + ) + } + ); + } } // Build the new partition expressions: // new_exprs = existing_exprs - from_exprs + into_exprs - let new_partition_exprs: Vec = existing_partition_exprs - .into_iter() - .filter(|expr| !from_partition_exprs.contains(expr)) - .chain(into_partition_exprs.clone().into_iter()) - .collect(); + let new_partition_exprs: Vec = match &request.source { + RepartitionSource::Partitions { .. } => existing_partition_exprs + .into_iter() + .filter(|expr| !from_partition_exprs.contains(expr)) + .chain(into_partition_exprs.clone().into_iter()) + .collect(), + RepartitionSource::Unpartitioned { .. } => into_partition_exprs.clone(), + }; let new_partition_exprs_len = new_partition_exprs.len(); let from_partition_exprs_len = from_partition_exprs.len(); // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker. let _ = MultiDimPartitionRule::try_new( - existing_partition_columns - .iter() - .map(|c| c.name.clone()) - .collect(), + partition_columns.iter().map(|c| c.name.clone()).collect(), vec![], new_partition_exprs, true, @@ -1574,16 +1605,28 @@ impl StatementExecutor { }; let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?; let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?; + let source = match &request.source { + RepartitionSource::Partitions { .. } => Source::PartitionExprs(PartitionExprs { + exprs: from_partition_exprs_json, + }), + RepartitionSource::Unpartitioned { partition_columns } => { + Source::Unpartitioned(UnpartitionedSource { + partition_columns: partition_columns.clone(), + }) + } + }; + let repartition = Repartition { + into_partition_exprs: into_partition_exprs_json, + source: Some(source), + ..Default::default() + }; let mut req = SubmitDdlTaskRequest::new( to_meta_query_context(query_context.clone()), DdlTask::new_alter_table(AlterTableExpr { catalog_name: request.catalog_name.clone(), schema_name: request.schema_name.clone(), table_name: request.table_name.clone(), - kind: Some(Kind::Repartition(Repartition { - from_partition_exprs: from_partition_exprs_json, - into_partition_exprs: into_partition_exprs_json, - })), + kind: Some(Kind::Repartition(repartition)), }), ); req.wait = ddl_options.wait; diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index e5e1575a20..4aa571005a 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -134,6 +134,7 @@ impl ParserContext<'_> { self.parse_alter_table_merge_partition()? } else { match w.keyword { + Keyword::PARTITION => self.parse_alter_table_partition()?, Keyword::ADD => self.parse_alter_table_add()?, Keyword::DROP => { let _ = self.parser.next_token(); @@ -174,7 +175,7 @@ impl ParserContext<'_> { AlterTableOperation::SetTableOptions { options } } _ => self.expected( - "ADD or DROP or MODIFY or RENAME or SET or REPARTITION or SPLIT or MERGE after ALTER TABLE", + "ADD or DROP or MODIFY or RENAME or SET or UNSET or REPARTITION or SPLIT or MERGE or PARTITION after ALTER TABLE", self.parser.peek_token(), )?, } @@ -218,6 +219,19 @@ impl ParserContext<'_> { }) } + fn parse_alter_table_partition(&mut self) -> Result { + let _ = self.parser.next_token(); + let partitions = self.parse_partition_on_columns()?; + if partitions.exprs.is_empty() { + return Err(ParserError::ParserError( + "PARTITION ON COLUMNS requires at least one partition expression".to_string(), + )) + .context(error::SyntaxSnafu); + } + + Ok(AlterTableOperation::Partition { partitions }) + } + fn parse_alter_table_split_partition(&mut self) -> Result { let _ = self.parser.next_token(); self.parser @@ -976,6 +990,100 @@ ALTER TABLE t REPARTITION ( } } + #[test] + fn test_parse_alter_table_partition_on_columns() { + let sql = r#" +ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id, area) ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South', + device_id >= 100 AND area <= 'East', + device_id >= 100 AND area > 'East' +);"#; + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::AlterTable { .. }); + if let Statement::AlterTable(alter_table) = statement { + assert_matches!( + alter_table.alter_operation(), + AlterTableOperation::Partition { .. } + ); + + if let AlterTableOperation::Partition { partitions } = alter_table.alter_operation() { + assert_eq!(partitions.column_list.len(), 2); + assert_eq!(partitions.column_list[0].value, "device_id"); + assert_eq!(partitions.column_list[1].value, "area"); + assert_eq!(partitions.exprs.len(), 4); + assert_eq!( + partitions.exprs[0].to_string(), + "device_id < 100 AND area < 'South'" + ); + assert_eq!( + partitions.exprs[3].to_string(), + "device_id >= 100 AND area > 'East'" + ); + } + } + } + + #[test] + fn test_parse_alter_table_partition_on_columns_with_options() { + let sql = r#" +ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id) ( + device_id < 100, + device_id >= 100 +) WITH ( + TIMEOUT = '5m', + WAIT = false +);"#; + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::AlterTable { .. }); + if let Statement::AlterTable(alter_table) = statement { + assert_matches!( + alter_table.alter_operation(), + AlterTableOperation::Partition { .. } + ); + let options = alter_table.options().to_str_map(); + assert_eq!(options.get("timeout").unwrap(), &"5m"); + assert_eq!(options.get("wait").unwrap(), &"false"); + assert_eq!(options.len(), 2); + } + } + + #[test] + fn test_parse_alter_table_partition_on_columns_empty_columns() { + let sql = r#" +ALTER TABLE sensor_readings PARTITION ON COLUMNS () ( + device_id < 100 +);"#; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + + assert!(result.is_err()); + } + + #[test] + fn test_parse_alter_table_partition_on_columns_empty_exprs() { + let sql = r#" +ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id) ();"#; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + + assert_eq!( + result.output_msg(), + "Invalid SQL syntax: sql parser error: PARTITION ON COLUMNS requires at least one partition expression" + ); + } + #[test] fn test_parse_alter_table_split_partition() { let sql = r#" @@ -1274,7 +1382,7 @@ ALTER TABLE metrics REPARTITION let err = result.output_msg(); assert_eq!( err, - "Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET or REPARTITION or SPLIT or MERGE after ALTER TABLE, found: table_t" + "Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET or UNSET or REPARTITION or SPLIT or MERGE or PARTITION after ALTER TABLE, found: table_t" ); let sql = "ALTER TABLE test_table RENAME table_t"; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index a82590c603..b2614bb8d5 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -502,6 +502,12 @@ impl<'a> ParserContext<'a> { if !self.parser.parse_keyword(Keyword::PARTITION) { return Ok(None); } + + self.parse_partition_on_columns().map(Some) + } + + /// Parses the "ON COLUMNS (...) (...)" part after "PARTITION". + pub(crate) fn parse_partition_on_columns(&mut self) -> Result { self.parser .expect_keywords(&[Keyword::ON, Keyword::COLUMNS]) .context(error::UnexpectedSnafu { @@ -520,7 +526,7 @@ impl<'a> ParserContext<'a> { let exprs = self.parse_comma_separated(Self::parse_partition_entry)?; - Ok(Some(Partitions { column_list, exprs })) + Ok(Partitions { column_list, exprs }) } fn parse_partition_entry(&mut self) -> Result { diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index ab35e5bd34..72182a4b60 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -26,6 +26,7 @@ use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, ObjectName, TableConstrai use sqlparser_derive::{Visit, VisitMut}; use crate::statements::OptionMap; +use crate::statements::create::Partitions; #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] pub struct AlterTable { @@ -119,6 +120,10 @@ pub enum AlterTableOperation { Repartition { operation: RepartitionOperation, }, + /// `PARTITION ON COLUMNS (...) (...)` + Partition { + partitions: Partitions, + }, } #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] @@ -248,6 +253,9 @@ impl Display for AlterTableOperation { AlterTableOperation::Repartition { operation } => { write!(f, "REPARTITION {operation}") } + AlterTableOperation::Partition { partitions } => { + write!(f, "{partitions}") + } AlterTableOperation::SetIndex { options } => match options { SetIndexOperation::Fulltext { column_name,