diff --git a/Cargo.lock b/Cargo.lock
index d15f70b631..a76ce955e8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7426,6 +7426,7 @@ dependencies = [
  "local-ip-address",
  "once_cell",
  "parking_lot 0.12.4",
+ "partition",
  "prometheus",
  "prost 0.13.5",
  "rand 0.9.1",
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 94496454e2..e12331d4a2 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -43,7 +43,6 @@ pub mod drop_flow;
 pub mod drop_table;
 pub mod drop_view;
 pub mod flow_meta;
-pub mod repartition;
 pub mod table_meta;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
diff --git a/src/common/meta/src/ddl/repartition/plan.rs b/src/common/meta/src/ddl/repartition/plan.rs
deleted file mode 100644
index 0e38192e75..0000000000
--- a/src/common/meta/src/ddl/repartition/plan.rs
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use serde::{Deserialize, Serialize};
-use store_api::storage::{RegionId, TableId};
-use uuid::Uuid;
-
-/// Identifier of a plan group.
-pub type PlanGroupId = Uuid;
-
-/// Logical description of the repartition plan.
-///
-/// The plan is persisted by the procedure framework so it must remain
-/// serializable/deserializable across versions.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct RepartitionPlan {
-    /// Identifier of the physical table to repartition.
-    pub table_id: TableId,
-    /// Deterministic hash of the generated plan. Used for idempotence checks.
-    pub plan_hash: String,
-    /// Plan groups to execute. Each group is independent from others.
-    pub groups: Vec<PlanGroup>,
-    /// Aggregate resource expectation.
-    pub resource_demand: ResourceDemand,
-}
-
-impl RepartitionPlan {
-    /// Creates an empty plan. Primarily used in tests and early skeleton code.
-    pub fn empty(table_id: TableId) -> Self {
-        Self {
-            table_id,
-            plan_hash: String::new(),
-            groups: Vec::new(),
-            resource_demand: ResourceDemand::default(),
-        }
-    }
-
-    /// Returns `true` if the plan does not contain any work.
-    pub fn is_trivial(&self) -> bool {
-        self.groups.is_empty()
-    }
-}
-
-/// A group of repartition operations that can be executed as a sub-procedure.
-///
-/// Groups are designed to be independent so that failure in one group does not
-/// propagate to others.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub struct PlanGroup {
-    /// Stable identifier of the group.
-    pub group_id: PlanGroupId,
-    /// Regions that provide the data source.
-    pub source_regions: Vec<RegionId>,
-    /// Target regions that should exist after this group finishes.
-    pub target_regions: Vec<RegionId>,
-    /// Ordered list of logical changes required by this group.
-    pub changes: Vec<PartitionChange>,
-    /// Estimated resource demand contributed by this group.
-    pub resource_hint: ResourceDemand,
-}
-
-impl PlanGroup {
-    /// Convenience constructor for skeleton code and tests.
-    pub fn new(group_id: PlanGroupId) -> Self {
-        Self {
-            group_id,
-            source_regions: Vec::new(),
-            target_regions: Vec::new(),
-            changes: Vec::new(),
-            resource_hint: ResourceDemand::default(),
-        }
-    }
-}
-
-/// Diff between the old and the new partition rules.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
-pub struct PartitionRuleDiff {
-    /// Ordered list of changes to transform the old rule into the new rule.
-    pub changes: Vec<PartitionChange>,
-}
-
-impl PartitionRuleDiff {
-    /// Returns `true` if there is no change between two rules.
-    pub fn is_empty(&self) -> bool {
-        self.changes.is_empty()
-    }
-}
-
-/// Primitive repartition changes recognised by the planner.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub enum PartitionChange {
-    /// Split one region into multiple target regions.
-    Split { from: RegionId, to: Vec<RegionId> },
-    /// Merge multiple regions into one target region.
-    Merge { from: Vec<RegionId>, to: RegionId },
-    /// No-op placeholder for future operations (e.g. rule rewrite).
-    Unsupported,
-}
-
-impl PartitionChange {
-    /// Returns the regions referenced by this change.
-    pub fn referenced_regions(&self) -> Vec<RegionId> {
-        match self {
-            PartitionChange::Split { from, to } => {
-                let mut regions = Vec::with_capacity(1 + to.len());
-                regions.push(*from);
-                regions.extend(to.iter().copied());
-                regions
-            }
-            PartitionChange::Merge { from, to } => {
-                let mut regions = Vec::with_capacity(from.len() + 1);
-                regions.extend(from.iter().copied());
-                regions.push(*to);
-                regions
-            }
-            PartitionChange::Unsupported => Vec::new(),
-        }
-    }
-}
-
-/// Resource estimation for executing a plan or a plan group.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
-pub struct ResourceDemand {
-    /// Number of brand-new regions that must be allocated before execution.
-    pub new_regions: u32,
-    /// Rough estimate of data volume to rewrite (in bytes).
-    pub estimated_bytes: u64,
-}
diff --git a/src/common/meta/src/ddl/repartition/procedure.rs b/src/common/meta/src/ddl/repartition/procedure.rs
deleted file mode 100644
index e447e62932..0000000000
--- a/src/common/meta/src/ddl/repartition/procedure.rs
+++ /dev/null
@@ -1,345 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! High-level state machine for table repartitioning.
-//!
-//! The [`RepartitionProcedure`] coordinates the full flow described in the RFC:
-//! 1. **Prepare** – validate the request, lock the table, compute the rule diff,
-//!    and materialize a deterministic [`RepartitionPlan`].
-//! 2. **AllocateResources** – estimate [`ResourceDemand`] and talk to PaaS to
-//!    reserve the additional regions required by the plan.
-//! 3. **DispatchSubprocedures** – split the plan into `PlanGroup`s, spawn
-//!    sub-procedures that stop compaction/snapshot, enter the no-ingestion
-//!    window, update metadata, push region-rule versions, generate new manifests,
-//!    and stage/acknowledge versioned writes via `RegionEdit`.
-//! 4. **Finalize** – aggregate results, roll failed groups back (metadata,
-//!    manifests, staged data), restart unhealthy regions, unlock the table, and
-//!    trigger cache reload / optional compaction in the background.
-//!
-//! Each step is designed to be idempotent so the framework can retry safely.
-//! This file currently contains the skeleton states and will be filled out with
-//! concrete logic in follow-up patches.
-
-use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
-use common_procedure::{
-    Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status, StringKey,
-};
-use serde::{Deserialize, Serialize};
-use snafu::ResultExt;
-use store_api::storage::TableId;
-use strum::AsRefStr;
-use table::table_reference::TableReference;
-use uuid::Uuid;
-
-use crate::ddl::DdlContext;
-use crate::ddl::repartition::{
-    PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
-};
-use crate::ddl::utils::map_to_procedure_error;
-use crate::error::Result;
-use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
-
-/// Procedure that orchestrates the repartition flow.
-pub struct RepartitionProcedure {
-    #[allow(dead_code)]
-    context: DdlContext,
-    data: RepartitionData,
-}
-
-impl RepartitionProcedure {
-    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
-
-    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
-        Ok(Self {
-            context,
-            data: RepartitionData::new(task),
-        })
-    }
-
-    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
-        let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
-        Ok(Self { context, data })
-    }
-
-    async fn on_prepare(&mut self) -> Result<Status> {
-        if self.data.plan.is_none() {
-            let (mut plan, diff, demand) = self.generate_plan_stub()?;
-            plan.plan_hash = format!(
-                "{}:{}:{}",
-                self.data.task.table_id,
-                plan.groups.len(),
-                self.data.task.new_rule_payload.len()
-            );
-
-            self.data.resource_demand = Some(demand);
-            self.data.rule_diff = Some(diff);
-            self.data.plan = Some(plan);
-        }
-
-        self.data.state = RepartitionState::AllocateResources;
-        Ok(Status::executing(true))
-    }
-
-    async fn on_allocate_resources(&mut self) -> Result<Status> {
-        if !self.data.resource_allocated {
-            let demand = self.data.resource_demand.unwrap_or_default();
-            let allocated = self.allocate_resources_stub(demand).await?;
-            if !allocated {
-                if let Some(plan) = &self.data.plan {
-                    self.data
-                        .failed_groups
-                        .extend(plan.groups.iter().map(|group| group.group_id));
-                }
-                self.data.state = RepartitionState::Finalize;
-                return Ok(Status::executing(true));
-            }
-            self.data.resource_allocated = true;
-        }
-
-        self.data.state = RepartitionState::DispatchSubprocedures;
-        Ok(Status::executing(true))
-    }
-
-    async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
-        let Some(plan) = self.data.plan.as_ref() else {
-            self.data.state = RepartitionState::Finalize;
-            return Ok(Status::executing(true));
-        };
-
-        let groups_to_schedule: Vec<PlanGroupId> = plan
-            .groups
-            .iter()
-            .filter(|group| {
-                !self.data.succeeded_groups.contains(&group.group_id)
-                    && !self.data.failed_groups.contains(&group.group_id)
-            })
-            .map(|group| group.group_id)
-            .collect();
-
-        if groups_to_schedule.is_empty() {
-            self.data.state = RepartitionState::Finalize;
-            return Ok(Status::executing(true));
-        }
-
-        let subprocedures = self.spawn_group_stub_procedures(&groups_to_schedule);
-        self.data.pending_groups = groups_to_schedule;
-        self.data.state = RepartitionState::CollectSubprocedures;
-
-        Ok(Status::suspended(subprocedures, true))
-    }
-
-    async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> Result<Status> {
-        self.data
-            .succeeded_groups
-            .append(&mut self.data.pending_groups);
-
-        self.data.state = RepartitionState::Finalize;
-        Ok(Status::executing(true))
-    }
-
-    async fn on_finalize(&mut self) -> Result<Status> {
-        self.data.summary = Some(RepartitionSummary {
-            succeeded_groups: self.data.succeeded_groups.clone(),
-            failed_groups: self.data.failed_groups.clone(),
-        });
-        self.data.state = RepartitionState::Finished;
-        Ok(Status::done())
-    }
-
-    fn generate_plan_stub(&self) -> Result<(RepartitionPlan, PartitionRuleDiff, ResourceDemand)> {
-        let mut plan = RepartitionPlan::empty(self.data.task.table_id);
-        let diff = PartitionRuleDiff::default();
-
-        if !self.data.task.new_rule_payload.is_empty() {
-            let group_id = Uuid::new_v4();
-            plan.groups.push(PlanGroup::new(group_id));
-        }
-
-        let demand = ResourceDemand {
-            new_regions: plan.groups.len() as u32,
-            ..Default::default()
-        };
-        plan.resource_demand = demand;
-
-        Ok((plan, diff, demand))
-    }
-
-    async fn allocate_resources_stub(&self, _demand: ResourceDemand) -> Result<bool> {
-        Ok(true)
-    }
-
-    fn spawn_group_stub_procedures(&self, groups: &[PlanGroupId]) -> Vec<ProcedureWithId> {
-        groups
-            .iter()
-            .map(|group_id| {
-                ProcedureWithId::with_random_id(Box::new(RepartitionGroupStubProcedure::new(
-                    *group_id,
-                )))
-            })
-            .collect()
-    }
-
-    fn table_lock_key(&self) -> Vec<StringKey> {
-        let mut lock_key = Vec::with_capacity(3);
-        let table_ref = self.data.task.table_ref();
-        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
-        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
-        lock_key.push(TableLock::Write(self.data.task.table_id).into());
-
-        lock_key
-    }
-}
-
-#[async_trait::async_trait]
-impl Procedure for RepartitionProcedure {
-    fn type_name(&self) -> &str {
-        Self::TYPE_NAME
-    }
-
-    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
-        let state = self.data.state;
-        let status = match state {
-            RepartitionState::Prepare => self.on_prepare().await,
-            RepartitionState::AllocateResources => self.on_allocate_resources().await,
-            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
-            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
-            RepartitionState::Finalize => self.on_finalize().await,
-            RepartitionState::Finished => Ok(Status::done()),
-        };
-
-        status.map_err(map_to_procedure_error)
-    }
-
-    fn dump(&self) -> ProcedureResult<String> {
-        serde_json::to_string(&self.data).context(ToJsonSnafu)
-    }
-
-    fn lock_key(&self) -> LockKey {
-        LockKey::new(self.table_lock_key())
-    }
-}
-
-/// Serialized data of the repartition procedure.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-struct RepartitionData {
-    state: RepartitionState,
-    task: RepartitionTask,
-    #[serde(default)]
-    plan: Option<RepartitionPlan>,
-    #[serde(default)]
-    rule_diff: Option<PartitionRuleDiff>,
-    #[serde(default)]
-    resource_demand: Option<ResourceDemand>,
-    #[serde(default)]
-    resource_allocated: bool,
-    #[serde(default)]
-    pending_groups: Vec<PlanGroupId>,
-    #[serde(default)]
-    succeeded_groups: Vec<PlanGroupId>,
-    #[serde(default)]
-    failed_groups: Vec<PlanGroupId>,
-    #[serde(default)]
-    summary: Option<RepartitionSummary>,
-}
-
-impl RepartitionData {
-    fn new(task: RepartitionTask) -> Self {
-        Self {
-            state: RepartitionState::Prepare,
-            task,
-            plan: None,
-            rule_diff: None,
-            resource_demand: None,
-            resource_allocated: false,
-            pending_groups: Vec::new(),
-            succeeded_groups: Vec::new(),
-            failed_groups: Vec::new(),
-            summary: None,
-        }
-    }
-}
-
-/// High-level states of the repartition procedure.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
-enum RepartitionState {
-    /// Validates request parameters and generates a deterministic plan.
-    Prepare,
-    /// Coordinates with the resource provider (PaaS) for region allocation.
-    AllocateResources,
-    /// Submits work items to sub-procedures.
-    DispatchSubprocedures,
-    /// Collects and inspects sub-procedure results.
-    CollectSubprocedures,
-    /// Finalises metadata and emits summary.
-    Finalize,
-    /// Terminal state.
-    Finished,
-}
-
-/// Task payload passed from the DDL entry point.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RepartitionTask {
-    pub catalog_name: String,
-    pub schema_name: String,
-    pub table_name: String,
-    pub table_id: TableId,
-    /// Serialized representation of the new partition rule.
-    ///
-    /// The actual format will be determined later; currently we keep the raw
-    /// payload so that the procedure can be wired end-to-end.
-    pub new_rule_payload: Vec<u8>,
-}
-
-impl RepartitionTask {
-    fn table_ref(&self) -> TableReference<'_> {
-        TableReference {
-            catalog: &self.catalog_name,
-            schema: &self.schema_name,
-            table: &self.table_name,
-        }
-    }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize, Default)]
-struct RepartitionSummary {
-    succeeded_groups: Vec<PlanGroupId>,
-    failed_groups: Vec<PlanGroupId>,
-}
-
-struct RepartitionGroupStubProcedure {
-    group_id: PlanGroupId,
-}
-
-impl RepartitionGroupStubProcedure {
-    fn new(group_id: PlanGroupId) -> Self {
-        Self { group_id }
-    }
-}
-
-#[async_trait::async_trait]
-impl Procedure for RepartitionGroupStubProcedure {
-    fn type_name(&self) -> &str {
-        "metasrv-procedure::RepartitionGroupStub"
-    }
-
-    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
-        Ok(Status::done())
-    }
-
-    fn dump(&self) -> ProcedureResult<String> {
-        Ok(self.group_id.to_string())
-    }
-
-    fn lock_key(&self) -> LockKey {
-        LockKey::default()
-    }
-}
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 95cf4547e1..9ade13052d 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -35,7 +35,6 @@ use crate::ddl::drop_database::DropDatabaseProcedure;
 use crate::ddl::drop_flow::DropFlowProcedure;
 use crate::ddl::drop_table::DropTableProcedure;
 use crate::ddl::drop_view::DropViewProcedure;
-use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
 use crate::ddl::truncate_table::TruncateTableProcedure;
 use crate::ddl::{DdlContext, utils};
 use crate::error::{
@@ -182,8 +181,7 @@ impl DdlManager {
         TruncateTableProcedure,
         CreateDatabaseProcedure,
         DropDatabaseProcedure,
-        DropViewProcedure,
-        RepartitionProcedure
+        DropViewProcedure
     );
 
     for (type_name, loader_factory) in loaders {
@@ -212,19 +210,6 @@ impl DdlManager {
         self.submit_procedure(procedure_with_id).await
     }
 
-    /// Submits and executes a repartition task.
-    #[tracing::instrument(skip_all)]
-    pub async fn submit_repartition_task(
-        &self,
-        repartition_task: RepartitionTask,
-    ) -> Result<(ProcedureId, Option<Output>)> {
-        let context = self.create_context();
-        let procedure = RepartitionProcedure::new(repartition_task, context)?;
-        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
-
-        self.submit_procedure(procedure_with_id).await
-    }
-
     /// Submits and executes a create table task.
     #[tracing::instrument(skip_all)]
     pub async fn submit_create_table_task(
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 90a4fdc17b..03b9282fc7 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -44,6 +44,7 @@ common-time.workspace = true
 common-version.workspace = true
 common-wal.workspace = true
 common-workload.workspace = true
+partition.workspace = true
 dashmap.workspace = true
 datatypes.workspace = true
 deadpool = { workspace = true, optional = true }
diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs
index 88869d8482..da1a1b00e7 100644
--- a/src/meta-srv/src/procedure.rs
+++ b/src/meta-srv/src/procedure.rs
@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
 use snafu::ResultExt;
 
 pub mod region_migration;
+pub mod repartition;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 #[cfg(test)]
diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs
new file mode 100644
index 0000000000..cc666d1a8b
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition.rs
@@ -0,0 +1,456 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod context;
+mod group;
+mod plan;
+
+use common_catalog::format_full_table_name;
+use common_meta::ddl::DdlContext;
+use common_meta::ddl::utils::map_to_procedure_error;
+use common_meta::error::{self, Result as MetaResult};
+use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
+use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{
+    Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status, StringKey,
+};
+use partition::expr::PartitionExpr;
+use partition::subtask;
+use serde::{Deserialize, Serialize};
+use serde_json;
+use snafu::ResultExt;
+use store_api::storage::TableId;
+use strum::AsRefStr;
+use table::table_reference::TableReference;
+use uuid::Uuid;
+
+use self::context::RepartitionContext;
+use self::group::RepartitionGroupProcedure;
+use self::plan::{
+    PartitionRuleDiff, PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand,
+};
+
+/// Procedure that orchestrates the repartition flow.
+pub struct RepartitionProcedure {
+    context: DdlContext,
+    group_context: RepartitionContext,
+    data: RepartitionData,
+}
+
+impl RepartitionProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
+
+    pub fn new(task: RepartitionTask, context: DdlContext) -> MetaResult<Self> {
+        let group_context = RepartitionContext::new(&context);
+        Ok(Self {
+            context,
+            group_context,
+            data: RepartitionData::new(task),
+        })
+    }
+
+    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
+        let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
+        let group_context = RepartitionContext::new(&context);
+        Ok(Self {
+            context,
+            group_context,
+            data,
+        })
+    }
+
+    async fn on_prepare(&mut self) -> MetaResult<Status> {
+        if self.data.plan.is_none() {
+            self.build_plan().await?;
+        }
+
+        self.data.state = RepartitionState::AllocateResources;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_allocate_resources(&mut self) -> MetaResult<Status> {
+        if !self.data.resource_allocated {
+            let demand = self.data.resource_demand.unwrap_or_default();
+            let allocated = self.allocate_resources(demand).await?;
+            if !allocated {
+                if let Some(plan) = &self.data.plan {
+                    self.data
+                        .failed_groups
+                        .extend(plan.entries.iter().map(|entry| entry.group_id));
+                }
+                self.data.state = RepartitionState::Finalize;
+                return Ok(Status::executing(true));
+            }
+            self.data.resource_allocated = true;
+        }
+
+        self.data.state = RepartitionState::DispatchSubprocedures;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_dispatch_subprocedures(&mut self) -> MetaResult<Status> {
+        let plan = match self.data.plan.as_ref() {
+            Some(plan) => plan,
+            None => {
+                self.data.state = RepartitionState::Finalize;
+                return Ok(Status::executing(true));
+            }
+        };
+
+        let groups_to_schedule: Vec<PlanGroupId> = plan
+            .entries
+            .iter()
+            .filter(|entry| {
+                !self.data.succeeded_groups.contains(&entry.group_id)
+                    && !self.data.failed_groups.contains(&entry.group_id)
+            })
+            .map(|entry| entry.group_id)
+            .collect();
+
+        if groups_to_schedule.is_empty() {
+            self.data.state = RepartitionState::Finalize;
+            return Ok(Status::executing(true));
+        }
+
+        let subprocedures = self.spawn_group_procedures(plan, &groups_to_schedule);
+        self.data.pending_groups = groups_to_schedule;
+        self.data.state = RepartitionState::CollectSubprocedures;
+
+        Ok(Status::suspended(subprocedures, true))
+    }
+
+    async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> MetaResult<Status> {
+        self.data
+            .succeeded_groups
+            .append(&mut self.data.pending_groups);
+
+        self.data.state = RepartitionState::Finalize;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_finalize(&mut self) -> MetaResult<Status> {
+        self.data.summary = Some(RepartitionSummary {
+            succeeded_groups: self.data.succeeded_groups.clone(),
+            failed_groups: self.data.failed_groups.clone(),
+        });
+        self.data.state = RepartitionState::Finished;
+        Ok(Status::done())
+    }
+
+    async fn build_plan(&mut self) -> MetaResult<()> {
+        let table_id = self.data.task.table_id;
+        let table_ref = self.data.task.table_ref();
+        let table_name_str =
+            format_full_table_name(table_ref.catalog, table_ref.schema, table_ref.table);
+
+        self.context
+            .table_metadata_manager
+            .table_info_manager()
+            .get(table_id)
+            .await?
+            .ok_or_else(|| {
+                error::TableNotFoundSnafu {
+                    table_name: table_name_str.clone(),
+                }
+                .build()
+            })?;
+
+        let (physical_table_id, physical_route) = self
+            .context
+            .table_metadata_manager
+            .table_route_manager()
+            .get_physical_table_route(table_id)
+            .await?;
+
+        let from_exprs_json = self.data.task.from_exprs_json.clone();
+        let into_exprs_json = self.data.task.into_exprs_json.clone();
+
+        let from_exprs = Self::deserialize_partition_exprs(&from_exprs_json)?;
+        let into_exprs = Self::deserialize_partition_exprs(&into_exprs_json)?;
+
+        let existing_regions = physical_route
+            .region_routes
+            .iter()
+            .map(|route| (route.region.id, route.region.partition_expr()))
+            .collect::<Vec<_>>();
+
+        let mut used_regions = vec![false; existing_regions.len()];
+        let mut source_descriptor_by_index = Vec::with_capacity(from_exprs_json.len());
+        for expr_json in &from_exprs_json {
+            let mut found = None;
+            for (idx, (region_id, expr)) in existing_regions.iter().enumerate() {
+                if !used_regions[idx] && expr == expr_json {
+                    found = Some((*region_id, expr.clone()));
+                    used_regions[idx] = true;
+                    break;
+                }
+            }
+
+            let (region_id, partition_expr_json) = found.ok_or_else(|| {
+                error::UnsupportedSnafu {
+                    operation: format!(
+                        "repartition source expression '{}' does not match any existing region",
+                        expr_json
+                    ),
+                }
+                .build()
+            })?;
+
+            source_descriptor_by_index.push(RegionDescriptor {
+                region_id: Some(region_id),
+                partition_expr_json,
+            });
+        }
+
+        let subtasks = subtask::create_subtasks(&from_exprs, &into_exprs).map_err(|err| {
+            error::UnsupportedSnafu {
+                operation: format!("create_subtasks failed: {err}"),
+            }
+            .build()
+        })?;
+
+        let mut plan = RepartitionPlan::empty(physical_table_id);
+        let mut diff = PartitionRuleDiff::default();
+        let mut demand = ResourceDemand::default();
+
+        for subtask in subtasks {
+            let group_id = Uuid::new_v4();
+
+            let sources = subtask
+                .from_expr_indices
+                .iter()
+                .map(|&idx| source_descriptor_by_index[idx].clone())
+                .collect::<Vec<_>>();
+
+            let targets = subtask
+                .to_expr_indices
+                .iter()
+                .enumerate()
+                .map(|(position, &idx)| {
+                    let reused_region = if subtask.from_expr_indices.len() == 1 {
+                        if position == 0 {
+                            sources.first().and_then(|descriptor| descriptor.region_id)
+                        } else {
+                            None
+                        }
+                    } else if subtask.to_expr_indices.len() == 1 {
+                        sources.first().and_then(|descriptor| descriptor.region_id)
+                    } else {
+                        sources
+                            .get(position)
+                            .and_then(|descriptor| descriptor.region_id)
+                    };
+
+                    RegionDescriptor {
+                        region_id: reused_region,
+                        partition_expr_json: into_exprs_json[idx].clone(),
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            let entry = PlanEntry::new(group_id, subtask, sources, targets);
+            demand.add_entry(&entry);
+            diff.entries.push(group_id);
+            plan.entries.push(entry);
+        }
+
+        plan.resource_demand = demand;
+        plan.route_snapshot = physical_route.clone();
+        plan.plan_hash = format!(
+            "{}:{}->{}",
+            physical_table_id,
+            Self::expr_signature(&from_exprs_json),
+            Self::expr_signature(&into_exprs_json)
+        );
+
+        self.data.plan = Some(plan);
+        self.data.rule_diff = Some(diff);
+        self.data.resource_demand = Some(demand);
+
+        Ok(())
+    }
+
+    async fn allocate_resources(&self, _demand: ResourceDemand) -> MetaResult<bool> {
+        Ok(true)
+    }
+
+    fn spawn_group_procedures(
+        &self,
+        plan: &RepartitionPlan,
+        group_ids: &[PlanGroupId],
+    ) -> Vec<ProcedureWithId> {
+        group_ids
+            .iter()
+            .filter_map(|group_id| {
+                plan.entries
+                    .iter()
+                    .find(|entry| entry.group_id == *group_id)
+                    .cloned()
+            })
+            .map(|entry| {
+                let route_snapshot = plan.route_snapshot.region_routes.clone();
+                ProcedureWithId::with_random_id(Box::new(RepartitionGroupProcedure::new(
+                    entry,
+                    plan.table_id,
+                    route_snapshot,
+                    self.group_context.clone(),
+                )))
+            })
+            .collect()
+    }
+
+    fn table_lock_key(&self) -> Vec<StringKey> {
+        let mut lock_key = Vec::with_capacity(3);
+        let table_ref = self.data.task.table_ref();
+        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
+        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
+        lock_key.push(TableLock::Write(self.data.task.table_id).into());
+
+        lock_key
+    }
+
+    fn deserialize_partition_exprs(exprs_json: &[String]) -> MetaResult<Vec<PartitionExpr>> {
+        exprs_json
+            .iter()
+            .map(|json| {
+                let expr = PartitionExpr::from_json_str(json).map_err(|err| {
+                    error::UnsupportedSnafu {
+                        operation: format!(
+                            "deserialize partition expression '{json}' failed: {err}"
+                        ),
+                    }
+                    .build()
+                })?;
+
+                expr.ok_or_else(|| {
+                    error::UnsupportedSnafu {
+                        operation: format!("empty partition expression '{json}'"),
+                    }
+                    .build()
+                })
+            })
+            .collect()
+    }
+
+    fn expr_signature(exprs: &[String]) -> String {
+        exprs.join("|")
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for RepartitionProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let state = self.data.state;
+        let status = match state {
+            RepartitionState::Prepare => self.on_prepare().await,
+            RepartitionState::AllocateResources => self.on_allocate_resources().await,
+            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
+            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
+            RepartitionState::Finalize => self.on_finalize().await,
+            RepartitionState::Finished => Ok(Status::done()),
+        };
+
+        status.map_err(map_to_procedure_error)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        LockKey::new(self.table_lock_key())
+    }
+}
+
+/// Serialized data of the repartition procedure.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct RepartitionData {
+    state: RepartitionState,
+    task: RepartitionTask,
+    #[serde(default)]
+    plan: Option<RepartitionPlan>,
+    #[serde(default)]
+    rule_diff: Option<PartitionRuleDiff>,
+    #[serde(default)]
+    resource_demand: Option<ResourceDemand>,
+    #[serde(default)]
+    resource_allocated: bool,
+    #[serde(default)]
+    pending_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    succeeded_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    failed_groups: Vec<PlanGroupId>,
+    #[serde(default)]
+    summary: Option<RepartitionSummary>,
+}
+
+impl RepartitionData {
+    fn new(task: RepartitionTask) -> Self {
+        Self {
+            state: RepartitionState::Prepare,
+            task,
+            plan: None,
+            rule_diff: None,
+            resource_demand: None,
+            resource_allocated: false,
+            pending_groups: Vec::new(),
+            succeeded_groups: Vec::new(),
+            failed_groups: Vec::new(),
+            summary: None,
+        }
+    }
+}
+
+/// High-level states of the repartition procedure.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
+enum RepartitionState {
+    Prepare,
+    AllocateResources,
+    DispatchSubprocedures,
+    CollectSubprocedures,
+    Finalize,
+    Finished,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+struct RepartitionSummary {
+    succeeded_groups: Vec<PlanGroupId>,
+    failed_groups: Vec<PlanGroupId>,
+}
+
+/// Task payload passed from the DDL entry point.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RepartitionTask {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+    pub table_id: TableId,
+    /// Partition expressions of source regions (JSON encoded `PartitionExpr`).
+    pub from_exprs_json: Vec<String>,
+    /// Partition expressions of target regions (JSON encoded `PartitionExpr`).
+    pub into_exprs_json: Vec<String>,
+}
+
+impl RepartitionTask {
+    fn table_ref(&self) -> TableReference<'_> {
+        TableReference {
+            catalog: &self.catalog_name,
+            schema: &self.schema_name,
+            table: &self.table_name,
+        }
+    }
+}
diff --git a/src/common/meta/src/ddl/repartition.rs b/src/meta-srv/src/procedure/repartition/context.rs
similarity index 53%
rename from src/common/meta/src/ddl/repartition.rs
rename to src/meta-srv/src/procedure/repartition/context.rs
index 984caa50db..f36748c62f 100644
--- a/src/common/meta/src/ddl/repartition.rs
+++ b/src/meta-srv/src/procedure/repartition/context.rs
@@ -12,10 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod plan;
-pub mod procedure;
+use common_meta::ddl::DdlContext;
+use common_meta::key::TableMetadataManagerRef;
+use common_meta::node_manager::NodeManagerRef;
 
-pub use plan::{
-    PartitionChange, PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
-};
-pub use procedure::{RepartitionProcedure, RepartitionTask};
+#[derive(Clone)]
+pub struct RepartitionContext {
+    pub table_metadata_manager: TableMetadataManagerRef,
+    pub _node_manager: NodeManagerRef,
+}
+
+impl RepartitionContext {
+    pub fn new(context: &DdlContext) -> Self {
+        Self {
+            table_metadata_manager: context.table_metadata_manager.clone(),
+            _node_manager: context.node_manager.clone(),
+        }
+    }
+}
diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs
new file mode 100644
index 0000000000..cc5dfe1d91
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition/group.rs
@@ -0,0 +1,377 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeSet;
+
+use common_meta::ddl::utils::map_to_procedure_error;
+use common_meta::error::{self, Result as MetaResult};
+use common_meta::rpc::router::RegionRoute;
+use common_procedure::error::{Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
+use serde::{Deserialize, Serialize};
+use snafu::ResultExt;
+use store_api::storage::{RegionId, TableId};
+use strum::AsRefStr;
+
+use crate::procedure::repartition::context::RepartitionContext;
+use crate::procedure::repartition::plan::PlanEntry;
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
+pub enum GroupState {
+    Prepare,
+    Freeze,
+    UpdateMetadata,
+    UpdateRegionRule,
+    UpdateManifests,
+    Confirm,
+    Cleanup,
+    Finished,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct GroupProcedureData {
+    table_id: TableId,
+    entry: PlanEntry,
+    state: GroupState,
+    route_snapshot: Vec<RegionRoute>,
+    #[serde(default)]
+    prepare_result: Option<GroupPrepareResult>,
+    #[serde(default)]
+    freeze_completed: bool,
+    #[serde(default)]
+    metadata_updated: bool,
+    #[serde(default)]
+    staged_regions: Vec<RegionId>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct GroupPrepareResult {
+    source_routes: Vec<RegionRoute>,
+    target_routes: Vec<Option<RegionRoute>>,
+    central_region: u64,
+}
+
+pub struct RepartitionGroupProcedure {
+    context: RepartitionContext,
+    data: GroupProcedureData,
+}
+
+impl RepartitionGroupProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
+
+    pub fn new(
+        entry: PlanEntry,
+        table_id: TableId,
+        route_snapshot: Vec<RegionRoute>,
+        context: RepartitionContext,
+    ) -> Self {
+        Self {
+            context,
+            data: GroupProcedureData {
+                table_id,
+                entry,
+                state: GroupState::Prepare,
+                route_snapshot,
+                prepare_result: None,
+                freeze_completed: false,
+                metadata_updated: false,
+                staged_regions: Vec::new(),
+            },
+        }
+    }
+
+    pub async fn step(&mut self) -> MetaResult<Status> {
+        match self.data.state {
+            GroupState::Prepare => {
+                self.on_prepare().await?;
+                self.data.state = GroupState::Freeze;
+                Ok(Status::executing(true))
+            }
+            GroupState::Freeze => {
+                self.on_freeze().await?;
+                self.data.state = GroupState::UpdateMetadata;
+                Ok(Status::executing(true))
+            }
+            GroupState::UpdateMetadata => {
+                self.on_update_metadata().await?;
+                self.data.state = GroupState::UpdateRegionRule;
+                Ok(Status::executing(true))
+            }
+            GroupState::UpdateRegionRule => {
+                self.on_update_region_rule().await?;
+                self.data.state = GroupState::UpdateManifests;
+                Ok(Status::executing(true))
+            }
+            GroupState::UpdateManifests => {
+                self.on_update_manifests().await?;
+                self.data.state = GroupState::Confirm;
+                Ok(Status::executing(true))
+            }
+            GroupState::Confirm => {
+                self.on_confirm().await?;
+                self.data.state = GroupState::Cleanup;
+                Ok(Status::executing(true))
+            }
+            GroupState::Cleanup => {
+                self.on_cleanup().await?;
+                self.data.state = GroupState::Finished;
+                Ok(Status::done())
+            }
+            GroupState::Finished => Ok(Status::done()),
+        }
+    }
+
+    async fn on_prepare(&mut self) -> MetaResult<()> {
+        if self.data.prepare_result.is_some() {
+            return Ok(());
+        }
+
+        let (_, latest_route) = self
+            .context
+            .table_metadata_manager
+            .table_route_manager()
+            .get_physical_table_route(self.data.table_id)
+            .await?;
+
+        self.data.route_snapshot = latest_route.region_routes.clone();
+
+        let mut source_routes = Vec::with_capacity(self.data.entry.sources.len());
+        for descriptor in &self.data.entry.sources {
+            let Some(region_id) = descriptor.region_id else {
+                return Err(error::UnsupportedSnafu {
+                    operation: format!("group {} lacks source region id", self.data.entry.group_id),
+                }
+                .build());
+            };
+
+            let Some(route) = latest_route
+                .region_routes
+                .iter()
+                .find(|route| route.region.id == region_id)
+            else {
+                return Err(error::UnsupportedSnafu {
+                    operation: format!(
+                        "group {} source region {region_id} missing from route snapshot",
+                        self.data.entry.group_id
+                    ),
+                }
+                .build());
+            };
+            source_routes.push(route.clone());
+        }
+
+        if source_routes.is_empty() {
+            return Err(error::UnsupportedSnafu {
+                operation: format!(
+                    "group {} has no source regions after planning",
+                    self.data.entry.group_id
+                ),
+            }
+            .build());
+        }
+
+        let mut target_routes = Vec::with_capacity(self.data.entry.targets.len());
+        for descriptor in &self.data.entry.targets {
+            if let Some(region_id) = descriptor.region_id {
+                let Some(route) = latest_route
+                    .region_routes
+                    .iter()
+                    .find(|route| route.region.id == region_id)
+                else {
+                    return Err(error::UnsupportedSnafu {
+                        operation: format!(
+                            "group {} target region {region_id} missing from route snapshot",
+                            self.data.entry.group_id
+                        ),
+                    }
+                    .build());
+                };
+
+                target_routes.push(Some(route.clone()));
+            } else {
+                target_routes.push(None);
+            }
+        }
+
+        let central_region = source_routes[0].region.id.as_u64();
+        self.data.prepare_result = Some(GroupPrepareResult {
+            source_routes,
+            target_routes,
+            central_region,
+        });
+
+        Ok(())
+    }
+
+    async fn on_freeze(&mut self) -> MetaResult<()> {
+        if self.data.freeze_completed {
+            return Ok(());
+        }
+
+        let prepare_result = self.prepare_result()?;
+
+        for route in &prepare_result.source_routes {
+            self.pause_region(route).await?;
+        }
+
+        for route in prepare_result.target_routes.iter().flatten() {
+            self.pause_region(route).await?;
+        }
+
+        self.data.freeze_completed = true;
+        Ok(())
+    }
+
+    async fn on_update_metadata(&mut self) -> MetaResult<()> {
+        if self.data.metadata_updated {
+            return Ok(());
+        }
+
+        let prepare_result = self.prepare_result()?;
+        let region_ids = self.collect_existing_region_ids(prepare_result);
+        if region_ids.is_empty() {
+            self.data.metadata_updated = true;
+            return Ok(());
+        }
+
+        let mut staged_set: BTreeSet<RegionId> = self.data.staged_regions.iter().copied().collect();
+        let route_manager = self.context.table_metadata_manager.table_route_manager();
+
+        for region_id in &region_ids {
+            if staged_set.insert(*region_id) {
+                route_manager
+                    .set_region_staging_state(*region_id, true)
+                    .await?;
+            }
+        }
+
+        Self::mark_regions_staging(&mut self.data.route_snapshot, &region_ids);
+        self.data.staged_regions = staged_set.into_iter().collect();
+
+        self.data.metadata_updated = true;
+        Ok(())
+    }
+
+    async fn on_update_region_rule(&mut self) -> MetaResult<()> {
+        // TODO: Push new region rules to datanodes and reject outdated writes.
+        Ok(())
+    }
+
+    async fn on_update_manifests(&mut self) -> MetaResult<()> {
+        // TODO: Generate and submit new manifests for target regions.
+        Ok(())
+    }
+
+    async fn on_confirm(&mut self) -> MetaResult<()> {
+        // TODO: Confirm staged writes, resume ingestion, and record success.
+        Ok(())
+    }
+
+    async fn on_cleanup(&mut self) -> MetaResult<()> {
+        let prepare_result = self.prepare_result()?;
+
+        for route in &prepare_result.source_routes {
+            self.resume_region(route).await?;
+        }
+        for route in prepare_result.target_routes.iter().flatten() {
+            self.resume_region(route).await?;
+        }
+
+        if !self.data.staged_regions.is_empty() {
+            let route_manager = self.context.table_metadata_manager.table_route_manager();
+
+            for region_id in &self.data.staged_regions {
+                route_manager
+                    .set_region_staging_state(*region_id, false)
+                    .await?;
+            }
+
+            Self::clear_regions_staging(&mut self.data.route_snapshot, &self.data.staged_regions);
+            self.data.staged_regions.clear();
+            self.data.metadata_updated = false;
+        }
+
+        self.data.freeze_completed = false;
+        Ok(())
+    }
+
+    fn prepare_result(&self) -> MetaResult<&GroupPrepareResult> {
+        self.data.prepare_result.as_ref().ok_or_else(|| {
+            error::UnsupportedSnafu {
+                operation: format!(
+                    "group {} is missing prepare context",
+                    self.data.entry.group_id
+                ),
+            }
+            .build()
+        })
+    }
+
+    async fn pause_region(&self, _route: &RegionRoute) -> MetaResult<()> {
+        // TODO: invoke datanode RPC to pause compaction and snapshot for the region.
+        Ok(())
+    }
+
+    async fn resume_region(&self, _route: &RegionRoute) -> MetaResult<()> {
+        // TODO: invoke datanode RPC to resume compaction and snapshot for the region.
+        Ok(())
+    }
+
+    fn mark_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) {
+        for region_id in region_ids.iter().copied() {
+            if let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) {
+                route.set_leader_staging();
+            }
+        }
+    }
+
+    fn clear_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) {
+        for region_id in region_ids.iter().copied() {
+            if let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) {
+                route.clear_leader_staging();
+            }
+        }
+    }
+
+    fn collect_existing_region_ids(&self, prepare_result: &GroupPrepareResult) -> Vec<RegionId> {
+        let mut set = BTreeSet::new();
+        for route in &prepare_result.source_routes {
+            set.insert(route.region.id);
+        }
+        for route in prepare_result.target_routes.iter().flatten() {
+            set.insert(route.region.id);
+        }
+        set.into_iter().collect()
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for RepartitionGroupProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let status = self.step().await.map_err(map_to_procedure_error)?;
+        Ok(status)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        LockKey::default()
+    }
+}
diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs
new file mode 100644
index 0000000000..95de4adb4c
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition/plan.rs
@@ -0,0 +1,100 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_meta::key::table_route::PhysicalTableRouteValue;
+use partition::subtask::RepartitionSubtask;
+use serde::{Deserialize, Serialize};
+use store_api::storage::{RegionId, TableId};
+use uuid::Uuid;
+
+/// Identifier of a plan group.
+pub type PlanGroupId = Uuid;
+
+/// Logical description of the repartition plan.
+///
+/// The plan is persisted by the procedure framework so it must remain
+/// serializable/deserializable across versions.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct RepartitionPlan {
+    pub table_id: TableId,
+    pub plan_hash: String,
+    pub entries: Vec<PlanEntry>,
+    pub resource_demand: ResourceDemand,
+    pub route_snapshot: PhysicalTableRouteValue,
+}
+
+impl RepartitionPlan {
+    pub fn empty(table_id: TableId) -> Self {
+        Self {
+            table_id,
+            plan_hash: String::new(),
+            entries: Vec::new(),
+            resource_demand: ResourceDemand::default(),
+            route_snapshot: PhysicalTableRouteValue::default(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct PlanEntry {
+    pub group_id: PlanGroupId,
+    pub subtask: RepartitionSubtask,
+    pub sources: Vec<RegionDescriptor>,
+    pub targets: Vec<RegionDescriptor>,
+}
+
+impl PlanEntry {
+    pub fn new(
+        group_id: PlanGroupId,
+        subtask: RepartitionSubtask,
+        sources: Vec<RegionDescriptor>,
+        targets: Vec<RegionDescriptor>,
+    ) -> Self {
+        Self {
+            group_id,
+            subtask,
+            sources,
+            targets,
+        }
+    }
+
+    pub fn implied_new_regions(&self) -> u32 {
+        self.targets
+            .iter()
+            .filter(|target| target.region_id.is_none())
+            .count() as u32
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RegionDescriptor {
+    pub region_id: Option<RegionId>,
+    pub partition_expr_json: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct PartitionRuleDiff {
+    pub entries: Vec<PlanGroupId>,
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct ResourceDemand {
+    pub new_regions: u32,
+}
+
+impl ResourceDemand {
+    pub fn add_entry(&mut self, entry: &PlanEntry) {
+        self.new_regions = self.new_regions.saturating_add(entry.implied_new_regions());
+    }
+}
diff --git a/src/partition/src/subtask.rs b/src/partition/src/subtask.rs
index e74e9872f5..4d9e22821e 100644
--- a/src/partition/src/subtask.rs
+++ b/src/partition/src/subtask.rs
@@ -14,12 +14,14 @@
 
 use std::collections::VecDeque;
 
+use serde::{Deserialize, Serialize};
+
 use crate::error::Result;
 use crate::expr::PartitionExpr;
 use crate::overlap::associate_from_to;
 
 /// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RepartitionSubtask {
     pub from_expr_indices: Vec<usize>,
     pub to_expr_indices: Vec<usize>,
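
For reference, the following is a minimal sketch (not part of the patch) of how the plan types introduced above compose for a one-into-two split. The `partition_expr_json` values are hypothetical placeholders rather than real `PartitionExpr` JSON, and `PlanEntry`/`RegionDescriptor` are private to meta-srv's repartition module, so the snippet would only compile there.

    // Illustrative sketch: one connected component mapping source expression #0
    // onto target expressions #0 and #1 (a split).
    use partition::subtask::RepartitionSubtask;
    use store_api::storage::RegionId;
    use uuid::Uuid;

    fn example_split_entry() -> PlanEntry {
        let subtask = RepartitionSubtask {
            from_expr_indices: vec![0],
            to_expr_indices: vec![0, 1],
        };
        // The source region that currently holds the data (region 0 of table 1024).
        let source = RegionDescriptor {
            region_id: Some(RegionId::new(1024, 0)),
            partition_expr_json: "x < 100".to_string(), // placeholder, not real JSON
        };
        // Per build_plan's reuse rule for splits, the first target keeps the source
        // region id; the second target has none yet, so it implies one new region.
        let targets = vec![
            RegionDescriptor {
                region_id: Some(RegionId::new(1024, 0)),
                partition_expr_json: "x < 50".to_string(), // placeholder
            },
            RegionDescriptor {
                region_id: None,
                partition_expr_json: "50 <= x < 100".to_string(), // placeholder
            },
        ];
        let entry = PlanEntry::new(Uuid::new_v4(), subtask, vec![source], targets);
        // ResourceDemand::add_entry counts exactly the targets without a region id.
        assert_eq!(entry.implied_new_regions(), 1);
        entry
    }

The reuse rule keeps a split anchored on the region that already holds the data, so only the net-new target ranges contribute to `ResourceDemand` and must be allocated before the group sub-procedures are dispatched.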