From e7f685134a6558c9a4563216da1cd3ae134c15cd Mon Sep 17 00:00:00 2001
From: Zhenchi
Date: Fri, 31 Oct 2025 04:34:08 +0000
Subject: [PATCH] feat: repartition procedure related structs (wip)

Signed-off-by: Zhenchi
---
Reviewer note (everything between `---` and the first `diff --git` is ignored by `git am`):
  * This copy was re-flowed from a whitespace-collapsed version of the patch.
    Generic arguments (`Vec<PartitionExpr>`, `Option<RepartitionPlan>`,
    `Vec<RepartitionUnitId>`, `HashMap<RepartitionUnitId, ProcedureId>`,
    `Vec<RepartitionUnit>`, `Vec<RegionDescriptor>`, `Option<RegionId>`,
    `Vec<usize>`) were re-derived from each file's imports and doc comments;
    TODO confirm against the original commit.
  * Author e-mail addresses were absent in that copy and have not been reconstructed.
  * plan.rs keeps the blank line before the closing brace of `RepartitionPlan` and the
    trailing blank line at EOF so hunk counts and blob ids stay valid; presumably
    rustfmt will drop both in a follow-up.

 Cargo.lock                                    |  1 +
 src/meta-srv/Cargo.toml                       |  1 +
 src/meta-srv/src/procedure.rs                 |  1 +
 src/meta-srv/src/procedure/repartition.rs     | 79 +++++++++++++++++++
 .../src/procedure/repartition/plan.rs         | 37 +++++++++
 .../src/procedure/repartition/unit.rs         | 37 +++++++++
 src/partition/src/subtask.rs                  |  4 +-
 7 files changed, 159 insertions(+), 1 deletion(-)
 create mode 100644 src/meta-srv/src/procedure/repartition.rs
 create mode 100644 src/meta-srv/src/procedure/repartition/plan.rs
 create mode 100644 src/meta-srv/src/procedure/repartition/unit.rs

diff --git a/Cargo.lock b/Cargo.lock
index f4ff29fb70..6bf7453138 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7431,6 +7431,7 @@ dependencies = [
  "local-ip-address",
  "once_cell",
  "parking_lot 0.12.4",
+ "partition",
  "prometheus",
  "prost 0.13.5",
  "rand 0.9.1",
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index bd2075501c..bcd961fc6c 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -63,6 +63,7 @@ itertools.workspace = true
 lazy_static.workspace = true
 once_cell.workspace = true
 parking_lot.workspace = true
+partition.workspace = true
 prometheus.workspace = true
 prost.workspace = true
 rand.workspace = true
diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs
index 88869d8482..da1a1b00e7 100644
--- a/src/meta-srv/src/procedure.rs
+++ b/src/meta-srv/src/procedure.rs
@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
 use snafu::ResultExt;
 
 pub mod region_migration;
+pub mod repartition;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
 #[cfg(test)]
diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs
new file mode 100644
index 0000000000..bb0381f3d8
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition.rs
@@ -0,0 +1,79 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod plan;
+mod unit;
+
+use std::collections::HashMap;
+
+use common_meta::ddl::DdlContext;
+use common_procedure::ProcedureId;
+use partition::expr::PartitionExpr;
+use serde::{Deserialize, Serialize};
+use store_api::storage::TableId;
+use strum::AsRefStr;
+
+use crate::procedure::repartition::plan::RepartitionPlan;
+use crate::procedure::repartition::unit::RepartitionUnitId;
+
+/// Task payload passed from the DDL entry point.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RepartitionTask {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+    pub table_id: TableId,
+    /// Partition expressions representing the source regions.
+    pub from_exprs: Vec<PartitionExpr>,
+    /// Partition expressions representing the target regions.
+    pub into_exprs: Vec<PartitionExpr>,
+}
+
+/// Serialized data of the repartition procedure.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct RepartitionData {
+    state: RepartitionState,
+    task: RepartitionTask,
+    #[serde(default)]
+    plan: Option<RepartitionPlan>,
+    #[serde(default)]
+    resource_allocated: bool,
+    #[serde(default)]
+    pending_units: Vec<RepartitionUnitId>,
+    #[serde(default)]
+    succeeded_units: Vec<RepartitionUnitId>,
+    #[serde(default)]
+    failed_units: Vec<RepartitionUnitId>,
+    #[serde(default)]
+    rollback_triggered: bool,
+    #[serde(default)]
+    group_subprocedures: HashMap<RepartitionUnitId, ProcedureId>,
+}
+
+/// High level states of the repartition procedure.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
+enum RepartitionState {
+    Prepare,
+    AllocateResources,
+    DispatchSubprocedures,
+    CollectSubprocedures,
+    Finalize,
+    Finished,
+}
+
+/// Procedure that orchestrates the repartition flow.
+pub struct RepartitionProcedure {
+    context: DdlContext,
+    data: RepartitionData,
+}
diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs
new file mode 100644
index 0000000000..6225ca3c6f
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition/plan.rs
@@ -0,0 +1,37 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_meta::key::table_route::PhysicalTableRouteValue;
+use serde::{Deserialize, Serialize};
+
+use crate::procedure::repartition::unit::RepartitionUnit;
+
+/// Logical description of the repartition plan.
+///
+/// The plan is persisted by the procedure framework so it must remain
+/// serializable/deserializable across versions.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct RepartitionPlan {
+    units: Vec<RepartitionUnit>,
+    resource_demand: ResourceDemand,
+    route_snapshot: PhysicalTableRouteValue,
+
+}
+
+/// Resources required to execute the plan.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub struct ResourceDemand {
+    // TODO(zhongzc): add fields here.
+}
+
diff --git a/src/meta-srv/src/procedure/repartition/unit.rs b/src/meta-srv/src/procedure/repartition/unit.rs
new file mode 100644
index 0000000000..8a2a4b290a
--- /dev/null
+++ b/src/meta-srv/src/procedure/repartition/unit.rs
@@ -0,0 +1,37 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use partition::expr::PartitionExpr;
+use partition::subtask::RepartitionSubtask;
+use serde::{Deserialize, Serialize};
+use store_api::storage::RegionId;
+use uuid::Uuid;
+
+pub type RepartitionUnitId = Uuid;
+
+/// A shard of a repartition plan.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RepartitionUnit {
+    pub id: RepartitionUnitId,
+    pub subtask: RepartitionSubtask,
+    pub sources: Vec<RegionDescriptor>,
+    pub targets: Vec<RegionDescriptor>,
+}
+
+/// Metadata describing a region involved in the plan.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RegionDescriptor {
+    pub region_id: Option<RegionId>,
+    pub partition_expr: PartitionExpr,
+}
diff --git a/src/partition/src/subtask.rs b/src/partition/src/subtask.rs
index e74e9872f5..4d9e22821e 100644
--- a/src/partition/src/subtask.rs
+++ b/src/partition/src/subtask.rs
@@ -14,12 +14,14 @@
 
 use std::collections::VecDeque;
 
+use serde::{Deserialize, Serialize};
+
 use crate::error::Result;
 use crate::expr::PartitionExpr;
 use crate::overlap::associate_from_to;
 
 /// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RepartitionSubtask {
     pub from_expr_indices: Vec<usize>,
     pub to_expr_indices: Vec<usize>,