diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index b3a503a499..bd77b20d5f 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -682,6 +682,40 @@ pub fn column_schemas_to_defs( .collect() } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RepartitionRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub from_exprs: Vec, + pub into_exprs: Vec, +} + +pub(crate) fn to_repartition_request( + alter_table: AlterTable, + query_ctx: &QueryContextRef, +) -> Result { + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(alter_table.table_name(), query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else { + return InvalidSqlSnafu { + err_msg: "expected REPARTITION operation", + } + .fail(); + }; + + Ok(RepartitionRequest { + catalog_name, + schema_name, + table_name, + from_exprs: operation.from_exprs, + into_exprs: operation.into_exprs, + }) +} + /// Converts a SQL alter table statement into a gRPC alter table expression. pub(crate) fn to_alter_table_expr( alter_table: AlterTable, @@ -764,6 +798,12 @@ pub(crate) fn to_alter_table_expr( AlterTableOperation::UnsetTableOptions { keys } => { AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys }) } + AlterTableOperation::Repartition { .. } => { + return NotSupportedSnafu { + feat: "ALTER TABLE ... REPARTITION", + } + .fail(); + } AlterTableOperation::SetIndex { options } => { let option = match options { sql::statements::alter::SetIndexOperation::Fulltext { @@ -1391,6 +1431,50 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; assert!(modify_column_type.target_type_extension.is_none()); } + #[test] + fn test_to_repartition_request() { + let sql = r#" +ALTER TABLE metrics REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +);"#; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::AlterTable(alter_table) = stmt else { + unreachable!() + }; + + let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap(); + assert_eq!("greptime", request.catalog_name); + assert_eq!("public", request.schema_name); + assert_eq!("metrics", request.table_name); + assert_eq!( + request + .from_exprs + .into_iter() + .map(|x| x.to_string()) + .collect::>(), + vec!["device_id < 100".to_string()] + ); + assert_eq!( + request + .into_exprs + .into_iter() + .map(|x| x.to_string()) + .collect::>(), + vec![ + "device_id < 100 AND area < 'South'".to_string(), + "device_id < 100 AND area >= 'South'".to_string() + ] + ); + } + fn new_test_table_names() -> Vec { vec![ TableName { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 8bd454a71d..aa287e75d7 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -66,7 +66,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use sql::parser::{ParseOptions, ParserContext}; #[cfg(feature = "enterprise")] use sql::statements::alter::trigger::AlterTrigger; -use sql::statements::alter::{AlterDatabase, AlterTable}; +use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation}; #[cfg(feature = "enterprise")] use sql::statements::create::trigger::CreateTrigger; use sql::statements::create::{ @@ -87,10 +87,10 @@ use crate::error::{ ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, - InvalidViewNameSnafu, InvalidViewStmtSnafu, PartitionExprToPbSnafu, Result, SchemaInUseSnafu, - SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, - TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, - ViewAlreadyExistsSnafu, + InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result, + SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, + TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, + UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, }; use crate::expr_helper; use crate::statement::StatementExecutor; @@ -1194,6 +1194,17 @@ impl StatementExecutor { alter_table: AlterTable, query_context: QueryContextRef, ) -> Result { + if matches!( + alter_table.alter_operation(), + AlterTableOperation::Repartition { .. } + ) { + let _request = expr_helper::to_repartition_request(alter_table, &query_context)?; + return NotSupportedSnafu { + feat: "ALTER TABLE REPARTITION", + } + .fail(); + } + let expr = expr_helper::to_alter_table_expr(alter_table, &query_context)?; self.alter_table_inner(expr, query_context).await } diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index bbbcb1d2f6..5f31ed8dc5 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use common_query::AddColumnLocation; use datatypes::schema::COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE; use snafu::{ResultExt, ensure}; -use sqlparser::ast::Ident; +use sqlparser::ast::{Expr, Ident}; use sqlparser::keywords::Keyword; use sqlparser::parser::{Parser, ParserError}; use sqlparser::tokenizer::{Token, TokenWithSpan}; @@ -34,8 +34,8 @@ use crate::parsers::utils::{ }; use crate::statements::alter::{ AddColumn, AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation, - DropDefaultsOperation, KeyValueOption, SetDefaultsOperation, SetIndexOperation, - UnsetIndexOperation, + DropDefaultsOperation, KeyValueOption, RepartitionOperation, SetDefaultsOperation, + SetIndexOperation, UnsetIndexOperation, }; use crate::statements::statement::Statement; use crate::util::parse_option_string; @@ -124,6 +124,8 @@ impl ParserContext<'_> { self.parse_alter_table_modify()? } else if w.value.eq_ignore_ascii_case("UNSET") { self.parse_alter_table_unset()? + } else if w.value.eq_ignore_ascii_case("REPARTITION") { + self.parse_alter_table_repartition()? } else { match w.keyword { Keyword::ADD => self.parse_alter_table_add()?, @@ -166,7 +168,7 @@ impl ParserContext<'_> { AlterTableOperation::SetTableOptions { options } } _ => self.expected( - "ADD or DROP or MODIFY or RENAME or SET after ALTER TABLE", + "ADD or DROP or MODIFY or RENAME or SET or REPARTITION after ALTER TABLE", self.parser.peek_token(), )?, } @@ -189,6 +191,65 @@ impl ParserContext<'_> { Ok(AlterTableOperation::UnsetTableOptions { keys }) } + fn parse_alter_table_repartition(&mut self) -> Result { + let _ = self.parser.next_token(); + + let from_exprs = self.parse_repartition_expr_list()?; + self.parser + .expect_keyword(Keyword::INTO) + .context(error::SyntaxSnafu)?; + let into_exprs = self.parse_repartition_expr_list()?; + + if matches!(self.parser.peek_token().token, Token::Comma) { + return self.expected("end of REPARTITION clause", self.parser.peek_token()); + } + + Ok(AlterTableOperation::Repartition { + operation: RepartitionOperation::new(from_exprs, into_exprs), + }) + } + + fn parse_repartition_expr_list(&mut self) -> Result> { + self.parser + .expect_token(&Token::LParen) + .context(error::SyntaxSnafu)?; + + if matches!(self.parser.peek_token().token, Token::RParen) { + return self.expected( + "expression inside REPARTITION clause", + self.parser.peek_token(), + ); + } + + let mut exprs = Vec::new(); + loop { + let expr = self.parser.parse_expr().context(error::SyntaxSnafu)?; + exprs.push(expr); + + match self.parser.peek_token().token { + Token::Comma => { + self.parser.next_token(); + if matches!(self.parser.peek_token().token, Token::RParen) { + self.parser.next_token(); + break; + } + } + Token::RParen => { + self.parser.next_token(); + break; + } + _ => { + return self.expected( + "comma or right parenthesis after repartition expression", + self.parser.peek_token(), + ); + } + } + } + + Ok(exprs) + } + fn parse_alter_table_add(&mut self) -> Result { let _ = self.parser.next_token(); if let Some(constraint) = self @@ -809,6 +870,70 @@ mod tests { } } + #[test] + fn test_parse_alter_table_repartition() { + let sql = r#" +ALTER TABLE t REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South', +);"#; + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::AlterTable { .. }); + if let Statement::AlterTable(alter_table) = statement { + assert_matches!( + alter_table.alter_operation(), + AlterTableOperation::Repartition { .. } + ); + + if let AlterTableOperation::Repartition { operation } = alter_table.alter_operation() { + assert_eq!(operation.from_exprs.len(), 1); + assert_eq!(operation.from_exprs[0].to_string(), "device_id < 100"); + assert_eq!(operation.into_exprs.len(), 2); + assert_eq!( + operation.into_exprs[0].to_string(), + "device_id < 100 AND area < 'South'" + ); + assert_eq!( + operation.into_exprs[1].to_string(), + "device_id < 100 AND area >= 'South'" + ); + } + } + } + + #[test] + fn test_parse_alter_table_repartition_multiple() { + let sql = r#" +ALTER TABLE metrics REPARTITION +( + a < 10, + a >= 10 +) INTO ( + a < 20 +), +( + b < 20 +) INTO ( + b < 10, + b >= 10, +);"#; + + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap_err(); + assert_eq!( + result.output_msg(), + "Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: ," + ); + } + #[test] fn test_parse_alter_drop_column() { let sql = "ALTER TABLE my_metric_1 DROP a"; @@ -966,7 +1091,7 @@ mod tests { let err = result.output_msg(); assert_eq!( err, - "Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET after ALTER TABLE, found: table_t" + "Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET or REPARTITION after ALTER TABLE, found: table_t" ); let sql = "ALTER TABLE test_table RENAME table_t"; diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index 6507701f49..ae5bce08c8 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -102,6 +102,10 @@ pub enum AlterTableOperation { SetDefaults { defaults: Vec, }, + /// `REPARTITION (...) INTO (...)` + Repartition { + operation: RepartitionOperation, + }, } #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] @@ -114,6 +118,38 @@ pub struct SetDefaultsOperation { pub default_constraint: Expr, } +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] +pub struct RepartitionOperation { + pub from_exprs: Vec, + pub into_exprs: Vec, +} + +impl RepartitionOperation { + pub fn new(from_exprs: Vec, into_exprs: Vec) -> Self { + Self { + from_exprs, + into_exprs, + } + } +} + +impl Display for RepartitionOperation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let from = self + .from_exprs + .iter() + .map(|expr| expr.to_string()) + .join(", "); + let into = self + .into_exprs + .iter() + .map(|expr| expr.to_string()) + .join(", "); + + write!(f, "({from}) INTO ({into})") + } +} + #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] pub enum SetIndexOperation { /// `MODIFY COLUMN SET FULLTEXT INDEX [WITH ]` @@ -196,6 +232,9 @@ impl Display for AlterTableOperation { let keys = keys.iter().map(|k| format!("'{k}'")).join(","); write!(f, "UNSET {keys}") } + AlterTableOperation::Repartition { operation } => { + write!(f, "REPARTITION {operation}") + } AlterTableOperation::SetIndex { options } => match options { SetIndexOperation::Fulltext { column_name, diff --git a/tests/cases/standalone/common/alter/repartition.result b/tests/cases/standalone/common/alter/repartition.result new file mode 100644 index 0000000000..e318324631 --- /dev/null +++ b/tests/cases/standalone/common/alter/repartition.result @@ -0,0 +1,48 @@ +CREATE TABLE alter_repartition_table( + device_id INT, + area STRING, + ty STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id, area) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +Affected Rows: 0 + +-- valid grammar, currently not implemented +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +); + +Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION + +-- invalid: empty source clause +ALTER TABLE alter_repartition_table REPARTITION () INTO ( + device_id < 100 +); + +Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: ) + +-- invalid: more than one INTO clause +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50 +), ( + device_id >= 50 +) INTO ( + device_id >= 50 +); + +Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: , + +DROP TABLE alter_repartition_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/repartition.sql b/tests/cases/standalone/common/alter/repartition.sql new file mode 100644 index 0000000000..64010b222a --- /dev/null +++ b/tests/cases/standalone/common/alter/repartition.sql @@ -0,0 +1,37 @@ +CREATE TABLE alter_repartition_table( + device_id INT, + area STRING, + ty STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id, area) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +-- valid grammar, currently not implemented +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +); + +-- invalid: empty source clause +ALTER TABLE alter_repartition_table REPARTITION () INTO ( + device_id < 100 +); + +-- invalid: more than one INTO clause +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50 +), ( + device_id >= 50 +) INTO ( + device_id >= 50 +); + +DROP TABLE alter_repartition_table;