From d0877997a246286d0f366f43c21a0fce3499e17d Mon Sep 17 00:00:00 2001
From: Zhenchi
Date: Mon, 13 Oct 2025 02:41:02 +0000
Subject: [PATCH] feat: scaffold repartition procedure with plan/resource stubs

Signed-off-by: Zhenchi
---
 src/common/meta/Cargo.toml                    |   1 +
 src/common/meta/src/ddl.rs                    |   1 +
 src/common/meta/src/ddl/repartition.rs        |  21 ++
 src/common/meta/src/ddl/repartition/plan.rs   | 145 ++++++++
 .../meta/src/ddl/repartition/procedure.rs     | 343 ++++++++++++++++++
 src/common/meta/src/ddl_manager.rs            |  17 +-
 6 files changed, 527 insertions(+), 1 deletion(-)
 create mode 100644 src/common/meta/src/ddl/repartition.rs
 create mode 100644 src/common/meta/src/ddl/repartition/plan.rs
 create mode 100644 src/common/meta/src/ddl/repartition/procedure.rs

diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 3ea7627f9c..582e12eb89 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
 tonic.workspace = true
 tracing.workspace = true
 typetag.workspace = true
+uuid.workspace = true
 
 [dev-dependencies]
 chrono.workspace = true
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index e12331d4a2..9b9b6667ff 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -44,6 +44,7 @@ pub mod drop_table;
 pub mod drop_view;
 pub mod flow_meta;
 pub mod table_meta;
+pub mod repartition;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 #[cfg(test)]
diff --git a/src/common/meta/src/ddl/repartition.rs b/src/common/meta/src/ddl/repartition.rs
new file mode 100644
index 0000000000..984caa50db
--- /dev/null
+++ b/src/common/meta/src/ddl/repartition.rs
@@ -0,0 +1,21 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod plan;
+pub mod procedure;
+
+pub use plan::{
+    PartitionChange, PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
+};
+pub use procedure::{RepartitionProcedure, RepartitionTask};
diff --git a/src/common/meta/src/ddl/repartition/plan.rs b/src/common/meta/src/ddl/repartition/plan.rs
new file mode 100644
index 0000000000..b13f7a6599
--- /dev/null
+++ b/src/common/meta/src/ddl/repartition/plan.rs
@@ -0,0 +1,145 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+use store_api::storage::{RegionId, TableId};
+use uuid::Uuid;
+
+/// Identifier of a plan group.
+pub type PlanGroupId = Uuid;
+
+/// Logical description of the repartition plan.
+///
+/// The plan is persisted by the procedure framework so it must remain
+/// serializable/deserializable across versions.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RepartitionPlan {
+    /// Identifier of the physical table to repartition.
+    pub table_id: TableId,
+    /// Deterministic hash of the generated plan. Used for idempotence checks.
+    pub plan_hash: String,
+    /// Plan groups to execute. Each group is independent of the others.
+    pub groups: Vec<PlanGroup>,
+    /// Aggregate resource expectation.
+    pub resource_demand: ResourceDemand,
+}
+
+impl RepartitionPlan {
+    /// Creates an empty plan. Primarily used in tests and early skeleton code.
+    pub fn empty(table_id: TableId) -> Self {
+        Self {
+            table_id,
+            plan_hash: String::new(),
+            groups: Vec::new(),
+            resource_demand: ResourceDemand::default(),
+        }
+    }
+
+    /// Returns `true` if the plan does not contain any work.
+    pub fn is_trivial(&self) -> bool {
+        self.groups.is_empty()
+    }
+}
+
+/// A group of repartition operations that can be executed as a sub-procedure.
+///
+/// Groups are designed to be independent so that failure in one group does not
+/// propagate to others.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PlanGroup {
+    /// Stable identifier of the group.
+    pub group_id: PlanGroupId,
+    /// Regions that provide the data source.
+    pub source_regions: Vec<RegionId>,
+    /// Target regions that should exist after this group finishes.
+    pub target_regions: Vec<RegionId>,
+    /// Ordered list of logical changes required by this group.
+    pub changes: Vec<PartitionChange>,
+    /// Estimated resource demand contributed by this group.
+    pub resource_hint: ResourceDemand,
+}
+
+impl PlanGroup {
+    /// Convenience constructor for skeleton code and tests.
+    pub fn new(group_id: PlanGroupId) -> Self {
+        Self {
+            group_id,
+            source_regions: Vec::new(),
+            target_regions: Vec::new(),
+            changes: Vec::new(),
+            resource_hint: ResourceDemand::default(),
+        }
+    }
+}
+
+/// Diff between the old and the new partition rules.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct PartitionRuleDiff {
+    /// Ordered list of changes to transform the old rule into the new rule.
+    pub changes: Vec<PartitionChange>,
+}
+
+impl PartitionRuleDiff {
+    /// Returns `true` if there is no change between two rules.
+    pub fn is_empty(&self) -> bool {
+        self.changes.is_empty()
+    }
+}
+
+/// Primitive repartition changes recognised by the planner.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub enum PartitionChange {
+    /// Split one region into multiple target regions.
+    Split {
+        from: RegionId,
+        to: Vec<RegionId>,
+    },
+    /// Merge multiple regions into one target region.
+    Merge {
+        from: Vec<RegionId>,
+        to: RegionId,
+    },
+    /// No-op placeholder for future operations (e.g. rule rewrite).
+    Unsupported,
+}
+
+impl PartitionChange {
+    /// Returns the regions referenced by this change.
+    pub fn referenced_regions(&self) -> Vec<RegionId> {
+        match self {
+            PartitionChange::Split { from, to } => {
+                let mut regions = Vec::with_capacity(1 + to.len());
+                regions.push(*from);
+                regions.extend(to.iter().copied());
+                regions
+            }
+            PartitionChange::Merge { from, to } => {
+                let mut regions = Vec::with_capacity(from.len() + 1);
+                regions.extend(from.iter().copied());
+                regions.push(*to);
+                regions
+            }
+            PartitionChange::Unsupported => Vec::new(),
+        }
+    }
+}
+
+/// Resource estimation for executing a plan or a plan group.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct ResourceDemand {
+    /// Number of brand-new regions that must be allocated before execution.
+    pub new_regions: u32,
+    /// Rough estimate of data volume to rewrite (in bytes).
+    pub estimated_bytes: u64,
+}
diff --git a/src/common/meta/src/ddl/repartition/procedure.rs b/src/common/meta/src/ddl/repartition/procedure.rs
new file mode 100644
index 0000000000..0457f83227
--- /dev/null
+++ b/src/common/meta/src/ddl/repartition/procedure.rs
@@ -0,0 +1,343 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! High-level state machine for table repartitioning.
+//!
+//! The [`RepartitionProcedure`] coordinates the full flow described in the RFC:
+//! 1. **Prepare** – validate the request, lock the table, compute the rule diff,
+//!    and materialize a deterministic [`RepartitionPlan`].
+//! 2. **AllocateResources** – estimate [`ResourceDemand`] and talk to PaaS to
+//!    reserve the additional regions required by the plan.
+//! 3. **DispatchSubprocedures** – split the plan into `PlanGroup`s, spawn
+//!    sub-procedures that stop compaction/snapshot, enter the no-ingestion
+//!    window, update metadata, push region-rule versions, generate new manifests,
+//!    and stage/acknowledge versioned writes via `RegionEdit`.
+//! 4. **Finalize** – aggregate results, roll failed groups back (metadata,
+//!    manifests, staged data), restart unhealthy regions, unlock the table, and
+//!    trigger cache reload / optional compaction in the background.
+//!
+//! Each step is designed to be idempotent so the framework can retry safely.
+//! This file currently contains the skeleton states and will be filled out with
+//! concrete logic in follow-up patches.
+
+use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{
+    Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status, StringKey,
+};
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+use store_api::storage::TableId;
+use strum::AsRefStr;
+use table::table_reference::TableReference;
+use uuid::Uuid;
+
+use crate::ddl::repartition::{
+    PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
+};
+use crate::ddl::utils::map_to_procedure_error;
+use crate::ddl::DdlContext;
+use crate::error::Result;
+use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
+
+/// Procedure that orchestrates the repartition flow.
+pub struct RepartitionProcedure {
+    #[allow(dead_code)]
+    context: DdlContext,
+    data: RepartitionData,
+}
+
+impl RepartitionProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
+
+    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
+        Ok(Self {
+            context,
+            data: RepartitionData::new(task),
+        })
+    }
+
+    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
+        let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
+        Ok(Self { context, data })
+    }
+
+    async fn on_prepare(&mut self) -> Result<Status> {
+        if self.data.plan.is_none() {
+            let (mut plan, diff, demand) = self.generate_plan_stub()?;
+            plan.plan_hash = format!(
+                "{}:{}:{}",
+                self.data.task.table_id,
+                plan.groups.len(),
+                self.data.task.new_rule_payload.len()
+            );
+
+            self.data.resource_demand = Some(demand);
+            self.data.rule_diff = Some(diff);
+            self.data.plan = Some(plan);
+        }
+
+        self.data.state = RepartitionState::AllocateResources;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_allocate_resources(&mut self) -> Result<Status> {
+        if !self.data.resource_allocated {
+            let demand = self.data.resource_demand.unwrap_or_default();
+            let allocated = self.allocate_resources_stub(demand).await?;
+            if !allocated {
+                if let Some(plan) = &self.data.plan {
+                    self.data
+                        .failed_groups
+                        .extend(plan.groups.iter().map(|group| group.group_id));
+                }
+                self.data.state = RepartitionState::Finalize;
+                return Ok(Status::executing(true));
+            }
+            self.data.resource_allocated = true;
+        }
+
+        self.data.state = RepartitionState::DispatchSubprocedures;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
+        let Some(plan) = self.data.plan.as_ref() else {
+            self.data.state = RepartitionState::Finalize;
+            return Ok(Status::executing(true));
+        };
+
+        let groups_to_schedule: Vec<PlanGroupId> = plan
+            .groups
+            .iter()
+            .filter(|group| {
+                !self.data.succeeded_groups.contains(&group.group_id)
+                    && !self.data.failed_groups.contains(&group.group_id)
+            })
+            .map(|group| group.group_id)
+            .collect();
+
+        if groups_to_schedule.is_empty() {
+            self.data.state = RepartitionState::Finalize;
+            return Ok(Status::executing(true));
+        }
+
+        let subprocedures = self.spawn_group_stub_procedures(&groups_to_schedule);
+        self.data.pending_groups = groups_to_schedule;
+        self.data.state = RepartitionState::CollectSubprocedures;
+
+        Ok(Status::suspended(subprocedures, true))
+    }
+
+    async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> Result<Status> {
+        self.data
+            .succeeded_groups
+            .extend(self.data.pending_groups.drain(..));
+
+        self.data.state = RepartitionState::Finalize;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_finalize(&mut self) -> Result<Status> {
+        self.data.summary = Some(RepartitionSummary {
+            succeeded_groups: self.data.succeeded_groups.clone(),
+            failed_groups: self.data.failed_groups.clone(),
+        });
+        self.data.state = RepartitionState::Finished;
+        Ok(Status::done())
+    }
+
+    fn generate_plan_stub(&self) -> Result<(RepartitionPlan, PartitionRuleDiff, ResourceDemand)> {
+        let mut plan = RepartitionPlan::empty(self.data.task.table_id);
+        let diff = PartitionRuleDiff::default();
+
+        if !self.data.task.new_rule_payload.is_empty() {
+            let group_id = Uuid::new_v4();
+            plan.groups.push(PlanGroup::new(group_id));
+        }
+
+        let mut demand = ResourceDemand::default();
+        demand.new_regions = plan.groups.len() as u32;
+        plan.resource_demand = demand;
+
+        Ok((plan, diff, demand))
+    }
+
+    async fn allocate_resources_stub(&self, _demand: ResourceDemand) -> Result<bool> {
+        Ok(true)
+    }
+
+    fn spawn_group_stub_procedures(&self, groups: &[PlanGroupId]) -> Vec<ProcedureWithId> {
+        groups
+            .iter()
+            .map(|group_id| {
+                ProcedureWithId::with_random_id(Box::new(RepartitionGroupStubProcedure::new(
+                    *group_id,
+                )))
+            })
+            .collect()
+    }
+
+    fn table_lock_key(&self) -> Vec<StringKey> {
+        let mut lock_key = Vec::with_capacity(3);
+        let table_ref = self.data.task.table_ref();
+        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
+        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
+        lock_key.push(TableLock::Write(self.data.task.table_id).into());
+
+        lock_key
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for RepartitionProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let state = self.data.state;
+        let status = match state {
+            RepartitionState::Prepare => self.on_prepare().await,
+            RepartitionState::AllocateResources => self.on_allocate_resources().await,
+            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
+            RepartitionState::CollectSubprocedures => {
+                self.on_collect_subprocedures(ctx).await
+            }
+            RepartitionState::Finalize => self.on_finalize().await,
+            RepartitionState::Finished => Ok(Status::done()),
+        };
+
+        status.map_err(map_to_procedure_error)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        LockKey::new(self.table_lock_key())
+    }
+}
+
+/// Serialized data of the repartition procedure.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct RepartitionData {
+    state: RepartitionState,
+    task: RepartitionTask,
+    #[serde(default)]
+    plan: Option<RepartitionPlan>,
+    #[serde(default)]
+    rule_diff: Option<PartitionRuleDiff>,
+    #[serde(default)]
+    resource_demand: Option<ResourceDemand>,
+    #[serde(default)]
+    resource_allocated: bool,
+    #[serde(default)]
+    pending_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    succeeded_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    failed_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    summary: Option<RepartitionSummary>,
+}
+
+impl RepartitionData {
+    fn new(task: RepartitionTask) -> Self {
+        Self {
+            state: RepartitionState::Prepare,
+            task,
+            plan: None,
+            rule_diff: None,
+            resource_demand: None,
+            resource_allocated: false,
+            pending_groups: Vec::new(),
+            succeeded_groups: Vec::new(),
+            failed_groups: Vec::new(),
+            summary: None,
+        }
+    }
+}
+
+/// High-level states of the repartition procedure.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
+enum RepartitionState {
+    /// Validates request parameters and generates a deterministic plan.
+    Prepare,
+    /// Coordinates with the resource provider (PaaS) for region allocation.
+    AllocateResources,
+    /// Submits work items to sub-procedures.
+    DispatchSubprocedures,
+    /// Collects and inspects sub-procedure results.
+    CollectSubprocedures,
+    /// Finalises metadata and emits summary.
+    Finalize,
+    /// Terminal state.
+    Finished,
+}
+
+/// Task payload passed from the DDL entry point.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RepartitionTask {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+    pub table_id: TableId,
+    /// Serialized representation of the new partition rule.
+    ///
+    /// The actual format will be determined later; currently we keep the raw
+    /// payload so that the procedure can be wired end-to-end.
+    pub new_rule_payload: Vec<u8>,
+}
+
+impl RepartitionTask {
+    fn table_ref(&self) -> TableReference {
+        TableReference {
+            catalog: &self.catalog_name,
+            schema: &self.schema_name,
+            table: &self.table_name,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+struct RepartitionSummary {
+    succeeded_groups: Vec<PlanGroupId>,
+    failed_groups: Vec<PlanGroupId>,
+}
+
+struct RepartitionGroupStubProcedure {
+    group_id: PlanGroupId,
+}
+
+impl RepartitionGroupStubProcedure {
+    fn new(group_id: PlanGroupId) -> Self {
+        Self { group_id }
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for RepartitionGroupStubProcedure {
+    fn type_name(&self) -> &str {
+        "metasrv-procedure::RepartitionGroupStub"
+    }
+
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        Ok(Status::done())
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        Ok(self.group_id.to_string())
+    }
+
+    fn lock_key(&self) -> LockKey {
+        LockKey::default()
+    }
+}
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 9ade13052d..a45464272c 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -31,6 +31,7 @@ use crate::ddl::create_flow::CreateFlowProcedure;
 use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
 use crate::ddl::create_table::CreateTableProcedure;
 use crate::ddl::create_view::CreateViewProcedure;
+use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
 use crate::ddl::drop_database::DropDatabaseProcedure;
 use crate::ddl::drop_flow::DropFlowProcedure;
 use crate::ddl::drop_table::DropTableProcedure;
@@ -181,7 +182,8 @@
             TruncateTableProcedure,
             CreateDatabaseProcedure,
             DropDatabaseProcedure,
-            DropViewProcedure
+            DropViewProcedure,
+            RepartitionProcedure
         );
 
         for (type_name, loader_factory) in loaders {
@@ -210,6 +212,19 @@
         self.submit_procedure(procedure_with_id).await
     }
 
+    /// Submits and executes a repartition task.
+    #[tracing::instrument(skip_all)]
+    pub async fn submit_repartition_task(
+        &self,
+        repartition_task: RepartitionTask,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let context = self.create_context();
+        let procedure = RepartitionProcedure::new(repartition_task, context)?;
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+
+        self.submit_procedure(procedure_with_id).await
+    }
+
     /// Submits and executes a create table task.
     #[tracing::instrument(skip_all)]
     pub async fn submit_create_table_task(
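
Note: `plan.rs` requires `RepartitionPlan` to stay serializable across versions because the procedure framework persists it between steps. A minimal round-trip sketch over the public plan types, assuming an integration-test context where `common_meta`, `store_api`, and `serde_json` are available; the table id, region ids, and group contents below are made up for illustration:

```rust
use common_meta::ddl::repartition::{PartitionChange, PlanGroup, RepartitionPlan};
use store_api::storage::RegionId;
use uuid::Uuid;

#[test]
fn plan_round_trip_sketch() {
    let table_id = 42;
    let mut plan = RepartitionPlan::empty(table_id);

    // One group that splits region 0 of the table into regions 1 and 2.
    let mut group = PlanGroup::new(Uuid::new_v4());
    let source = RegionId::new(table_id, 0);
    let targets = vec![RegionId::new(table_id, 1), RegionId::new(table_id, 2)];
    group.source_regions = vec![source];
    group.target_regions = targets.clone();
    group.changes.push(PartitionChange::Split {
        from: source,
        to: targets,
    });
    plan.groups.push(group);

    // The split references its source plus both targets.
    assert_eq!(plan.groups[0].changes[0].referenced_regions().len(), 3);
    assert!(!plan.is_trivial());

    // The procedure framework dumps state as JSON, so the plan must survive
    // a serialize/deserialize round trip unchanged.
    let json = serde_json::to_string(&plan).unwrap();
    let restored: RepartitionPlan = serde_json::from_str(&json).unwrap();
    assert_eq!(plan, restored);
}
```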
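
Note: step 3 of the module docs packs several operations into each plan group (stop compaction/snapshot, no-ingestion window, metadata and rule-version updates, manifest generation, staged writes via `RegionEdit`), while this patch only ships `RepartitionGroupStubProcedure`. Purely as a hypothetical sketch of how the follow-up per-group state machine might be laid out; none of these variant names exist in the patch:

```rust
/// Hypothetical states for the future per-group sub-procedure; the variants
/// mirror the steps listed in the module docs of `procedure.rs`.
#[allow(dead_code)]
enum RepartitionGroupState {
    /// Stop compaction and snapshotting on the source regions.
    PauseBackgroundJobs,
    /// Enter the no-ingestion window for the affected regions.
    EnterWriteFence,
    /// Update metadata and push the new region-rule version.
    UpdateMetadata,
    /// Generate manifests for the target regions.
    GenerateManifests,
    /// Stage and acknowledge versioned writes via `RegionEdit`.
    CommitRegionEdit,
    /// Terminal state.
    Done,
}
```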
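
Note: the new `DdlManager::submit_repartition_task` mirrors the existing `submit_*` entry points. A hedged usage sketch, assuming an initialized `DdlManager` from the metasrv bootstrap; the catalog/schema/table names, the table id, and the rule payload are placeholders:

```rust
use common_meta::ddl::repartition::RepartitionTask;
use common_meta::ddl_manager::DdlManager;
use common_meta::error::Result;

async fn submit_repartition_sketch(ddl_manager: &DdlManager, new_rule: Vec<u8>) -> Result<()> {
    let task = RepartitionTask {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        table_name: "metrics".to_string(),
        table_id: 1024,
        new_rule_payload: new_rule,
    };

    // Returns the procedure id plus the procedure output once it completes.
    let (_procedure_id, _output) = ddl_manager.submit_repartition_task(task).await?;
    Ok(())
}
```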