feat: scaffold repartition procedure with plan/resource stubs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-10-13 02:41:02 +00:00
parent aa84642afc
commit d0877997a2
6 changed files with 527 additions and 1 deletions

View File

@@ -87,6 +87,7 @@ tokio-postgres-rustls = { version = "0.12", optional = true }
tonic.workspace = true
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true
[dev-dependencies]
chrono.workspace = true

View File

@@ -44,6 +44,7 @@ pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
pub mod table_meta;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod plan;
pub mod procedure;
pub use plan::{
PartitionChange, PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
};
pub use procedure::{RepartitionProcedure, RepartitionTask};

View File

@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
/// Identifier of a plan group.
pub type PlanGroupId = Uuid;
/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RepartitionPlan {
/// Identifier of the physical table to repartition.
pub table_id: TableId,
/// Deterministic hash of the generated plan. Used for idempotence checks.
pub plan_hash: String,
/// Plan groups to execute. Each group is independent from others.
pub groups: Vec<PlanGroup>,
/// Aggregate resource expectation.
pub resource_demand: ResourceDemand,
}
impl RepartitionPlan {
/// Creates an empty plan. Primarily used in tests and early skeleton code.
pub fn empty(table_id: TableId) -> Self {
Self {
table_id,
plan_hash: String::new(),
groups: Vec::new(),
resource_demand: ResourceDemand::default(),
}
}
/// Returns `true` if the plan does not contain any work.
pub fn is_trivial(&self) -> bool {
self.groups.is_empty()
}
}
/// A group of repartition operations that can be executed as a sub-procedure.
///
/// Groups are designed to be independent so that failure in one group does not
/// propagate to others.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanGroup {
/// Stable identifier of the group.
pub group_id: PlanGroupId,
/// Regions that provide the data source.
pub source_regions: Vec<RegionId>,
/// Target regions that should exist after this group finishes.
pub target_regions: Vec<RegionId>,
/// Ordered list of logical changes required by this group.
pub changes: Vec<PartitionChange>,
/// Estimated resource demand contributed by this group.
pub resource_hint: ResourceDemand,
}
impl PlanGroup {
/// Convenience constructor for skeleton code and tests.
pub fn new(group_id: PlanGroupId) -> Self {
Self {
group_id,
source_regions: Vec::new(),
target_regions: Vec::new(),
changes: Vec::new(),
resource_hint: ResourceDemand::default(),
}
}
}
/// Diff between the old and the new partition rules.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct PartitionRuleDiff {
/// Ordered list of changes to transform the old rule into the new rule.
pub changes: Vec<PartitionChange>,
}
impl PartitionRuleDiff {
/// Returns `true` if there is no change between two rules.
pub fn is_empty(&self) -> bool {
self.changes.is_empty()
}
}
/// Primitive repartition changes recognised by the planner.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PartitionChange {
/// Split one region into multiple target regions.
Split {
from: RegionId,
to: Vec<RegionId>,
},
/// Merge multiple regions into one target region.
Merge {
from: Vec<RegionId>,
to: RegionId,
},
/// No-op placeholder for future operations (e.g. rule rewrite).
Unsupported,
}
impl PartitionChange {
/// Returns the regions referenced by this change.
pub fn referenced_regions(&self) -> Vec<RegionId> {
match self {
PartitionChange::Split { from, to } => {
let mut regions = Vec::with_capacity(1 + to.len());
regions.push(*from);
regions.extend(to.iter().copied());
regions
}
PartitionChange::Merge { from, to } => {
let mut regions = Vec::with_capacity(from.len() + 1);
regions.extend(from.iter().copied());
regions.push(*to);
regions
}
PartitionChange::Unsupported => Vec::new(),
}
}
}
/// Resource estimation for executing a plan or a plan group.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {
/// Number of brand-new regions that must be allocated before execution.
pub new_regions: u32,
/// Rough estimate of data volume to rewrite (in bytes).
pub estimated_bytes: u64,
}

View File

@@ -0,0 +1,343 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-level state machine for table repartitioning.
//!
//! The [`RepartitionProcedure`] coordinates the full flow described in the RFC:
//! 1. **Prepare** validate the request, lock the table, compute the rule diff,
//! and materialize a deterministic [`RepartitionPlan`].
//! 2. **AllocateResources** estimate [`ResourceDemand`] and talk to PaaS to
//! reserve the additional regions required by the plan.
//! 3. **DispatchSubprocedures** split the plan into `PlanGroup`s, spawn
//! sub-procedures that stop compaction/snapshot, enter the no-ingestion
//! window, update metadata, push region-rule versions, generate new manifests,
//! and stage/acknowledge versioned writes via `RegionEdit`.
//! 4. **Finalize** aggregate results, roll failed groups back (metadata,
//! manifests, staged data), restart unhealthy regions, unlock the table, and
//! trigger cache reload / optional compaction in the background.
//!
//! Each step is designed to be idempotent so the framework can retry safely.
//! This file currently contains the skeleton states and will be filled out with
//! concrete logic in follow-up patches.
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::TableId;
use strum::AsRefStr;
use table::table_reference::TableReference;
use uuid::Uuid;
use crate::ddl::repartition::{PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand};
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
/// Procedure that orchestrates the repartition flow.
pub struct RepartitionProcedure {
#[allow(dead_code)]
context: DdlContext,
data: RepartitionData,
}
impl RepartitionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
Ok(Self {
context,
data: RepartitionData::new(task),
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
async fn on_prepare(&mut self) -> Result<Status> {
if self.data.plan.is_none() {
let (mut plan, diff, demand) = self.generate_plan_stub()?;
plan.plan_hash = format!(
"{}:{}:{}",
self.data.task.table_id,
plan.groups.len(),
self.data.task.new_rule_payload.len()
);
self.data.resource_demand = Some(demand);
self.data.rule_diff = Some(diff);
self.data.plan = Some(plan);
}
self.data.state = RepartitionState::AllocateResources;
Ok(Status::executing(true))
}
async fn on_allocate_resources(&mut self) -> Result<Status> {
if !self.data.resource_allocated {
let demand = self.data.resource_demand.unwrap_or_default();
let allocated = self.allocate_resources_stub(demand).await?;
if !allocated {
if let Some(plan) = &self.data.plan {
self.data
.failed_groups
.extend(plan.groups.iter().map(|group| group.group_id));
}
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
self.data.resource_allocated = true;
}
self.data.state = RepartitionState::DispatchSubprocedures;
Ok(Status::executing(true))
}
async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
let Some(plan) = self.data.plan.as_ref() else {
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
};
let groups_to_schedule: Vec<PlanGroupId> = plan
.groups
.iter()
.filter(|group| {
!self.data.succeeded_groups.contains(&group.group_id)
&& !self.data.failed_groups.contains(&group.group_id)
})
.map(|group| group.group_id)
.collect();
if groups_to_schedule.is_empty() {
self.data.state = RepartitionState::Finalize;
return Ok(Status::executing(true));
}
let subprocedures = self.spawn_group_stub_procedures(&groups_to_schedule);
self.data.pending_groups = groups_to_schedule;
self.data.state = RepartitionState::CollectSubprocedures;
Ok(Status::suspended(subprocedures, true))
}
async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> Result<Status> {
self.data
.succeeded_groups
.extend(self.data.pending_groups.drain(..));
self.data.state = RepartitionState::Finalize;
Ok(Status::executing(true))
}
async fn on_finalize(&mut self) -> Result<Status> {
self.data.summary = Some(RepartitionSummary {
succeeded_groups: self.data.succeeded_groups.clone(),
failed_groups: self.data.failed_groups.clone(),
});
self.data.state = RepartitionState::Finished;
Ok(Status::done())
}
fn generate_plan_stub(&self) -> Result<(RepartitionPlan, PartitionRuleDiff, ResourceDemand)> {
let mut plan = RepartitionPlan::empty(self.data.task.table_id);
let diff = PartitionRuleDiff::default();
if !self.data.task.new_rule_payload.is_empty() {
let group_id = Uuid::new_v4();
plan.groups.push(PlanGroup::new(group_id));
}
let mut demand = ResourceDemand::default();
demand.new_regions = plan.groups.len() as u32;
plan.resource_demand = demand;
Ok((plan, diff, demand))
}
async fn allocate_resources_stub(&self, _demand: ResourceDemand) -> Result<bool> {
Ok(true)
}
fn spawn_group_stub_procedures(&self, groups: &[PlanGroupId]) -> Vec<ProcedureWithId> {
groups
.iter()
.map(|group_id| {
ProcedureWithId::with_random_id(Box::new(RepartitionGroupStubProcedure::new(
*group_id,
)))
})
.collect()
}
fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
let mut lock_key = Vec::with_capacity(3);
let table_ref = self.data.task.table_ref();
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.data.task.table_id).into());
lock_key
}
}
#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = self.data.state;
let status = match state {
RepartitionState::Prepare => self.on_prepare().await,
RepartitionState::AllocateResources => self.on_allocate_resources().await,
RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
RepartitionState::CollectSubprocedures => {
self.on_collect_subprocedures(ctx).await
}
RepartitionState::Finalize => self.on_finalize().await,
RepartitionState::Finished => Ok(Status::done()),
};
status.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
LockKey::new(self.table_lock_key())
}
}
/// Serialized data of the repartition procedure.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
state: RepartitionState,
task: RepartitionTask,
#[serde(default)]
plan: Option<RepartitionPlan>,
#[serde(default)]
rule_diff: Option<PartitionRuleDiff>,
#[serde(default)]
resource_demand: Option<ResourceDemand>,
#[serde(default)]
resource_allocated: bool,
#[serde(default)]
pending_groups: Vec<PlanGroupId>,
#[serde(default)]
succeeded_groups: Vec<PlanGroupId>,
#[serde(default)]
failed_groups: Vec<PlanGroupId>,
#[serde(default)]
summary: Option<RepartitionSummary>,
}
impl RepartitionData {
fn new(task: RepartitionTask) -> Self {
Self {
state: RepartitionState::Prepare,
task,
plan: None,
rule_diff: None,
resource_demand: None,
resource_allocated: false,
pending_groups: Vec::new(),
succeeded_groups: Vec::new(),
failed_groups: Vec::new(),
summary: None,
}
}
}
/// High level states of the repartition procedure.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
/// Validates request parameters and generates deterministic plan.
Prepare,
/// Coordinates with the resource provider (PaaS) for region allocation.
AllocateResources,
/// Submits work items to sub-procedures.
DispatchSubprocedures,
/// Collects and inspects sub-procedure results.
CollectSubprocedures,
/// Finalises metadata and emits summary.
Finalize,
/// Terminal state.
Finished,
}
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
/// Serialized representation of the new partition rule.
///
/// The actual format will be determined later; currently we keep the raw
/// payload so that the procedure can be wired end-to-end.
pub new_rule_payload: Vec<u8>,
}
impl RepartitionTask {
fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.table_name,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
succeeded_groups: Vec<PlanGroupId>,
failed_groups: Vec<PlanGroupId>,
}
struct RepartitionGroupStubProcedure {
group_id: PlanGroupId,
}
impl RepartitionGroupStubProcedure {
fn new(group_id: PlanGroupId) -> Self {
Self { group_id }
}
}
#[async_trait::async_trait]
impl Procedure for RepartitionGroupStubProcedure {
fn type_name(&self) -> &str {
"metasrv-procedure::RepartitionGroupStub"
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
Ok(Status::done())
}
fn dump(&self) -> ProcedureResult<String> {
Ok(self.group_id.to_string())
}
fn lock_key(&self) -> LockKey {
LockKey::default()
}
}

View File

@@ -31,6 +31,7 @@ use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::create_view::CreateViewProcedure;
use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
@@ -181,7 +182,8 @@ impl DdlManager {
TruncateTableProcedure,
CreateDatabaseProcedure,
DropDatabaseProcedure,
DropViewProcedure
DropViewProcedure,
RepartitionProcedure
);
for (type_name, loader_factory) in loaders {
@@ -210,6 +212,19 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a repartition task.
#[tracing::instrument(skip_all)]
pub async fn submit_repartition_task(
&self,
repartition_task: RepartitionTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = RepartitionProcedure::new(repartition_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a create table task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(