feat: support alter table partition syntax (#8177)

* feat(sql): support alter table partition syntax

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: support repartition source proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update greptime-proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-26 23:06:14 +08:00
committed by GitHub
parent 5943b41067
commit f513b77ccc
8 changed files with 316 additions and 65 deletions

2
Cargo.lock generated
View File

@@ -5826,7 +5826,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=dfd2a6d7d3d9c718cb159fcf9abae144b74fc503#dfd2a6d7d3d9c718cb159fcf9abae144b74fc503"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7224c2ad6d11db612fbdb621c36135fc37ffce35#7224c2ad6d11db612fbdb621c36135fc37ffce35"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",

View File

@@ -158,7 +158,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "dfd2a6d7d3d9c718cb159fcf9abae144b74fc503" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7224c2ad6d11db612fbdb621c36135fc37ffce35" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -15,8 +15,9 @@
use std::sync::Arc;
use std::time::Duration;
use api::v1::Repartition;
use api::v1::alter_table_expr::Kind;
use api::v1::repartition::Source;
use api::v1::{PartitionExprs, Repartition};
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
@@ -48,7 +49,7 @@ use crate::error::{
self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
UnexpectedLogicalRouteTableSnafu, UnexpectedSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -280,15 +281,30 @@ impl DdlManager {
&self,
table_id: TableId,
table_name: TableName,
Repartition {
from_partition_exprs,
into_partition_exprs,
}: Repartition,
repartition: Repartition,
wait: bool,
timeout: Duration,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let into_partition_exprs = repartition.into_partition_exprs;
let source = repartition.source;
let from_partition_exprs = match source {
Some(Source::PartitionExprs(PartitionExprs { exprs })) => exprs,
Some(Source::Unpartitioned(_)) => {
return UnexpectedSnafu {
err_msg: "Unpartitioned repartition source is not supported yet".to_string(),
}
.fail();
}
None => {
// Reads the deprecated field for backward compatibility with old persisted DDL tasks.
#[allow(deprecated)]
repartition.from_partition_exprs
}
};
let procedure = self
.repartition_procedure_factory
.create(

View File

@@ -689,11 +689,17 @@ pub struct RepartitionRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub from_exprs: Vec<Expr>,
pub source: RepartitionSource,
pub into_exprs: Vec<Expr>,
pub options: OptionMap,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepartitionSource {
Partitions { from_exprs: Vec<Expr> },
Unpartitioned { partition_columns: Vec<String> },
}
pub(crate) fn to_repartition_request(
alter_table: AlterTable,
query_ctx: &QueryContextRef,
@@ -708,19 +714,37 @@ pub(crate) fn to_repartition_request(
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let AlterTableOperation::Repartition { operation } = alter_operation else {
return InvalidSqlSnafu {
err_msg: "expected REPARTITION operation",
let (source, into_exprs) = match alter_operation {
AlterTableOperation::Repartition { operation } => (
RepartitionSource::Partitions {
from_exprs: operation.from_exprs,
},
operation.into_exprs,
),
AlterTableOperation::Partition { partitions } => (
RepartitionSource::Unpartitioned {
partition_columns: partitions
.column_list
.into_iter()
.map(|ident| ident.value)
.collect(),
},
partitions.exprs,
),
_ => {
return InvalidSqlSnafu {
err_msg: "expected REPARTITION or PARTITION operation",
}
.fail();
}
.fail();
};
Ok(RepartitionRequest {
catalog_name,
schema_name,
table_name,
from_exprs: operation.from_exprs,
into_exprs: operation.into_exprs,
source,
into_exprs,
options,
})
}
@@ -814,6 +838,12 @@ pub(crate) fn to_alter_table_expr(
}
.fail();
}
AlterTableOperation::Partition { .. } => {
return NotSupportedSnafu {
feat: "ALTER TABLE ... PARTITION ON COLUMNS",
}
.fail();
}
AlterTableOperation::SetIndex { options } => {
let option = match options {
sql::statements::alter::SetIndexOperation::Fulltext {
@@ -1687,9 +1717,11 @@ ALTER TABLE metrics REPARTITION (
assert_eq!("greptime", request.catalog_name);
assert_eq!("public", request.schema_name);
assert_eq!("metrics", request.table_name);
let RepartitionSource::Partitions { from_exprs } = request.source else {
unreachable!()
};
assert_eq!(
request
.from_exprs
from_exprs
.into_iter()
.map(|x| x.to_string())
.collect::<Vec<_>>(),
@@ -1708,6 +1740,44 @@ ALTER TABLE metrics REPARTITION (
);
}
#[test]
fn test_to_repartition_request_with_unpartitioned_source() {
let sql = r#"
ALTER TABLE metrics PARTITION ON COLUMNS (device_id, area) (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::AlterTable(alter_table) = stmt else {
unreachable!()
};
let request = to_repartition_request(alter_table, &QueryContext::arc()).unwrap();
assert_eq!("greptime", request.catalog_name);
assert_eq!("public", request.schema_name);
assert_eq!("metrics", request.table_name);
let RepartitionSource::Unpartitioned { partition_columns } = request.source else {
unreachable!()
};
assert_eq!(partition_columns, vec!["device_id", "area"]);
assert_eq!(
request
.into_exprs
.into_iter()
.map(|x| x.to_string())
.collect::<Vec<_>>(),
vec![
"device_id < 100 AND area < 'South'".to_string(),
"device_id < 100 AND area >= 'South'".to_string()
]
);
}
fn new_test_table_names() -> Vec<TableName> {
vec![
TableName {

View File

@@ -19,9 +19,10 @@ use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::repartition::Source;
use api::v1::{
AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
Repartition, column_def,
PartitionExprs, Repartition, UnpartitionedSource, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
@@ -102,7 +103,7 @@ use crate::error::{
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
ViewAlreadyExistsSnafu,
};
use crate::expr_helper::{self, RepartitionRequest};
use crate::expr_helper::{self, RepartitionRequest, RepartitionSource};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;
use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend};
@@ -1408,7 +1409,7 @@ impl StatementExecutor {
) -> Result<Output> {
if matches!(
alter_table.alter_operation(),
AlterTableOperation::Repartition { .. }
AlterTableOperation::Repartition { .. } | AlterTableOperation::Partition { .. }
) {
let request = expr_helper::to_repartition_request(alter_table, &query_context)?;
return self.repartition_table(request, &query_context).await;
@@ -1468,32 +1469,59 @@ impl StatementExecutor {
);
let table_info = table.table_info();
// Get partition column names from the table metadata.
let existing_partition_columns = table_info.meta.partition_columns().collect::<Vec<_>>();
// Repartition requires the table to have partition columns.
ensure!(
!existing_partition_columns.is_empty(),
InvalidPartitionRuleSnafu {
reason: format!(
"table {} does not have partition columns, cannot repartition",
table_ref
)
let partition_columns = match &request.source {
RepartitionSource::Partitions { .. } => {
ensure!(
!existing_partition_columns.is_empty(),
InvalidPartitionRuleSnafu {
reason: format!(
"table {} does not have partition columns, cannot repartition",
table_ref
)
}
);
existing_partition_columns
}
);
RepartitionSource::Unpartitioned { partition_columns } => {
ensure!(
!partition_columns.is_empty(),
InvalidPartitionRuleSnafu {
reason: "PARTITION ON COLUMNS requires at least one partition column"
}
);
ensure!(
existing_partition_columns.is_empty(),
InvalidPartitionRuleSnafu {
reason: format!("table {} already has partition columns", table_ref)
}
);
let column_schemas = table_info.meta.schema.column_schemas();
partition_columns
.iter()
.map(|column_name| {
column_schemas
.iter()
.find(|column| &column.name == column_name)
.with_context(|| ColumnNotFoundSnafu { msg: column_name })
})
.collect::<Result<Vec<_>>>()?
}
};
// Repartition operations involving columns outside the existing partition columns are not supported.
// This restriction ensures repartition only applies to current partition columns.
let column_name_and_type = existing_partition_columns
let column_name_and_type = partition_columns
.iter()
.map(|column| (&column.name, column.data_type.clone()))
.collect();
let timezone = query_context.timezone();
// Convert SQL Exprs to PartitionExprs.
let from_partition_exprs = request
.from_exprs
.iter()
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
.collect::<Result<Vec<_>>>()?;
let from_partition_exprs = match &request.source {
RepartitionSource::Partitions { from_exprs } => from_exprs
.iter()
.map(|expr| convert_one_expr(expr, &column_name_and_type, &timezone))
.collect::<Result<Vec<_>>>()?,
RepartitionSource::Unpartitioned { .. } => vec![],
};
let mut into_partition_exprs = request
.into_exprs
@@ -1503,7 +1531,8 @@ impl StatementExecutor {
// `MERGE PARTITION` (and some `REPARTITION`) generates a single `OR` expression from
// multiple source partitions; try to simplify it for better readability and stability.
if from_partition_exprs.len() > 1
if matches!(&request.source, RepartitionSource::Partitions { .. })
&& from_partition_exprs.len() > 1
&& into_partition_exprs.len() == 1
&& let Some(expr) = into_partition_exprs.pop()
{
@@ -1530,34 +1559,36 @@ impl StatementExecutor {
// Validate that from_partition_exprs are a subset of existing partition exprs.
// We compare PartitionExpr directly since it implements Eq.
for from_expr in &from_partition_exprs {
ensure!(
existing_partition_exprs.contains(from_expr),
InvalidPartitionRuleSnafu {
reason: format!(
"partition expression '{}' does not exist in table {}",
from_expr, table_ref
)
}
);
if matches!(&request.source, RepartitionSource::Partitions { .. }) {
for from_expr in &from_partition_exprs {
ensure!(
existing_partition_exprs.contains(from_expr),
InvalidPartitionRuleSnafu {
reason: format!(
"partition expression '{}' does not exist in table {}",
from_expr, table_ref
)
}
);
}
}
// Build the new partition expressions:
// new_exprs = existing_exprs - from_exprs + into_exprs
let new_partition_exprs: Vec<PartitionExpr> = existing_partition_exprs
.into_iter()
.filter(|expr| !from_partition_exprs.contains(expr))
.chain(into_partition_exprs.clone().into_iter())
.collect();
let new_partition_exprs: Vec<PartitionExpr> = match &request.source {
RepartitionSource::Partitions { .. } => existing_partition_exprs
.into_iter()
.filter(|expr| !from_partition_exprs.contains(expr))
.chain(into_partition_exprs.clone().into_iter())
.collect(),
RepartitionSource::Unpartitioned { .. } => into_partition_exprs.clone(),
};
let new_partition_exprs_len = new_partition_exprs.len();
let from_partition_exprs_len = from_partition_exprs.len();
// Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
let _ = MultiDimPartitionRule::try_new(
existing_partition_columns
.iter()
.map(|c| c.name.clone())
.collect(),
partition_columns.iter().map(|c| c.name.clone()).collect(),
vec![],
new_partition_exprs,
true,
@@ -1574,16 +1605,28 @@ impl StatementExecutor {
};
let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
let source = match &request.source {
RepartitionSource::Partitions { .. } => Source::PartitionExprs(PartitionExprs {
exprs: from_partition_exprs_json,
}),
RepartitionSource::Unpartitioned { partition_columns } => {
Source::Unpartitioned(UnpartitionedSource {
partition_columns: partition_columns.clone(),
})
}
};
let repartition = Repartition {
into_partition_exprs: into_partition_exprs_json,
source: Some(source),
..Default::default()
};
let mut req = SubmitDdlTaskRequest::new(
to_meta_query_context(query_context.clone()),
DdlTask::new_alter_table(AlterTableExpr {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
table_name: request.table_name.clone(),
kind: Some(Kind::Repartition(Repartition {
from_partition_exprs: from_partition_exprs_json,
into_partition_exprs: into_partition_exprs_json,
})),
kind: Some(Kind::Repartition(repartition)),
}),
);
req.wait = ddl_options.wait;

View File

@@ -134,6 +134,7 @@ impl ParserContext<'_> {
self.parse_alter_table_merge_partition()?
} else {
match w.keyword {
Keyword::PARTITION => self.parse_alter_table_partition()?,
Keyword::ADD => self.parse_alter_table_add()?,
Keyword::DROP => {
let _ = self.parser.next_token();
@@ -174,7 +175,7 @@ impl ParserContext<'_> {
AlterTableOperation::SetTableOptions { options }
}
_ => self.expected(
"ADD or DROP or MODIFY or RENAME or SET or REPARTITION or SPLIT or MERGE after ALTER TABLE",
"ADD or DROP or MODIFY or RENAME or SET or UNSET or REPARTITION or SPLIT or MERGE or PARTITION after ALTER TABLE",
self.parser.peek_token(),
)?,
}
@@ -218,6 +219,19 @@ impl ParserContext<'_> {
})
}
fn parse_alter_table_partition(&mut self) -> Result<AlterTableOperation> {
let _ = self.parser.next_token();
let partitions = self.parse_partition_on_columns()?;
if partitions.exprs.is_empty() {
return Err(ParserError::ParserError(
"PARTITION ON COLUMNS requires at least one partition expression".to_string(),
))
.context(error::SyntaxSnafu);
}
Ok(AlterTableOperation::Partition { partitions })
}
fn parse_alter_table_split_partition(&mut self) -> Result<AlterTableOperation> {
let _ = self.parser.next_token();
self.parser
@@ -976,6 +990,100 @@ ALTER TABLE t REPARTITION (
}
}
#[test]
fn test_parse_alter_table_partition_on_columns() {
let sql = r#"
ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id, area) (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South',
device_id >= 100 AND area <= 'East',
device_id >= 100 AND area > 'East'
);"#;
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::AlterTable { .. });
if let Statement::AlterTable(alter_table) = statement {
assert_matches!(
alter_table.alter_operation(),
AlterTableOperation::Partition { .. }
);
if let AlterTableOperation::Partition { partitions } = alter_table.alter_operation() {
assert_eq!(partitions.column_list.len(), 2);
assert_eq!(partitions.column_list[0].value, "device_id");
assert_eq!(partitions.column_list[1].value, "area");
assert_eq!(partitions.exprs.len(), 4);
assert_eq!(
partitions.exprs[0].to_string(),
"device_id < 100 AND area < 'South'"
);
assert_eq!(
partitions.exprs[3].to_string(),
"device_id >= 100 AND area > 'East'"
);
}
}
}
#[test]
fn test_parse_alter_table_partition_on_columns_with_options() {
let sql = r#"
ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id) (
device_id < 100,
device_id >= 100
) WITH (
TIMEOUT = '5m',
WAIT = false
);"#;
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::AlterTable { .. });
if let Statement::AlterTable(alter_table) = statement {
assert_matches!(
alter_table.alter_operation(),
AlterTableOperation::Partition { .. }
);
let options = alter_table.options().to_str_map();
assert_eq!(options.get("timeout").unwrap(), &"5m");
assert_eq!(options.get("wait").unwrap(), &"false");
assert_eq!(options.len(), 2);
}
}
#[test]
fn test_parse_alter_table_partition_on_columns_empty_columns() {
let sql = r#"
ALTER TABLE sensor_readings PARTITION ON COLUMNS () (
device_id < 100
);"#;
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result.is_err());
}
#[test]
fn test_parse_alter_table_partition_on_columns_empty_exprs() {
let sql = r#"
ALTER TABLE sensor_readings PARTITION ON COLUMNS (device_id) ();"#;
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert_eq!(
result.output_msg(),
"Invalid SQL syntax: sql parser error: PARTITION ON COLUMNS requires at least one partition expression"
);
}
#[test]
fn test_parse_alter_table_split_partition() {
let sql = r#"
@@ -1274,7 +1382,7 @@ ALTER TABLE metrics REPARTITION
let err = result.output_msg();
assert_eq!(
err,
"Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET or REPARTITION or SPLIT or MERGE after ALTER TABLE, found: table_t"
"Invalid SQL syntax: sql parser error: Expected ADD or DROP or MODIFY or RENAME or SET or UNSET or REPARTITION or SPLIT or MERGE or PARTITION after ALTER TABLE, found: table_t"
);
let sql = "ALTER TABLE test_table RENAME table_t";

View File

@@ -502,6 +502,12 @@ impl<'a> ParserContext<'a> {
if !self.parser.parse_keyword(Keyword::PARTITION) {
return Ok(None);
}
self.parse_partition_on_columns().map(Some)
}
/// Parses the "ON COLUMNS (...) (...)" part after "PARTITION".
pub(crate) fn parse_partition_on_columns(&mut self) -> Result<Partitions> {
self.parser
.expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
.context(error::UnexpectedSnafu {
@@ -520,7 +526,7 @@ impl<'a> ParserContext<'a> {
let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
Ok(Some(Partitions { column_list, exprs }))
Ok(Partitions { column_list, exprs })
}
fn parse_partition_entry(&mut self) -> Result<Expr> {

View File

@@ -26,6 +26,7 @@ use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, ObjectName, TableConstrai
use sqlparser_derive::{Visit, VisitMut};
use crate::statements::OptionMap;
use crate::statements::create::Partitions;
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct AlterTable {
@@ -119,6 +120,10 @@ pub enum AlterTableOperation {
Repartition {
operation: RepartitionOperation,
},
/// `PARTITION ON COLUMNS (...) (...)`
Partition {
partitions: Partitions,
},
}
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
@@ -248,6 +253,9 @@ impl Display for AlterTableOperation {
AlterTableOperation::Repartition { operation } => {
write!(f, "REPARTITION {operation}")
}
AlterTableOperation::Partition { partitions } => {
write!(f, "{partitions}")
}
AlterTableOperation::SetIndex { options } => match options {
SetIndexOperation::Fulltext {
column_name,