From 567d3e66e90af22cef4804dad807c85eaa81df2c Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 9 Jan 2026 16:37:21 +0800 Subject: [PATCH] feat: integrate repartition procedure into `DdlManager` (#7548) * feat: add repartition procedure factory support to DdlManager - Introduce RepartitionProcedureFactory trait for creating and registering repartition procedures - Implement DefaultRepartitionProcedureFactory for metasrv with full support - Implement StandaloneRepartitionProcedureFactory for standalone (unsupported) - Add procedure loader registration for RepartitionProcedure and RepartitionGroupProcedure - Add helper methods to TableMetadataAllocator for allocator access - Add error types for repartition procedure operations - Update DdlManager to accept and use RepartitionProcedureFactoryRef Signed-off-by: WenyXu * feat: integrate repartition procedure into DdlManager - Add submit_repartition_task() to handle repartition from alter table - Route Repartition operations in submit_alter_table_task() to repartition factory - Refactor: rename submit_procedure() to execute_procedure_and_wait() - Make all DDL operations wait for completion by default - Add submit_procedure() for fire-and-forget submissions - Add CreateRepartitionProcedure error type - Add placeholder Repartition handling in grpc-expr (unsupported) - Update greptime-proto dependency Signed-off-by: WenyXu * feat: implement ALTER TABLE REPARTITION procedure submission Signed-off-by: WenyXu * refactor(repartition): handle central region in apply staging manifest - Introduce ApplyStagingManifestInstructions struct to organize instructions - Add special handling for central region when applying staging manifests - Transition state from UpdateMetadata to RepartitionEnd after applying staging manifests - Remove next_state() method in RepartitionStart and inline state transitions - Improve logging and expression serialization in DDL statement executor - Move repartition tests from standalone to distributed test suite Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: update proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/cmd/src/standalone.rs | 11 +- src/common/grpc-expr/src/alter.rs | 6 +- src/common/grpc-expr/src/error.rs | 8 + .../src/ddl/alter_table/region_request.rs | 6 +- src/common/meta/src/ddl/table_meta.rs | 11 + src/common/meta/src/ddl_manager.rs | 175 ++++++++++-- src/common/meta/src/error.rs | 16 ++ src/meta-srv/src/error.rs | 22 +- src/meta-srv/src/metasrv/builder.rs | 14 +- src/meta-srv/src/procedure/repartition.rs | 264 +++++++++++++++++- .../src/procedure/repartition/group.rs | 51 +++- .../group/apply_staging_manifest.rs | 68 ++++- .../repartition/group/repartition_start.rs | 18 +- src/operator/src/error.rs | 12 +- src/operator/src/statement/ddl.rs | 63 ++++- src/standalone/Cargo.toml | 1 + src/standalone/src/error.rs | 7 + src/standalone/src/lib.rs | 2 +- src/standalone/src/procedure.rs | 38 ++- tests-integration/src/standalone.rs | 3 + .../repartition/repartition.result | 101 +++++++ .../repartition}/repartition.sql | 17 +- .../common/alter/repartition.result | 66 ----- 25 files changed, 832 insertions(+), 153 deletions(-) create mode 100644 tests/cases/distributed/repartition/repartition.result rename tests/cases/{standalone/common/alter => distributed/repartition}/repartition.sql (76%) delete mode 100644 tests/cases/standalone/common/alter/repartition.result diff --git a/Cargo.lock b/Cargo.lock index 1cf1bfeb48..72de81f9f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5467,7 +5467,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=58aeee49267fb1eafa6f9123f9d0c47dd0f62722#58aeee49267fb1eafa6f9123f9d0c47dd0f62722" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -12512,6 +12512,7 @@ dependencies = [ "servers", "snafu 0.8.6", "store-api", + "table", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 6829c0f18a..1315107887 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 9569dfb1e8..b605c7a66f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -64,8 +64,8 @@ use plugins::frontend::context::{ use plugins::standalone::context::DdlManagerConfigureContext; use servers::tls::{TlsMode, TlsOption, merge_tls_option}; use snafu::ResultExt; -use standalone::StandaloneInformationExtension; use standalone::options::StandaloneOptions; +use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{OtherSnafu, Result, StartFlownodeSnafu}; @@ -509,8 +509,13 @@ impl StartCommand { region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }; - let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true) - .context(error::InitDdlManagerSnafu)?; + let ddl_manager = DdlManager::try_new( + ddl_context, + procedure_manager.clone(), + Arc::new(StandaloneRepartitionProcedureFactory), + true, + ) + .context(error::InitDdlManagerSnafu)?; let ddl_manager = if let Some(configurator) = plugins.get::>() diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index cdca83663b..706f86178e 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -34,7 +34,7 @@ use table::requests::{ }; use crate::error::{ - ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu, + self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu, InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu, InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu, MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu, @@ -251,6 +251,10 @@ pub fn alter_expr_to_request( .collect::>>()?; AlterKind::SetDefaults { defaults } } + Kind::Repartition(_) => error::UnexpectedSnafu { + err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest", + } + .fail()?, }; let request = AlterTableRequest { diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index aab1adb672..31f19ba4ea 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -161,6 +161,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unexpected: {err_msg}"))] + Unexpected { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -188,6 +195,7 @@ impl ErrorExt for Error { Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, Error::SqlCommon { source, .. } => source.status_code(), Error::MissingTableMeta { .. } => StatusCode::Unexpected, + Error::Unexpected { .. } => StatusCode::Unexpected, } } diff --git a/src/common/meta/src/ddl/alter_table/region_request.rs b/src/common/meta/src/ddl/alter_table/region_request.rs index c087d11c27..3a6f541cb1 100644 --- a/src/common/meta/src/ddl/alter_table/region_request.rs +++ b/src/common/meta/src/ddl/alter_table/region_request.rs @@ -22,7 +22,7 @@ use snafu::OptionExt; use table::metadata::RawTableInfo; use crate::ddl::alter_table::AlterTableProcedure; -use crate::error::{InvalidProtoMsgSnafu, Result}; +use crate::error::{self, InvalidProtoMsgSnafu, Result}; impl AlterTableProcedure { /// Makes alter kind proto that all regions can reuse. @@ -112,6 +112,10 @@ fn create_proto_alter_kind( Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))), Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))), Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))), + Kind::Repartition(_) => error::UnexpectedSnafu { + err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest", + } + .fail()?, } } diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 2ba94edc99..10782d7fa1 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -161,7 +161,18 @@ impl TableMetadataAllocator { }) } + /// Returns the table id allocator. pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef { self.table_id_allocator.clone() } + + /// Returns the wal options allocator. + pub fn wal_options_allocator(&self) -> WalOptionsAllocatorRef { + self.wal_options_allocator.clone() + } + + /// Returns the region routes allocator. + pub fn region_routes_allocator(&self) -> RegionRoutesAllocatorRef { + self.region_routes_allocator.clone() + } } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1db189c783..05c3438734 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,15 +14,19 @@ use std::sync::Arc; +use api::v1::Repartition; +use api::v1::alter_table_expr::Kind; use common_error::ext::BoxedError; use common_procedure::{ - BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher, + BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, + ProcedureWithId, watcher, }; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, info, tracing}; use derive_builder::Builder; use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::TableId; +use table::table_name::TableName; use crate::ddl::alter_database::AlterDatabaseProcedure; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; @@ -40,7 +44,8 @@ use crate::ddl::drop_view::DropViewProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{DdlContext, utils}; use crate::error::{ - EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, + CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu, + RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu, }; @@ -90,6 +95,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade pub struct DdlManager { ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, + repartition_procedure_factory: RepartitionProcedureFactoryRef, #[cfg(feature = "enterprise")] trigger_ddl_manager: Option, } @@ -143,16 +149,37 @@ macro_rules! procedure_loader { }; } +pub type RepartitionProcedureFactoryRef = Arc; + +pub trait RepartitionProcedureFactory: Send + Sync { + fn create( + &self, + ddl_ctx: &DdlContext, + table_name: TableName, + table_id: TableId, + from_exprs: Vec, + to_exprs: Vec, + ) -> std::result::Result; + + fn register_loaders( + &self, + ddl_ctx: &DdlContext, + procedure_manager: &ProcedureManagerRef, + ) -> std::result::Result<(), BoxedError>; +} + impl DdlManager { /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. pub fn try_new( ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, + repartition_procedure_factory: RepartitionProcedureFactoryRef, register_loaders: bool, ) -> Result { let manager = Self { ddl_context, procedure_manager, + repartition_procedure_factory, #[cfg(feature = "enterprise")] trigger_ddl_manager: None, }; @@ -204,9 +231,63 @@ impl DdlManager { .context(RegisterProcedureLoaderSnafu { type_name })?; } + self.repartition_procedure_factory + .register_loaders(&self.ddl_context, &self.procedure_manager) + .context(RegisterRepartitionProcedureLoaderSnafu)?; + Ok(()) } + /// Submits a repartition procedure for the specified table. + /// + /// This creates a repartition procedure using the provided `table_id`, + /// `table_name`, and `Repartition` configuration, and then either executes it + /// to completion or just submits it for asynchronous execution. + /// + /// The `Repartition` argument contains the original (`from_partition_exprs`) + /// and target (`into_partition_exprs`) partition expressions that define how + /// the table should be repartitioned. + /// + /// The `wait` flag controls whether this method waits for the repartition + /// procedure to finish: + /// - If `wait` is `true`, the procedure is executed and this method awaits + /// its completion, returning both the generated `ProcedureId` and the + /// final `Output` of the procedure. + /// - If `wait` is `false`, the procedure is only submitted to the procedure + /// manager for asynchronous execution, and this method returns the + /// `ProcedureId` along with `None` as the output. + async fn submit_repartition_task( + &self, + table_id: TableId, + table_name: TableName, + Repartition { + from_partition_exprs, + into_partition_exprs, + wait, + }: Repartition, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + + let procedure = self + .repartition_procedure_factory + .create( + &context, + table_name, + table_id, + from_partition_exprs, + into_partition_exprs, + ) + .context(CreateRepartitionProcedureSnafu)?; + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + if wait { + self.execute_procedure_and_wait(procedure_with_id).await + } else { + self.submit_procedure(procedure_with_id) + .await + .map(|p| (p, None)) + } + } + /// Submits and executes an alter table task. #[tracing::instrument(skip_all)] pub async fn submit_alter_table_task( @@ -214,13 +295,28 @@ impl DdlManager { table_id: TableId, alter_table_task: AlterTableTask, ) -> Result<(ProcedureId, Option)> { - let context = self.create_context(); + // make alter_table_task mutable so we can call .take() on its field + let mut alter_table_task = alter_table_task; + if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref() + && let Kind::Repartition(repartition) = + alter_table_task.alter_table.kind.take().unwrap() + { + let table_name = TableName::new( + alter_table_task.alter_table.catalog_name, + alter_table_task.alter_table.schema_name, + alter_table_task.alter_table.table_name, + ); + return self + .submit_repartition_task(table_id, table_name, repartition) + .await; + } + let context = self.create_context(); let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a create table task. @@ -235,7 +331,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a `[CreateViewTask]`. @@ -250,7 +346,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a create multiple logical table tasks. @@ -267,7 +363,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes alter multiple table tasks. @@ -284,7 +380,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a drop table task. @@ -299,7 +395,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a create database task. @@ -318,7 +414,7 @@ impl DdlManager { CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a drop table task. @@ -335,7 +431,7 @@ impl DdlManager { let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } pub async fn submit_alter_database( @@ -346,7 +442,7 @@ impl DdlManager { let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a create flow task. @@ -360,7 +456,7 @@ impl DdlManager { let procedure = CreateFlowProcedure::new(create_flow, query_context, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a drop flow task. @@ -373,7 +469,7 @@ impl DdlManager { let procedure = DropFlowProcedure::new(drop_flow, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a drop view task. @@ -386,7 +482,7 @@ impl DdlManager { let procedure = DropViewProcedure::new(drop_view, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a truncate table task. @@ -407,7 +503,7 @@ impl DdlManager { let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } /// Submits and executes a comment on task. @@ -420,10 +516,11 @@ impl DdlManager { let procedure = CommentOnProcedure::new(comment_on_task, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - self.submit_procedure(procedure_with_id).await + self.execute_procedure_and_wait(procedure_with_id).await } - async fn submit_procedure( + /// Executes a procedure and waits for the result. + async fn execute_procedure_and_wait( &self, procedure_with_id: ProcedureWithId, ) -> Result<(ProcedureId, Option)> { @@ -442,6 +539,18 @@ impl DdlManager { Ok((procedure_id, output)) } + /// Submits a procedure and returns the procedure id. + async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { + let procedure_id = procedure_with_id.id; + let _ = self + .procedure_manager + .submit(procedure_with_id) + .await + .context(SubmitProcedureSnafu)?; + + Ok(procedure_id) + } + pub async fn submit_ddl_task( &self, ctx: &ExecutorContext, @@ -947,8 +1056,12 @@ async fn handle_comment_on_task( mod tests { use std::sync::Arc; + use common_error::ext::BoxedError; use common_procedure::local::LocalManager; use common_procedure::test_util::InMemoryPoisonStore; + use common_procedure::{BoxedProcedure, ProcedureManagerRef}; + use store_api::storage::TableId; + use table::table_name::TableName; use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; @@ -959,6 +1072,7 @@ mod tests { use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; + use crate::ddl_manager::RepartitionProcedureFactory; use crate::key::TableMetadataManager; use crate::key::flow::FlowMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -987,6 +1101,30 @@ mod tests { } } + struct DummyRepartitionProcedureFactory; + + #[async_trait::async_trait] + impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory { + fn create( + &self, + _ddl_ctx: &DdlContext, + _table_name: TableName, + _table_id: TableId, + _from_exprs: Vec, + _to_exprs: Vec, + ) -> std::result::Result { + unimplemented!() + } + + fn register_loaders( + &self, + _ddl_ctx: &DdlContext, + _procedure_manager: &ProcedureManagerRef, + ) -> std::result::Result<(), BoxedError> { + Ok(()) + } + } + #[test] fn test_try_new() { let kv_backend = Arc::new(MemoryKvBackend::new()); @@ -1023,6 +1161,7 @@ mod tests { region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), + Arc::new(DummyRepartitionProcedureFactory), true, ); diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 831f17ea00..f048e85f2b 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -104,6 +104,20 @@ pub enum Error { source: common_procedure::error::Error, }, + #[snafu(display("Failed to register repartition procedure loader"))] + RegisterRepartitionProcedureLoader { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to create repartition procedure"))] + CreateRepartitionProcedure { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to submit procedure"))] SubmitProcedure { #[snafu(implicit)] @@ -1170,6 +1184,8 @@ impl ErrorExt for Error { PutPoison { source, .. } => source.status_code(), ConvertColumnDef { source, .. } => source.status_code(), ProcedureStateReceiver { source, .. } => source.status_code(), + RegisterRepartitionProcedureLoader { source, .. } => source.status_code(), + CreateRepartitionProcedure { source, .. } => source.status_code(), ParseProcedureId { .. } | InvalidNumTopics { .. } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 11a1c28cfb..e001be4334 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -1038,7 +1038,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to serialize partition expression: {}", source))] + #[snafu(display("Failed to serialize partition expression"))] SerializePartitionExpr { #[snafu(source)] source: partition::error::Error, @@ -1046,6 +1046,20 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to deserialize partition expression"))] + DeserializePartitionExpr { + #[snafu(source)] + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Empty partition expression"))] + EmptyPartitionExpr { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Partition expression mismatch, region id: {}, expected: {}, actual: {}", region_id, @@ -1163,7 +1177,8 @@ impl ErrorExt for Error { | Error::MailboxChannelClosed { .. } | Error::IsNotLeader { .. } => StatusCode::IllegalState, Error::RetryLaterWithSource { source, .. } => source.status_code(), - Error::SerializePartitionExpr { source, .. } => source.status_code(), + Error::SerializePartitionExpr { source, .. } + | Error::DeserializePartitionExpr { source, .. } => source.status_code(), Error::Unsupported { .. } => StatusCode::Unsupported, @@ -1189,7 +1204,8 @@ impl ErrorExt for Error { | Error::RepartitionSourceRegionMissing { .. } | Error::RepartitionTargetRegionMissing { .. } | Error::PartitionExprMismatch { .. } - | Error::RepartitionSourceExprMismatch { .. } => StatusCode::InvalidArguments, + | Error::RepartitionSourceExprMismatch { .. } + | Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 4180a4b404..8881044c34 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -70,6 +70,7 @@ use crate::metasrv::{ use crate::peer::MetasrvPeerAllocator; use crate::procedure::region_migration::DefaultContextFactory; use crate::procedure::region_migration::manager::RegionMigrationManager; +use crate::procedure::repartition::DefaultRepartitionProcedureFactory; use crate::procedure::wal_prune::Context as WalPruneContext; use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker}; use crate::region::flush_trigger::RegionFlushTrigger; @@ -400,8 +401,17 @@ impl MetasrvBuilder { region_failure_detector_controller, }; let procedure_manager_c = procedure_manager.clone(); - let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true) - .context(error::InitDdlManagerSnafu)?; + let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new( + mailbox.clone(), + options.grpc.server_addr.clone(), + )); + let ddl_manager = DdlManager::try_new( + ddl_context, + procedure_manager_c, + repartition_procedure_factory, + true, + ) + .context(error::InitDdlManagerSnafu)?; let ddl_manager = if let Some(configurator) = plugins .as_ref() diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index a296c4dbb2..c91270ecfa 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -27,24 +27,38 @@ use std::fmt::Debug; use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::ddl::DdlContext; use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef; use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef; +use common_meta::ddl_manager::RepartitionProcedureFactory; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::RegionInfo; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use common_meta::node_manager::NodeManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; use common_meta::rpc::router::RegionRoute; -use common_procedure::{Context as ProcedureContext, Status}; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata, +}; +use common_telemetry::error; +use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; +use table::table_name::TableName; use crate::error::{self, Result}; +use crate::procedure::repartition::group::{ + Context as RepartitionGroupContext, RepartitionGroupProcedure, +}; use crate::procedure::repartition::plan::RepartitionPlanEntry; +use crate::procedure::repartition::repartition_start::RepartitionStart; use crate::procedure::repartition::utils::get_datanode_table_value; use crate::service::mailbox::MailboxRef; @@ -60,6 +74,35 @@ pub struct PersistentContext { pub plans: Vec, } +impl PersistentContext { + pub fn new( + TableName { + catalog_name, + schema_name, + table_name, + }: TableName, + table_id: TableId, + ) -> Self { + Self { + catalog_name, + schema_name, + table_name, + table_id, + plans: vec![], + } + } + + pub fn lock_key(&self) -> Vec { + vec![ + CatalogLock::Read(&self.catalog_name).into(), + SchemaLock::read(&self.catalog_name, &self.schema_name).into(), + TableLock::Write(self.table_id).into(), + TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(), + ] + } +} + +#[derive(Clone)] pub struct Context { pub persistent_ctx: PersistentContext, pub table_metadata_manager: TableMetadataManagerRef, @@ -74,6 +117,26 @@ pub struct Context { } impl Context { + pub fn new( + ddl_ctx: &DdlContext, + mailbox: MailboxRef, + server_addr: String, + persistent_ctx: PersistentContext, + ) -> Self { + Self { + persistent_ctx, + table_metadata_manager: ddl_ctx.table_metadata_manager.clone(), + memory_region_keeper: ddl_ctx.memory_region_keeper.clone(), + node_manager: ddl_ctx.node_manager.clone(), + leader_region_registry: ddl_ctx.leader_region_registry.clone(), + mailbox, + server_addr, + cache_invalidator: ddl_ctx.cache_invalidator.clone(), + region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(), + wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(), + } + } + /// Retrieves the table route value for the given table id. /// /// Retry: @@ -207,3 +270,202 @@ pub(crate) trait State: Sync + Send + Debug { fn as_any(&self) -> &dyn Any; } + +pub struct RepartitionProcedure { + state: Box, + context: Context, +} + +#[derive(Debug, Serialize)] +struct RepartitionData<'a> { + state: &'a dyn State, + persistent_ctx: &'a PersistentContext, +} + +#[derive(Debug, Deserialize)] +struct RepartitionDataOwned { + state: Box, + persistent_ctx: PersistentContext, +} + +impl RepartitionProcedure { + const TYPE_NAME: &'static str = "metasrv-procedure::Repartition"; + + pub fn new( + from_exprs: Vec, + to_exprs: Vec, + context: Context, + ) -> Self { + let state = Box::new(RepartitionStart::new(from_exprs, to_exprs)); + + Self { state, context } + } + + pub fn from_json(json: &str, ctx_factory: F) -> ProcedureResult + where + F: FnOnce(PersistentContext) -> Context, + { + let RepartitionDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ctx_factory(persistent_ctx); + + Ok(Self { state, context }) + } +} + +#[async_trait::async_trait] +impl Procedure for RepartitionProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retryable() { + Err(ProcedureError::retry_later(e)) + } else { + error!( + e; + "Repartition procedure failed, table id: {}", + self.context.persistent_ctx.table_id, + ); + Err(ProcedureError::external(e)) + } + } + } + } + + fn rollback_supported(&self) -> bool { + // TODO(weny): support rollback. + false + } + + fn dump(&self) -> ProcedureResult { + let data = RepartitionData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::new(self.context.persistent_ctx.lock_key()) + } + + fn user_metadata(&self) -> Option { + // TODO(weny): support user metadata. + None + } +} + +pub struct DefaultRepartitionProcedureFactory { + mailbox: MailboxRef, + server_addr: String, +} + +impl DefaultRepartitionProcedureFactory { + pub fn new(mailbox: MailboxRef, server_addr: String) -> Self { + Self { + mailbox, + server_addr, + } + } +} + +impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory { + fn create( + &self, + ddl_ctx: &DdlContext, + table_name: TableName, + table_id: TableId, + from_exprs: Vec, + to_exprs: Vec, + ) -> std::result::Result { + let persistent_ctx = PersistentContext::new(table_name, table_id); + let from_exprs = from_exprs + .iter() + .map(|e| { + PartitionExpr::from_json_str(e) + .context(error::DeserializePartitionExprSnafu)? + .context(error::EmptyPartitionExprSnafu) + }) + .collect::>>() + .map_err(BoxedError::new)?; + let to_exprs = to_exprs + .iter() + .map(|e| { + PartitionExpr::from_json_str(e) + .context(error::DeserializePartitionExprSnafu)? + .context(error::EmptyPartitionExprSnafu) + }) + .collect::>>() + .map_err(BoxedError::new)?; + + let procedure = RepartitionProcedure::new( + from_exprs, + to_exprs, + Context::new( + ddl_ctx, + self.mailbox.clone(), + self.server_addr.clone(), + persistent_ctx, + ), + ); + + Ok(Box::new(procedure)) + } + + fn register_loaders( + &self, + ddl_ctx: &DdlContext, + procedure_manager: &ProcedureManagerRef, + ) -> std::result::Result<(), BoxedError> { + // Registers the repartition procedure loader. + let mailbox = self.mailbox.clone(); + let server_addr = self.server_addr.clone(); + let moved_ddl_ctx = ddl_ctx.clone(); + procedure_manager + .register_loader( + RepartitionProcedure::TYPE_NAME, + Box::new(move |json| { + let mailbox = mailbox.clone(); + let server_addr = server_addr.clone(); + let ddl_ctx = moved_ddl_ctx.clone(); + let factory = move |persistent_ctx| { + Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx) + }; + RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _) + }), + ) + .map_err(BoxedError::new)?; + + // Registers the repartition group procedure loader. + let mailbox = self.mailbox.clone(); + let server_addr = self.server_addr.clone(); + let moved_ddl_ctx = ddl_ctx.clone(); + procedure_manager + .register_loader( + RepartitionGroupProcedure::TYPE_NAME, + Box::new(move |json| { + let mailbox = mailbox.clone(); + let server_addr = server_addr.clone(); + let ddl_ctx = moved_ddl_ctx.clone(); + let factory = move |persistent_ctx| { + RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx) + }; + RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _) + }), + ) + .map_err(BoxedError::new)?; + + Ok(()) + } +} diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index 7a5ae4a6f6..a80d3388cf 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -27,6 +27,7 @@ use std::time::Duration; use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::ddl::DdlContext; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo}; use common_meta::key::table_route::TableRouteValue; @@ -34,7 +35,7 @@ use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; -use common_procedure::error::ToJsonSnafu; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, StringKey, UserMetadata, @@ -60,8 +61,20 @@ pub struct RepartitionGroupProcedure { context: Context, } +#[derive(Debug, Serialize)] +struct RepartitionGroupData<'a> { + persistent_ctx: &'a PersistentContext, + state: &'a dyn State, +} + +#[derive(Debug, Deserialize)] +struct RepartitionGroupDataOwned { + persistent_ctx: PersistentContext, + state: Box, +} + impl RepartitionGroupProcedure { - const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup"; + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup"; pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self { let state = Box::new(RepartitionStart); @@ -77,12 +90,19 @@ impl RepartitionGroupProcedure { }, } } -} -#[derive(Debug, Serialize)] -pub struct RepartitionGroupData<'a> { - persistent_ctx: &'a PersistentContext, - state: &'a dyn State, + pub fn from_json(json: &str, ctx_factory: F) -> ProcedureResult + where + F: FnOnce(PersistentContext) -> Context, + { + let RepartitionGroupDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ctx_factory(persistent_ctx); + + Ok(Self { state, context }) + } } #[async_trait::async_trait] @@ -149,6 +169,23 @@ pub struct Context { pub server_addr: String, } +impl Context { + pub fn new( + ddl_ctx: &DdlContext, + mailbox: MailboxRef, + server_addr: String, + persistent_ctx: PersistentContext, + ) -> Self { + Self { + persistent_ctx, + cache_invalidator: ddl_ctx.cache_invalidator.clone(), + table_metadata_manager: ddl_ctx.table_metadata_manager.clone(), + mailbox, + server_addr, + } + } +} + /// The result of the group preparation phase, containing validated region routes. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct GroupPrepareResult { diff --git a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs index 0223d25794..973c14f5a5 100644 --- a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs @@ -31,7 +31,7 @@ use store_api::storage::RegionId; use crate::error::{self, Error, Result}; use crate::handler::HeartbeatMailbox; -use crate::procedure::repartition::group::update_metadata::UpdateMetadata; +use crate::procedure::repartition::group::repartition_end::RepartitionEnd; use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; @@ -52,10 +52,7 @@ impl State for ApplyStagingManifest { ) -> Result<(Box, Status)> { self.apply_staging_manifests(ctx).await?; - Ok(( - Box::new(UpdateMetadata::ApplyStaging), - Status::executing(true), - )) + Ok((Box::new(RepartitionEnd), Status::executing(true))) } fn as_any(&self) -> &dyn Any { @@ -63,13 +60,18 @@ impl State for ApplyStagingManifest { } } +struct ApplyStagingManifestInstructions { + instructions: HashMap>, + central_region_instruction: Option<(Peer, common_meta::instruction::ApplyStagingManifest)>, +} + impl ApplyStagingManifest { fn build_apply_staging_manifest_instructions( staging_manifest_paths: &HashMap, target_routes: &[RegionRoute], targets: &[RegionDescriptor], central_region_id: RegionId, - ) -> Result>> { + ) -> Result { let target_partition_expr_by_region = targets .iter() .map(|target| { @@ -86,7 +88,25 @@ impl ApplyStagingManifest { let target_region_routes_by_peer = group_region_routes_by_peer(target_routes); let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len()); - for (peer, region_ids) in target_region_routes_by_peer { + let mut central_region_instruction = None; + for (peer, mut region_ids) in target_region_routes_by_peer { + // If the central region is in the target region ids, + // remove it and build the instruction for the central region. + if region_ids.contains(¢ral_region_id) { + region_ids.retain(|r| *r != central_region_id); + central_region_instruction = Some(( + peer.clone(), + common_meta::instruction::ApplyStagingManifest { + region_id: central_region_id, + partition_expr: target_partition_expr_by_region[¢ral_region_id].clone(), + central_region_id, + manifest_path: staging_manifest_paths[¢ral_region_id].clone(), + }, + )); + if region_ids.is_empty() { + continue; + } + } let apply_staging_manifests = region_ids .into_iter() .map(|region_id| common_meta::instruction::ApplyStagingManifest { @@ -99,7 +119,10 @@ impl ApplyStagingManifest { instructions.insert(peer.clone(), apply_staging_manifests); } - Ok(instructions) + Ok(ApplyStagingManifestInstructions { + instructions, + central_region_instruction, + }) } #[allow(dead_code)] @@ -112,7 +135,10 @@ impl ApplyStagingManifest { let targets = &ctx.persistent_ctx.targets; let target_routes = &prepare_result.target_routes; let central_region_id = prepare_result.central_region; - let instructions = Self::build_apply_staging_manifest_instructions( + let ApplyStagingManifestInstructions { + instructions, + central_region_instruction, + } = Self::build_apply_staging_manifest_instructions( staging_manifest_paths, target_routes, targets, @@ -155,8 +181,10 @@ impl ApplyStagingManifest { let results = join_all(tasks).await; let result = handle_multiple_results(&results); match result { - HandleMultipleResult::AllSuccessful => Ok(()), - HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu { + HandleMultipleResult::AllSuccessful => { + // Coninute + } + HandleMultipleResult::AllRetryable(retryable_errors) => return error::RetryLaterSnafu { reason: format!( "All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}", table_id, group_id, @@ -168,7 +196,7 @@ impl ApplyStagingManifest { ), } .fail(), - HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu { + HandleMultipleResult::AllNonRetryable(non_retryable_errors) => return error::UnexpectedSnafu { violated: format!( "All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}", table_id, group_id, @@ -183,7 +211,7 @@ impl ApplyStagingManifest { HandleMultipleResult::PartialRetryable { retryable_errors, non_retryable_errors, - } => error::UnexpectedSnafu { + } => return error::UnexpectedSnafu { violated: format!( "Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}", table_id, group_id, @@ -201,6 +229,20 @@ impl ApplyStagingManifest { } .fail(), } + + if let Some((peer, instruction)) = central_region_instruction { + info!("Applying staging manifest for central region: {:?}", peer); + Self::apply_staging_manifest( + &ctx.mailbox, + &ctx.server_addr, + &peer, + &[instruction], + operation_timeout, + ) + .await?; + } + + Ok(()) } async fn apply_staging_manifest( diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index e261119f7c..fd61c4f8d3 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -128,14 +128,6 @@ impl RepartitionStart { central_region_datanode, }) } - - #[allow(dead_code)] - fn next_state() -> (Box, Status) { - ( - Box::new(UpdateMetadata::ApplyStaging), - Status::executing(true), - ) - } } #[async_trait::async_trait] @@ -157,7 +149,10 @@ impl State for RepartitionStart { _procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { if ctx.persistent_ctx.group_prepare_result.is_some() { - return Ok(Self::next_state()); + return Ok(( + Box::new(UpdateMetadata::ApplyStaging), + Status::executing(true), + )); } let table_id = ctx.persistent_ctx.table_id; let group_id = ctx.persistent_ctx.group_id; @@ -177,7 +172,10 @@ impl State for RepartitionStart { ctx.persistent_ctx.targets.len() ); - Ok(Self::next_state()) + Ok(( + Box::new(UpdateMetadata::ApplyStaging), + Status::executing(true), + )) } fn as_any(&self) -> &dyn Any { diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index dffcf54573..139847f01b 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -736,7 +736,14 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to deserialize partition expression: {}", source))] + #[snafu(display("Failed to serialize partition expression"))] + SerializePartitionExpr { + #[snafu(implicit)] + location: Location, + source: partition::error::Error, + }, + + #[snafu(display("Failed to deserialize partition expression"))] DeserializePartitionExpr { #[snafu(source)] source: partition::error::Error, @@ -983,7 +990,8 @@ impl ErrorExt for Error { | Error::MissingInsertBody { .. } => StatusCode::Internal, Error::ExecuteAdminFunction { .. } | Error::EncodeJson { .. } - | Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected, + | Error::DeserializePartitionExpr { .. } + | Error::SerializePartitionExpr { .. } => StatusCode::Unexpected, Error::ViewNotFound { .. } | Error::ViewInfoNotFound { .. } | Error::TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index d1c380306b..0db4a681b1 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -16,9 +16,11 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; +use api::v1::alter_table_expr::Kind; use api::v1::meta::CreateFlowTask as PbCreateFlowTask; use api::v1::{ - AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def, + AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, + Repartition, column_def, }; #[cfg(feature = "enterprise")] use api::v1::{ @@ -88,8 +90,9 @@ use crate::error::{ FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu, PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, - SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, - UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, + SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, + TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, + ViewAlreadyExistsSnafu, }; use crate::expr_helper::{self, RepartitionRequest}; use crate::statement::StatementExecutor; @@ -1408,20 +1411,56 @@ impl StatementExecutor { .context(InvalidPartitionSnafu)?; info!( - "Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}", + "Submitting repartition task for table {} (table_id={}), from {} to {} partitions", table_ref, table_id, - from_partition_exprs, - into_partition_exprs, + from_partition_exprs.len(), new_partition_exprs_len ); - // TODO(weny): Implement the repartition procedure submission. - // The repartition procedure infrastructure is not yet fully integrated with the DDL task system. - NotSupportedSnafu { - feat: "ALTER TABLE REPARTITION", - } - .fail() + let serialize_exprs = |exprs: Vec| -> Result> { + let mut json_exprs = Vec::with_capacity(exprs.len()); + for expr in exprs { + json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?); + } + Ok(json_exprs) + }; + let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?; + let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?; + let req = SubmitDdlTaskRequest { + query_context: query_context.clone(), + task: DdlTask::new_alter_table(AlterTableExpr { + catalog_name: request.catalog_name.clone(), + schema_name: request.schema_name.clone(), + table_name: request.table_name.clone(), + kind: Some(Kind::Repartition(Repartition { + from_partition_exprs: from_partition_exprs_json, + into_partition_exprs: into_partition_exprs_json, + // TODO(weny): allow passing 'wait' from SQL options or QueryContext + wait: true, + })), + }), + }; + let invalidate_keys = vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName::new( + request.catalog_name, + request.schema_name, + request.table_name, + )), + ]; + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), req) + .await + .context(error::ExecuteDdlSnafu)?; + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate(&Context::default(), &invalidate_keys) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(Output::new_with_affected_rows(0)) } #[tracing::instrument(skip_all)] diff --git a/src/standalone/Cargo.toml b/src/standalone/Cargo.toml index 26d858272a..55265fbe1b 100644 --- a/src/standalone/Cargo.toml +++ b/src/standalone/Cargo.toml @@ -37,4 +37,5 @@ serde.workspace = true servers.workspace = true snafu.workspace = true store-api.workspace = true +table.workspace = true tokio.workspace = true diff --git a/src/standalone/src/error.rs b/src/standalone/src/error.rs index 352271762c..2ae47ede18 100644 --- a/src/standalone/src/error.rs +++ b/src/standalone/src/error.rs @@ -36,6 +36,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Repartition procedure is not supported in standalone mode"))] + NoSupportRepartitionProcedure { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -45,6 +51,7 @@ impl ErrorExt for Error { match self { Error::OpenMetadataKvBackend { source, .. } => source.status_code(), Error::External { source, .. } => source.status_code(), + Error::NoSupportRepartitionProcedure { .. } => StatusCode::Unsupported, } } diff --git a/src/standalone/src/lib.rs b/src/standalone/src/lib.rs index 8c6f041fe3..a5f3db2a2a 100644 --- a/src/standalone/src/lib.rs +++ b/src/standalone/src/lib.rs @@ -20,4 +20,4 @@ pub mod procedure; pub use information_extension::StandaloneInformationExtension; pub use metadata::build_metadata_kvbackend; -pub use procedure::build_procedure_manager; +pub use procedure::{StandaloneRepartitionProcedureFactory, build_procedure_manager}; diff --git a/src/standalone/src/procedure.rs b/src/standalone/src/procedure.rs index 2443abf39f..cc45dd8bfc 100644 --- a/src/standalone/src/procedure.rs +++ b/src/standalone/src/procedure.rs @@ -14,12 +14,19 @@ use std::sync::Arc; +use common_error::ext::BoxedError; +use common_meta::ddl::DdlContext; +use common_meta::ddl_manager::RepartitionProcedureFactory; use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; -use common_procedure::ProcedureManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::options::ProcedureConfig; +use common_procedure::{BoxedProcedure, ProcedureManagerRef}; +use store_api::storage::TableId; +use table::table_name::TableName; + +use crate::error::NoSupportRepartitionProcedureSnafu; /// Builds the procedure manager. pub fn build_procedure_manager( @@ -43,3 +50,32 @@ pub fn build_procedure_manager( None, )) } + +/// No-op implementation of [`RepartitionProcedureFactory`] for standalone mode. +/// +/// In standalone deployments, repartition operations are not supported, so +/// this factory always returns a `NoSupportRepartitionProcedure` error +/// from [`RepartitionProcedureFactory::create`] and performs no registration +/// work in [`RepartitionProcedureFactory::register_loaders`]. +pub struct StandaloneRepartitionProcedureFactory; + +impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory { + fn create( + &self, + _ddl_ctx: &DdlContext, + _table_name: TableName, + _table_id: TableId, + _from_exprs: Vec, + _to_exprs: Vec, + ) -> std::result::Result { + Err(BoxedError::new(NoSupportRepartitionProcedureSnafu.build())) + } + + fn register_loaders( + &self, + _ddl_ctx: &DdlContext, + _procedure_manager: &ProcedureManagerRef, + ) -> std::result::Result<(), BoxedError> { + Ok(()) + } +} diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 0c42c2f3da..af7047cdaf 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -52,6 +52,7 @@ use frontend::server::Services; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use servers::grpc::GrpcOptions; use snafu::ResultExt; +use standalone::StandaloneRepartitionProcedureFactory; use standalone::options::StandaloneOptions; use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts}; @@ -215,6 +216,7 @@ impl GreptimeDbStandaloneBuilder { flow_id_sequence, )); + let repartition_procedure_factory = Arc::new(StandaloneRepartitionProcedureFactory); let ddl_manager = Arc::new( DdlManager::try_new( DdlContext { @@ -229,6 +231,7 @@ impl GreptimeDbStandaloneBuilder { region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), + repartition_procedure_factory, register_procedure_loaders, ) .unwrap(), diff --git a/tests/cases/distributed/repartition/repartition.result b/tests/cases/distributed/repartition/repartition.result new file mode 100644 index 0000000000..05775c91f4 --- /dev/null +++ b/tests/cases/distributed/repartition/repartition.result @@ -0,0 +1,101 @@ +CREATE TABLE alter_repartition_table( + device_id INT, + area STRING, + ty STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id, area) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +Affected Rows: 0 + +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +); + +Affected Rows: 0 + +SHOW CREATE TABLE alter_repartition_table; + ++-------------------------+--------------------------------------------------------+ +| Table | Create Table | ++-------------------------+--------------------------------------------------------+ +| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( | +| | "device_id" INT NULL, | +| | "area" STRING NULL, | +| | "ty" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("device_id") | +| | ) | +| | PARTITION ON COLUMNS ("device_id", "area") ( | +| | device_id < 100 AND area < 'South', | +| | device_id >= 100 AND device_id < 200, | +| | device_id >= 200, | +| | device_id < 100 AND area >= 'South' | +| | ) | +| | ENGINE=mito | +| | | ++-------------------------+--------------------------------------------------------+ + +ALTER TABLE alter_repartition_table MERGE PARTITION ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +); + +Affected Rows: 0 + +SHOW CREATE TABLE alter_repartition_table; + ++-------------------------+------------------------------------------------------------------------------+ +| Table | Create Table | ++-------------------------+------------------------------------------------------------------------------+ +| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( | +| | "device_id" INT NULL, | +| | "area" STRING NULL, | +| | "ty" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("device_id") | +| | ) | +| | PARTITION ON COLUMNS ("device_id", "area") ( | +| | device_id < 100 AND area < 'South' OR device_id < 100 AND area >= 'South', | +| | device_id >= 100 AND device_id < 200, | +| | device_id >= 200 | +| | ) | +| | ENGINE=mito | +| | | ++-------------------------+------------------------------------------------------------------------------+ + +-- FIXME(weny): Object store is not configured for the test environment, +-- so staging manifest may not be applied in some cases. +-- invalid: empty source clause +ALTER TABLE alter_repartition_table REPARTITION () INTO ( + device_id < 100 +); + +Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: ) + +-- invalid: more than one INTO clause +ALTER TABLE alter_repartition_table REPARTITION ( + device_id < 100 +) INTO ( + device_id < 50 +), ( + device_id >= 50 +) INTO ( + device_id >= 50 +); + +Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: , + +DROP TABLE alter_repartition_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/repartition.sql b/tests/cases/distributed/repartition/repartition.sql similarity index 76% rename from tests/cases/standalone/common/alter/repartition.sql rename to tests/cases/distributed/repartition/repartition.sql index 008588b887..35fd01feb3 100644 --- a/tests/cases/standalone/common/alter/repartition.sql +++ b/tests/cases/distributed/repartition/repartition.sql @@ -10,7 +10,6 @@ CREATE TABLE alter_repartition_table( device_id >= 200 ); --- valid grammar, currently not implemented ALTER TABLE alter_repartition_table REPARTITION ( device_id < 100 ) INTO ( @@ -18,19 +17,17 @@ ALTER TABLE alter_repartition_table REPARTITION ( device_id < 100 AND area >= 'South' ); --- valid grammar, currently not implemented -ALTER TABLE alter_repartition_table SPLIT PARTITION ( - device_id < 100 -) INTO ( +SHOW CREATE TABLE alter_repartition_table; + +ALTER TABLE alter_repartition_table MERGE PARTITION ( device_id < 100 AND area < 'South', device_id < 100 AND area >= 'South' ); --- valid grammar, currently not implemented -ALTER TABLE alter_repartition_table MERGE PARTITION ( - device_id < 100, - device_id >= 100 AND device_id < 200 -); +SHOW CREATE TABLE alter_repartition_table; + +-- FIXME(weny): Object store is not configured for the test environment, +-- so staging manifest may not be applied in some cases. -- invalid: empty source clause ALTER TABLE alter_repartition_table REPARTITION () INTO ( diff --git a/tests/cases/standalone/common/alter/repartition.result b/tests/cases/standalone/common/alter/repartition.result deleted file mode 100644 index 1b95c2e540..0000000000 --- a/tests/cases/standalone/common/alter/repartition.result +++ /dev/null @@ -1,66 +0,0 @@ -CREATE TABLE alter_repartition_table( - device_id INT, - area STRING, - ty STRING, - ts TIMESTAMP TIME INDEX, - PRIMARY KEY(device_id) -) PARTITION ON COLUMNS (device_id, area) ( - device_id < 100, - device_id >= 100 AND device_id < 200, - device_id >= 200 -); - -Affected Rows: 0 - --- valid grammar, currently not implemented -ALTER TABLE alter_repartition_table REPARTITION ( - device_id < 100 -) INTO ( - device_id < 100 AND area < 'South', - device_id < 100 AND area >= 'South' -); - -Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION - --- valid grammar, currently not implemented -ALTER TABLE alter_repartition_table SPLIT PARTITION ( - device_id < 100 -) INTO ( - device_id < 100 AND area < 'South', - device_id < 100 AND area >= 'South' -); - -Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION - --- valid grammar, currently not implemented -ALTER TABLE alter_repartition_table MERGE PARTITION ( - device_id < 100, - device_id >= 100 AND device_id < 200 -); - -Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION - --- invalid: empty source clause -ALTER TABLE alter_repartition_table REPARTITION () INTO ( - device_id < 100 -); - -Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: ) - --- invalid: more than one INTO clause -ALTER TABLE alter_repartition_table REPARTITION ( - device_id < 100 -) INTO ( - device_id < 50 -), ( - device_id >= 50 -) INTO ( - device_id >= 50 -); - -Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: , - -DROP TABLE alter_repartition_table; - -Affected Rows: 0 -