feat: add group procedure

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-10-23 10:19:06 +00:00
parent 5be4987fd7
commit 3c1d7fcb89
12 changed files with 957 additions and 508 deletions

1
Cargo.lock generated
View File

@@ -7426,6 +7426,7 @@ dependencies = [
"local-ip-address",
"once_cell",
"parking_lot 0.12.4",
"partition",
"prometheus",
"prost 0.13.5",
"rand 0.9.1",

View File

@@ -43,7 +43,6 @@ pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
pub mod repartition;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -1,139 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
/// Identifier of a plan group.
///
/// This is a plain alias of [`Uuid`]; no newtype wrapper is introduced.
pub type PlanGroupId = Uuid;
/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
///
/// A plan whose `groups` list is empty describes no work at all.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RepartitionPlan {
/// Identifier of the physical table to repartition.
pub table_id: TableId,
/// Deterministic hash of the generated plan. Used for idempotence checks.
pub plan_hash: String,
/// Plan groups to execute. Each group is independent from others.
pub groups: Vec<PlanGroup>,
/// Aggregate resource expectation.
pub resource_demand: ResourceDemand,
}
impl RepartitionPlan {
    /// Builds a plan for `table_id` with no groups, an empty hash string and
    /// a default resource demand. Mainly intended for tests and skeleton code.
    pub fn empty(table_id: TableId) -> Self {
        let plan_hash = String::new();
        let groups = Vec::new();
        let resource_demand = ResourceDemand::default();
        Self {
            table_id,
            plan_hash,
            groups,
            resource_demand,
        }
    }

    /// Reports whether the plan has nothing to execute, i.e. holds no groups.
    pub fn is_trivial(&self) -> bool {
        let Self { groups, .. } = self;
        groups.is_empty()
    }
}
/// A group of repartition operations that can be executed as a sub-procedure.
///
/// Groups are designed to be independent so that failure in one group does not
/// propagate to others.
///
/// The type is serializable because it is embedded in [`RepartitionPlan`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanGroup {
/// Stable identifier of the group.
pub group_id: PlanGroupId,
/// Regions that provide the data source.
pub source_regions: Vec<RegionId>,
/// Target regions that should exist after this group finishes.
pub target_regions: Vec<RegionId>,
/// Ordered list of logical changes required by this group.
pub changes: Vec<PartitionChange>,
/// Estimated resource demand contributed by this group.
pub resource_hint: ResourceDemand,
}
impl PlanGroup {
    /// Creates a group that only carries `group_id`: every region and change
    /// list starts empty and the resource hint is the default value.
    /// Intended for skeleton code and tests.
    pub fn new(group_id: PlanGroupId) -> Self {
        let resource_hint = ResourceDemand::default();
        Self {
            group_id,
            source_regions: vec![],
            target_regions: vec![],
            changes: vec![],
            resource_hint,
        }
    }
}
/// Diff between the old and the new partition rules.
///
/// The derived `Default` yields a diff with an empty change list.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct PartitionRuleDiff {
/// Ordered list of changes to transform the old rule into the new rule.
pub changes: Vec<PartitionChange>,
}
impl PartitionRuleDiff {
    /// Reports whether the diff records no change at all.
    pub fn is_empty(&self) -> bool {
        let Self { changes } = self;
        changes.is_empty()
    }
}
/// Primitive repartition changes recognised by the planner.
///
/// `Split` has one source and many targets; `Merge` has many sources and one
/// target; `Unsupported` carries no regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PartitionChange {
/// Split one region into multiple target regions.
Split { from: RegionId, to: Vec<RegionId> },
/// Merge multiple regions into one target region.
Merge { from: Vec<RegionId>, to: RegionId },
/// No-op placeholder for future operations (e.g. rule rewrite).
Unsupported,
}
impl PartitionChange {
/// Returns the regions referenced by this change.
pub fn referenced_regions(&self) -> Vec<RegionId> {
match self {
PartitionChange::Split { from, to } => {
let mut regions = Vec::with_capacity(1 + to.len());
regions.push(*from);
regions.extend(to.iter().copied());
regions
}
PartitionChange::Merge { from, to } => {
let mut regions = Vec::with_capacity(from.len() + 1);
regions.extend(from.iter().copied());
regions.push(*to);
regions
}
PartitionChange::Unsupported => Vec::new(),
}
}
}
/// Resource estimation for executing a plan or a plan group.
///
/// The type is `Copy`, and the derived `Default` sets both counters to zero.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {
/// Number of brand-new regions that must be allocated before execution.
pub new_regions: u32,
/// Rough estimate of data volume to rewrite (in bytes).
pub estimated_bytes: u64,
}

View File

@@ -1,345 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-level state machine for table repartitioning.
//!
//! The [`RepartitionProcedure`] coordinates the full flow described in the RFC:
//! 1. **Prepare**: validate the request, lock the table, compute the rule diff,
//! and materialize a deterministic [`RepartitionPlan`].
//! 2. **AllocateResources**: estimate [`ResourceDemand`] and talk to PaaS to
//! reserve the additional regions required by the plan.
//! 3. **DispatchSubprocedures**: split the plan into `PlanGroup`s, spawn
//! sub-procedures that stop compaction/snapshot, enter the no-ingestion
//! window, update metadata, push region-rule versions, generate new manifests,
//! and stage/acknowledge versioned writes via `RegionEdit`.
//! 4. **Finalize**: aggregate results, roll failed groups back (metadata,
//! manifests, staged data), restart unhealthy regions, unlock the table, and
//! trigger cache reload / optional compaction in the background.
//!
//! Each step is designed to be idempotent so the framework can retry safely.
//! This file currently contains the skeleton states and will be filled out with
//! concrete logic in follow-up patches.
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::TableId;
use strum::AsRefStr;
use table::table_reference::TableReference;
use uuid::Uuid;
use crate::ddl::DdlContext;
use crate::ddl::repartition::{
PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
};
use crate::ddl::utils::map_to_procedure_error;
use crate::error::Result;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
/// Procedure that orchestrates the repartition flow.
///
/// Only `data` is serialized by `dump`; `context` is supplied again when the
/// procedure is rebuilt through `from_json`.
pub struct RepartitionProcedure {
// Stored at construction but not read by the current skeleton, hence the
// `dead_code` allowance.
#[allow(dead_code)]
context: DdlContext,
// Persistent state of the state machine.
data: RepartitionData,
}
impl RepartitionProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Builds a fresh procedure for `task`, starting in the `Prepare` state.
    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
        let data = RepartitionData::new(task);
        Ok(Self { context, data })
    }

    /// Rebuilds a procedure from the JSON text produced by `dump`.
    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let restored = serde_json::from_str::<RepartitionData>(json).context(FromJsonSnafu)?;
        Ok(Self {
            context,
            data: restored,
        })
    }

    /// Generates the plan once (skipped when one is already stored) and then
    /// advances to `AllocateResources`.
    async fn on_prepare(&mut self) -> Result<Status> {
        if self.data.plan.is_none() {
            let (mut plan, diff, demand) = self.generate_plan_stub()?;
            let group_count = plan.groups.len();
            let payload_len = self.data.task.new_rule_payload.len();
            plan.plan_hash = format!(
                "{}:{}:{}",
                self.data.task.table_id, group_count, payload_len
            );
            self.data.plan = Some(plan);
            self.data.rule_diff = Some(diff);
            self.data.resource_demand = Some(demand);
        }

        self.data.state = RepartitionState::AllocateResources;
        Ok(Status::executing(true))
    }

    /// Requests resources unless that already happened. When the request is
    /// refused, every group of the plan is recorded as failed and the
    /// procedure jumps to `Finalize`; otherwise it moves on to
    /// `DispatchSubprocedures`.
    async fn on_allocate_resources(&mut self) -> Result<Status> {
        if self.data.resource_allocated {
            self.data.state = RepartitionState::DispatchSubprocedures;
            return Ok(Status::executing(true));
        }

        let demand = self.data.resource_demand.unwrap_or_default();
        if self.allocate_resources_stub(demand).await? {
            self.data.resource_allocated = true;
            self.data.state = RepartitionState::DispatchSubprocedures;
        } else {
            if let Some(plan) = &self.data.plan {
                let all_groups = plan.groups.iter().map(|group| group.group_id);
                self.data.failed_groups.extend(all_groups);
            }
            self.data.state = RepartitionState::Finalize;
        }
        Ok(Status::executing(true))
    }

    /// Spawns one sub-procedure per group that is neither succeeded nor
    /// failed yet. Goes straight to `Finalize` when there is no plan or no
    /// such group.
    async fn on_dispatch_subprocedures(&mut self) -> Result<Status> {
        let Some(plan) = self.data.plan.as_ref() else {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        };

        let mut groups_to_schedule: Vec<PlanGroupId> = Vec::new();
        for group in &plan.groups {
            let id = group.group_id;
            let settled =
                self.data.succeeded_groups.contains(&id) || self.data.failed_groups.contains(&id);
            if !settled {
                groups_to_schedule.push(id);
            }
        }

        if groups_to_schedule.is_empty() {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        }

        let subprocedures = self.spawn_group_stub_procedures(&groups_to_schedule);
        self.data.pending_groups = groups_to_schedule;
        self.data.state = RepartitionState::CollectSubprocedures;
        Ok(Status::suspended(subprocedures, true))
    }

    /// Moves every pending group into the succeeded list and proceeds to
    /// `Finalize`. No sub-procedure result is inspected here.
    async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> Result<Status> {
        let finished = std::mem::take(&mut self.data.pending_groups);
        self.data.succeeded_groups.extend(finished);
        self.data.state = RepartitionState::Finalize;
        Ok(Status::executing(true))
    }

    /// Stores a summary of succeeded and failed groups and finishes.
    async fn on_finalize(&mut self) -> Result<Status> {
        let succeeded_groups = self.data.succeeded_groups.clone();
        let failed_groups = self.data.failed_groups.clone();
        self.data.summary = Some(RepartitionSummary {
            succeeded_groups,
            failed_groups,
        });
        self.data.state = RepartitionState::Finished;
        Ok(Status::done())
    }

    /// Placeholder planner: produces a single empty group when the task
    /// payload is non-empty, an empty diff, and a demand whose `new_regions`
    /// equals the number of groups.
    fn generate_plan_stub(&self) -> Result<(RepartitionPlan, PartitionRuleDiff, ResourceDemand)> {
        let mut plan = RepartitionPlan::empty(self.data.task.table_id);
        if !self.data.task.new_rule_payload.is_empty() {
            plan.groups.push(PlanGroup::new(Uuid::new_v4()));
        }

        let demand = ResourceDemand {
            new_regions: plan.groups.len() as u32,
            ..Default::default()
        };
        plan.resource_demand = demand;
        Ok((plan, PartitionRuleDiff::default(), demand))
    }

    /// Placeholder allocator that always reports success.
    async fn allocate_resources_stub(&self, _demand: ResourceDemand) -> Result<bool> {
        Ok(true)
    }

    /// Wraps each group id in a stub sub-procedure with a random procedure id.
    fn spawn_group_stub_procedures(&self, groups: &[PlanGroupId]) -> Vec<ProcedureWithId> {
        let mut spawned = Vec::with_capacity(groups.len());
        for &group_id in groups {
            let stub = RepartitionGroupStubProcedure::new(group_id);
            spawned.push(ProcedureWithId::with_random_id(Box::new(stub)));
        }
        spawned
    }

    /// Lock keys of the procedure: catalog (read), schema (read), table (write).
    fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
        let table_ref = self.data.task.table_ref();
        vec![
            CatalogLock::Read(table_ref.catalog).into(),
            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
            TableLock::Write(self.data.task.table_id).into(),
        ]
    }
}
#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the handler that belongs to the current state and converts its
    /// error into a procedure error.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let current = self.data.state;
        let outcome = match current {
            RepartitionState::Prepare => self.on_prepare().await,
            RepartitionState::AllocateResources => self.on_allocate_resources().await,
            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
            RepartitionState::Finalize => self.on_finalize().await,
            RepartitionState::Finished => Ok(Status::done()),
        };
        outcome.map_err(map_to_procedure_error)
    }

    /// Serializes the procedure data as JSON.
    fn dump(&self) -> ProcedureResult<String> {
        let data = &self.data;
        serde_json::to_string(data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        let keys = self.table_lock_key();
        LockKey::new(keys)
    }
}
/// Serialized data of the repartition procedure.
///
/// Every field marked `#[serde(default)]` falls back to its `Default` value
/// when it is missing from the persisted JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
/// Current position in the state machine.
state: RepartitionState,
/// The request this procedure was created for.
task: RepartitionTask,
/// Plan produced in the `Prepare` state; `None` until then.
#[serde(default)]
plan: Option<RepartitionPlan>,
/// Rule diff stored together with the plan.
#[serde(default)]
rule_diff: Option<PartitionRuleDiff>,
/// Resource demand stored together with the plan.
#[serde(default)]
resource_demand: Option<ResourceDemand>,
/// Set once resource allocation has succeeded, so it is not repeated.
#[serde(default)]
resource_allocated: bool,
/// Groups handed to sub-procedures and not yet collected.
#[serde(default)]
pending_groups: Vec<PlanGroupId>,
/// Groups recorded as succeeded.
#[serde(default)]
succeeded_groups: Vec<PlanGroupId>,
/// Groups recorded as failed.
#[serde(default)]
failed_groups: Vec<PlanGroupId>,
/// Final summary, written in the `Finalize` state.
#[serde(default)]
summary: Option<RepartitionSummary>,
}
impl RepartitionData {
    /// Initial data for `task`: the state is `Prepare`, nothing is planned or
    /// allocated, and all group lists are empty.
    fn new(task: RepartitionTask) -> Self {
        let state = RepartitionState::Prepare;
        Self {
            state,
            task,
            plan: None,
            rule_diff: None,
            resource_demand: None,
            resource_allocated: false,
            pending_groups: vec![],
            succeeded_groups: vec![],
            failed_groups: vec![],
            summary: None,
        }
    }
}
/// High level states of the repartition procedure.
///
/// The value is stored in `RepartitionData`, so it is persisted with the
/// procedure, and `execute` dispatches on it.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
/// Validates request parameters and generates deterministic plan.
Prepare,
/// Coordinates with the resource provider (PaaS) for region allocation.
AllocateResources,
/// Submits work items to sub-procedures.
DispatchSubprocedures,
/// Collects and inspects sub-procedure results.
CollectSubprocedures,
/// Finalises metadata and emits summary.
Finalize,
/// Terminal state.
Finished,
}
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
/// Catalog part of the table reference.
pub catalog_name: String,
/// Schema part of the table reference.
pub schema_name: String,
/// Table part of the table reference.
pub table_name: String,
/// Id of the table; used for the table write lock and for the plan.
pub table_id: TableId,
/// Serialized representation of the new partition rule.
///
/// The actual format will be determined later; currently we keep the raw
/// payload so that the procedure can be wired end-to-end.
pub new_rule_payload: Vec<u8>,
}
impl RepartitionTask {
    /// Borrows the catalog, schema and table names as a [`TableReference`].
    fn table_ref(&self) -> TableReference<'_> {
        let Self {
            catalog_name,
            schema_name,
            table_name,
            ..
        } = self;
        TableReference {
            catalog: catalog_name.as_str(),
            schema: schema_name.as_str(),
            table: table_name.as_str(),
        }
    }
}
/// Outcome of a repartition run: the groups that succeeded and those that failed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
/// Copy of the succeeded group list taken when the procedure finalizes.
succeeded_groups: Vec<PlanGroupId>,
/// Copy of the failed group list taken when the procedure finalizes.
failed_groups: Vec<PlanGroupId>,
}
/// Placeholder sub-procedure for a single plan group.
struct RepartitionGroupStubProcedure {
/// Group this stub stands for.
group_id: PlanGroupId,
}
impl RepartitionGroupStubProcedure {
    /// Wraps `group_id` in a stub procedure.
    fn new(group_id: PlanGroupId) -> Self {
        Self { group_id }
    }
}
#[async_trait::async_trait]
impl Procedure for RepartitionGroupStubProcedure {
    fn type_name(&self) -> &str {
        "metasrv-procedure::RepartitionGroupStub"
    }

    /// Does no work and reports completion immediately.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        Ok(Status::done())
    }

    /// Dumps the group id in its textual form.
    fn dump(&self) -> ProcedureResult<String> {
        let dumped = self.group_id.to_string();
        Ok(dumped)
    }

    /// The stub uses the default lock key.
    fn lock_key(&self) -> LockKey {
        LockKey::default()
    }
}

View File

@@ -35,7 +35,6 @@ use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, utils};
use crate::error::{
@@ -182,8 +181,7 @@ impl DdlManager {
TruncateTableProcedure,
CreateDatabaseProcedure,
DropDatabaseProcedure,
DropViewProcedure,
RepartitionProcedure
DropViewProcedure
);
for (type_name, loader_factory) in loaders {
@@ -212,19 +210,6 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a repartition task.
///
/// Wraps the task in a [`RepartitionProcedure`] with a random procedure id
/// and hands it to `submit_procedure`.
#[tracing::instrument(skip_all)]
pub async fn submit_repartition_task(
    &self,
    repartition_task: RepartitionTask,
) -> Result<(ProcedureId, Option<Output>)> {
    let procedure = RepartitionProcedure::new(repartition_task, self.create_context())?;
    let boxed = Box::new(procedure);
    self.submit_procedure(ProcedureWithId::with_random_id(boxed))
        .await
}
/// Submits and executes a create table task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(

View File

@@ -44,6 +44,7 @@ common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
common-workload.workspace = true
partition.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool = { workspace = true, optional = true }

View File

@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
pub mod region_migration;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -0,0 +1,456 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod context;
mod group;
mod plan;
use common_catalog::format_full_table_name;
use common_meta::ddl::DdlContext;
use common_meta::ddl::utils::map_to_procedure_error;
use common_meta::error::{self, Result as MetaResult};
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status};
use partition::expr::PartitionExpr;
use partition::subtask;
use serde::{Deserialize, Serialize};
use serde_json;
use snafu::ResultExt;
use store_api::storage::TableId;
use strum::AsRefStr;
use table::table_reference::TableReference;
use uuid::Uuid;
use self::context::RepartitionContext;
use self::group::RepartitionGroupProcedure;
use self::plan::{
PartitionRuleDiff, PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand,
};
/// Procedure that orchestrates the repartition flow.
///
/// Only `data` is serialized by `dump`; both contexts are rebuilt from the
/// `DdlContext` passed to `new` / `from_json`.
pub struct RepartitionProcedure {
/// DDL context; its table metadata manager is used while building the plan.
context: DdlContext,
/// Context cloned into every spawned `RepartitionGroupProcedure`.
group_context: RepartitionContext,
/// Persistent state of the state machine.
data: RepartitionData,
}
impl RepartitionProcedure {
    pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Creates a procedure for `task` that starts in the `Prepare` state.
    ///
    /// The group context handed to sub-procedures is derived from `context`.
    pub fn new(task: RepartitionTask, context: DdlContext) -> MetaResult<Self> {
        let group_context = RepartitionContext::new(&context);
        Ok(Self {
            context,
            group_context,
            data: RepartitionData::new(task),
        })
    }

    /// Restores a procedure from the JSON text produced by `dump`.
    ///
    /// # Errors
    /// Fails when `json` cannot be deserialized into the procedure data.
    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
        let group_context = RepartitionContext::new(&context);
        Ok(Self {
            context,
            group_context,
            data,
        })
    }

    /// Builds the plan unless one is already stored, then advances to
    /// `AllocateResources`.
    async fn on_prepare(&mut self) -> MetaResult<Status> {
        if self.data.plan.is_none() {
            self.build_plan().await?;
        }
        self.data.state = RepartitionState::AllocateResources;
        Ok(Status::executing(true))
    }

    /// Requests resources unless that already happened. When the request is
    /// refused, every plan entry is recorded as failed and the procedure
    /// jumps to `Finalize`; otherwise it moves on to `DispatchSubprocedures`.
    async fn on_allocate_resources(&mut self) -> MetaResult<Status> {
        if !self.data.resource_allocated {
            let demand = self.data.resource_demand.unwrap_or_default();
            let allocated = self.allocate_resources(demand).await?;
            if !allocated {
                if let Some(plan) = &self.data.plan {
                    self.data
                        .failed_groups
                        .extend(plan.entries.iter().map(|entry| entry.group_id));
                }
                self.data.state = RepartitionState::Finalize;
                return Ok(Status::executing(true));
            }
            self.data.resource_allocated = true;
        }
        self.data.state = RepartitionState::DispatchSubprocedures;
        Ok(Status::executing(true))
    }

    /// Spawns one group procedure per plan entry that is neither succeeded
    /// nor failed yet. Goes straight to `Finalize` when there is no plan or
    /// no such entry.
    async fn on_dispatch_subprocedures(&mut self) -> MetaResult<Status> {
        let Some(plan) = self.data.plan.as_ref() else {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        };

        let groups_to_schedule: Vec<PlanGroupId> = plan
            .entries
            .iter()
            .filter(|entry| {
                !self.data.succeeded_groups.contains(&entry.group_id)
                    && !self.data.failed_groups.contains(&entry.group_id)
            })
            .map(|entry| entry.group_id)
            .collect();

        if groups_to_schedule.is_empty() {
            self.data.state = RepartitionState::Finalize;
            return Ok(Status::executing(true));
        }

        let subprocedures = self.spawn_group_procedures(plan, &groups_to_schedule);
        self.data.pending_groups = groups_to_schedule;
        self.data.state = RepartitionState::CollectSubprocedures;
        Ok(Status::suspended(subprocedures, true))
    }

    /// Moves every pending group into the succeeded list and proceeds to
    /// `Finalize`. No sub-procedure result is inspected here.
    async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> MetaResult<Status> {
        self.data
            .succeeded_groups
            .append(&mut self.data.pending_groups);
        self.data.state = RepartitionState::Finalize;
        Ok(Status::executing(true))
    }

    /// Stores a summary of succeeded and failed groups and finishes.
    async fn on_finalize(&mut self) -> MetaResult<Status> {
        self.data.summary = Some(RepartitionSummary {
            succeeded_groups: self.data.succeeded_groups.clone(),
            failed_groups: self.data.failed_groups.clone(),
        });
        self.data.state = RepartitionState::Finished;
        Ok(Status::done())
    }

    /// Builds the repartition plan and stores it, together with the rule diff
    /// and the resource demand, in the procedure data.
    ///
    /// Steps:
    /// 1. Check that table info exists for the task's table id.
    /// 2. Fetch the physical table route.
    /// 3. Match every source expression of the task to a distinct existing
    ///    region whose partition expression is textually equal.
    /// 4. Ask `subtask::create_subtasks` to split the work and turn every
    ///    subtask into a plan entry with its source and target descriptors.
    ///
    /// # Errors
    /// * `TableNotFound` when no table info is stored for the table id.
    /// * `Unsupported` when an expression cannot be deserialized, a source
    ///   expression matches no unused region, or subtask creation fails.
    async fn build_plan(&mut self) -> MetaResult<()> {
        let table_id = self.data.task.table_id;
        let table_ref = self.data.task.table_ref();
        let table_name_str =
            format_full_table_name(table_ref.catalog, table_ref.schema, table_ref.table);

        // Only the existence of the table info matters; the value is discarded.
        self.context
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await?
            .ok_or_else(|| {
                error::TableNotFoundSnafu {
                    table_name: table_name_str,
                }
                .build()
            })?;

        let (physical_table_id, physical_route) = self
            .context
            .table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await?;

        let from_exprs_json = self.data.task.from_exprs_json.clone();
        let into_exprs_json = self.data.task.into_exprs_json.clone();
        let from_exprs = Self::deserialize_partition_exprs(&from_exprs_json)?;
        let into_exprs = Self::deserialize_partition_exprs(&into_exprs_json)?;

        let existing_regions = physical_route
            .region_routes
            .iter()
            .map(|route| (route.region.id, route.region.partition_expr()))
            .collect::<Vec<_>>();

        // Each existing region may back at most one source expression, so that
        // duplicated expressions are mapped to different regions.
        let mut used_regions = vec![false; existing_regions.len()];
        let mut source_descriptor_by_index = Vec::with_capacity(from_exprs_json.len());
        for expr_json in &from_exprs_json {
            let mut found = None;
            for (idx, (region_id, expr)) in existing_regions.iter().enumerate() {
                if !used_regions[idx] && expr == expr_json {
                    found = Some((*region_id, expr.clone()));
                    used_regions[idx] = true;
                    break;
                }
            }
            let (region_id, partition_expr_json) = found.ok_or_else(|| {
                error::UnsupportedSnafu {
                    operation: format!(
                        "repartition source expression '{}' does not match any existing region",
                        expr_json
                    ),
                }
                .build()
            })?;
            source_descriptor_by_index.push(RegionDescriptor {
                region_id: Some(region_id),
                partition_expr_json,
            });
        }

        let subtasks = subtask::create_subtasks(&from_exprs, &into_exprs).map_err(|err| {
            error::UnsupportedSnafu {
                operation: format!("create_subtasks failed: {err}"),
            }
            .build()
        })?;

        let mut plan = RepartitionPlan::empty(physical_table_id);
        let mut diff = PartitionRuleDiff::default();
        let mut demand = ResourceDemand::default();

        for subtask in subtasks {
            let group_id = Uuid::new_v4();
            let sources = subtask
                .from_expr_indices
                .iter()
                .map(|&idx| source_descriptor_by_index[idx].clone())
                .collect::<Vec<_>>();

            let targets = subtask
                .to_expr_indices
                .iter()
                .enumerate()
                .map(|(position, &idx)| {
                    // Region reuse policy:
                    // * one source: only the first target reuses its region;
                    // * one target: it reuses the first source's region;
                    // * otherwise: targets reuse sources position by position.
                    let reused_region = if subtask.from_expr_indices.len() == 1 {
                        if position == 0 {
                            sources.first().and_then(|descriptor| descriptor.region_id)
                        } else {
                            None
                        }
                    } else if subtask.to_expr_indices.len() == 1 {
                        sources.first().and_then(|descriptor| descriptor.region_id)
                    } else {
                        sources
                            .get(position)
                            .and_then(|descriptor| descriptor.region_id)
                    };
                    RegionDescriptor {
                        region_id: reused_region,
                        partition_expr_json: into_exprs_json[idx].clone(),
                    }
                })
                .collect::<Vec<_>>();

            let entry = PlanEntry::new(group_id, subtask, sources, targets);
            demand.add_entry(&entry);
            diff.entries.push(group_id);
            plan.entries.push(entry);
        }

        plan.resource_demand = demand;
        // The route is not needed after this point, so it is moved into the plan.
        plan.route_snapshot = physical_route;
        plan.plan_hash = format!(
            "{}:{}->{}",
            physical_table_id,
            Self::expr_signature(&from_exprs_json),
            Self::expr_signature(&into_exprs_json)
        );

        self.data.plan = Some(plan);
        self.data.rule_diff = Some(diff);
        self.data.resource_demand = Some(demand);
        Ok(())
    }

    /// Placeholder allocator that always reports success.
    async fn allocate_resources(&self, _demand: ResourceDemand) -> MetaResult<bool> {
        Ok(true)
    }

    /// Creates one group procedure, with a random procedure id, for every id
    /// in `group_ids` that has a matching entry in `plan`. Ids without a
    /// matching entry are skipped.
    fn spawn_group_procedures(
        &self,
        plan: &RepartitionPlan,
        group_ids: &[PlanGroupId],
    ) -> Vec<ProcedureWithId> {
        group_ids
            .iter()
            .filter_map(|group_id| {
                plan.entries
                    .iter()
                    .find(|entry| entry.group_id == *group_id)
                    .cloned()
            })
            .map(|entry| {
                let route_snapshot = plan.route_snapshot.region_routes.clone();
                ProcedureWithId::with_random_id(Box::new(RepartitionGroupProcedure::new(
                    entry,
                    plan.table_id,
                    route_snapshot,
                    self.group_context.clone(),
                )))
            })
            .collect()
    }

    /// Lock keys of the procedure: catalog (read), schema (read), table (write).
    fn table_lock_key(&self) -> Vec<common_procedure::StringKey> {
        let mut lock_key = Vec::with_capacity(3);
        let table_ref = self.data.task.table_ref();
        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
        lock_key.push(TableLock::Write(self.data.task.table_id).into());
        lock_key
    }

    /// Parses every JSON string into a `PartitionExpr`.
    ///
    /// # Errors
    /// Returns `Unsupported` when a string fails to parse or parses to no
    /// expression.
    fn deserialize_partition_exprs(exprs_json: &[String]) -> MetaResult<Vec<PartitionExpr>> {
        exprs_json
            .iter()
            .map(|json| {
                let expr = PartitionExpr::from_json_str(json).map_err(|err| {
                    error::UnsupportedSnafu {
                        operation: format!(
                            "deserialize partition expression '{json}' failed: {err}"
                        ),
                    }
                    .build()
                })?;
                expr.ok_or_else(|| {
                    error::UnsupportedSnafu {
                        operation: format!("empty partition expression '{json}'"),
                    }
                    .build()
                })
            })
            .collect()
    }

    /// Joins the expressions with `|`; used as part of the plan hash.
    fn expr_signature(exprs: &[String]) -> String {
        exprs.join("|")
    }
}
#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the handler that belongs to the current state and converts its
    /// error into a procedure error.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let current = self.data.state;
        let outcome = match current {
            RepartitionState::Prepare => self.on_prepare().await,
            RepartitionState::AllocateResources => self.on_allocate_resources().await,
            RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
            RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
            RepartitionState::Finalize => self.on_finalize().await,
            RepartitionState::Finished => Ok(Status::done()),
        };
        outcome.map_err(map_to_procedure_error)
    }

    /// Serializes the procedure data as JSON.
    fn dump(&self) -> ProcedureResult<String> {
        let data = &self.data;
        serde_json::to_string(data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        let keys = self.table_lock_key();
        LockKey::new(keys)
    }
}
/// Serialized data of the repartition procedure.
///
/// Every field marked `#[serde(default)]` falls back to its `Default` value
/// when it is missing from the persisted JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RepartitionData {
/// Current position in the state machine.
state: RepartitionState,
/// The request this procedure was created for.
task: RepartitionTask,
/// Plan built in the `Prepare` state; `None` until then.
#[serde(default)]
plan: Option<RepartitionPlan>,
/// Rule diff stored together with the plan.
#[serde(default)]
rule_diff: Option<PartitionRuleDiff>,
/// Resource demand stored together with the plan.
#[serde(default)]
resource_demand: Option<ResourceDemand>,
/// Set once resource allocation has succeeded, so it is not repeated.
#[serde(default)]
resource_allocated: bool,
/// Groups handed to sub-procedures and not yet collected.
#[serde(default)]
pending_groups: Vec<PlanGroupId>,
/// Groups recorded as succeeded.
#[serde(default)]
succeeded_groups: Vec<PlanGroupId>,
/// Groups recorded as failed.
#[serde(default)]
failed_groups: Vec<PlanGroupId>,
/// Final summary, written in the `Finalize` state.
#[serde(default)]
summary: Option<RepartitionSummary>,
}
impl RepartitionData {
    /// Initial data for `task`: the state is `Prepare`, nothing is planned or
    /// allocated, and all group lists are empty.
    fn new(task: RepartitionTask) -> Self {
        let state = RepartitionState::Prepare;
        Self {
            state,
            task,
            plan: None,
            rule_diff: None,
            resource_demand: None,
            resource_allocated: false,
            pending_groups: vec![],
            succeeded_groups: vec![],
            failed_groups: vec![],
            summary: None,
        }
    }
}
/// High level states of the repartition procedure.
///
/// The value is stored in `RepartitionData`, so it is persisted with the
/// procedure, and `execute` dispatches on it.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
enum RepartitionState {
/// Builds the plan if none is stored yet.
Prepare,
/// Requests the resources described by the stored demand.
AllocateResources,
/// Spawns group sub-procedures for entries that are not settled yet.
DispatchSubprocedures,
/// Records the pending groups as succeeded.
CollectSubprocedures,
/// Writes the summary of succeeded and failed groups.
Finalize,
/// Terminal state; executing it reports completion.
Finished,
}
/// Outcome of a repartition run: the groups that succeeded and those that failed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RepartitionSummary {
/// Copy of the succeeded group list taken when the procedure finalizes.
succeeded_groups: Vec<PlanGroupId>,
/// Copy of the failed group list taken when the procedure finalizes.
failed_groups: Vec<PlanGroupId>,
}
/// Task payload passed from the DDL entry point.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionTask {
/// Catalog part of the table reference.
pub catalog_name: String,
/// Schema part of the table reference.
pub schema_name: String,
/// Table part of the table reference.
pub table_name: String,
/// Id of the table; used for metadata lookups and the table write lock.
pub table_id: TableId,
/// Partition expressions of source regions (JSON encoded `PartitionExpr`).
pub from_exprs_json: Vec<String>,
/// Partition expressions of target regions (JSON encoded `PartitionExpr`).
pub into_exprs_json: Vec<String>,
}
impl RepartitionTask {
    /// Borrows the catalog, schema and table names as a [`TableReference`].
    fn table_ref(&self) -> TableReference<'_> {
        let Self {
            catalog_name,
            schema_name,
            table_name,
            ..
        } = self;
        TableReference {
            catalog: catalog_name.as_str(),
            schema: schema_name.as_str(),
            table: table_name.as_str(),
        }
    }
}

View File

@@ -12,10 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod plan;
pub mod procedure;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
pub use plan::{
PartitionChange, PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
};
pub use procedure::{RepartitionProcedure, RepartitionTask};
/// Handles shared by the repartition procedures.
///
/// Built from a `DdlContext` by cloning the manager references it holds.
#[derive(Clone)]
pub struct RepartitionContext {
/// Table metadata manager taken from the DDL context.
pub table_metadata_manager: TableMetadataManagerRef,
/// Node manager taken from the DDL context.
// NOTE(review): the leading underscore suggests it is not used yet — confirm.
pub _node_manager: NodeManagerRef,
}
impl RepartitionContext {
    /// Clones the table metadata manager and the node manager out of `context`.
    pub fn new(context: &DdlContext) -> Self {
        let table_metadata_manager = context.table_metadata_manager.clone();
        let _node_manager = context.node_manager.clone();
        Self {
            table_metadata_manager,
            _node_manager,
        }
    }
}

View File

@@ -0,0 +1,377 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use common_meta::ddl::utils::map_to_procedure_error;
use common_meta::error::{self, Result as MetaResult};
use common_meta::rpc::router::RegionRoute;
use common_procedure::error::{Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use strum::AsRefStr;
use crate::procedure::repartition::context::RepartitionContext;
use crate::procedure::repartition::plan::PlanEntry;
/// States of a repartition group procedure.
///
/// `step` walks through the variants in declaration order, one per call,
/// from `Prepare` to `Finished`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
pub enum GroupState {
/// Initial state; followed by `Freeze`.
Prepare,
/// Followed by `UpdateMetadata`.
Freeze,
/// Followed by `UpdateRegionRule`.
UpdateMetadata,
/// Followed by `UpdateManifests`.
UpdateRegionRule,
/// Followed by `Confirm`.
UpdateManifests,
/// Followed by `Cleanup`.
Confirm,
/// Last working state; followed by `Finished`.
Cleanup,
/// Terminal state; stepping it reports completion.
Finished,
}
/// Serializable data of a repartition group procedure.
///
/// Fields marked `#[serde(default)]` fall back to their `Default` value when
/// missing from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GroupProcedureData {
/// Table the group belongs to; used to look up the physical table route.
table_id: TableId,
/// Plan entry this procedure executes.
entry: PlanEntry,
/// Current position in the group state machine.
state: GroupState,
/// Region routes handed in at construction; refreshed during preparation.
route_snapshot: Vec<RegionRoute>,
/// Result of the prepare step; when present, preparation is skipped.
#[serde(default)]
prepare_result: Option<GroupPrepareResult>,
// NOTE(review): presumably set once the freeze step has completed — confirm.
#[serde(default)]
freeze_completed: bool,
// NOTE(review): presumably set once metadata has been updated — confirm.
#[serde(default)]
metadata_updated: bool,
// NOTE(review): presumably the regions with staged changes — confirm.
#[serde(default)]
staged_regions: Vec<RegionId>,
}
/// Data computed by the prepare step of a group procedure.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GroupPrepareResult {
// NOTE(review): presumably the routes of the entry's source regions — confirm.
source_routes: Vec<RegionRoute>,
// NOTE(review): presumably one slot per target, `None` when no route exists yet — confirm.
target_routes: Vec<Option<RegionRoute>>,
// NOTE(review): stored as a raw `u64`; presumably a region id — confirm.
central_region: u64,
}
/// Procedure that executes a single repartition plan entry.
pub struct RepartitionGroupProcedure {
/// Shared handles, e.g. the table metadata manager.
context: RepartitionContext,
/// Serializable state of this group procedure.
data: GroupProcedureData,
}
impl RepartitionGroupProcedure {
/// Type name returned by this procedure's `type_name`.
pub const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";

/// Builds a procedure for `entry` that starts in [`GroupState::Prepare`].
///
/// No prepare result is recorded yet, no region is marked as staged and
/// both progress flags are cleared.
pub fn new(
    entry: PlanEntry,
    table_id: TableId,
    route_snapshot: Vec<RegionRoute>,
    context: RepartitionContext,
) -> Self {
    let data = GroupProcedureData {
        table_id,
        entry,
        state: GroupState::Prepare,
        route_snapshot,
        prepare_result: None,
        freeze_completed: false,
        metadata_updated: false,
        staged_regions: Vec::new(),
    };

    Self { context, data }
}
/// Executes the handler of the current state and advances to the next one.
///
/// Every state before `Cleanup` yields `Status::executing(true)` after its
/// handler succeeds. `Cleanup` moves to `Finished` and reports
/// `Status::done()`, and `Finished` reports `Status::done()` without doing
/// any work. If a handler fails, the state is left unchanged and the error
/// is returned.
pub async fn step(&mut self) -> MetaResult<Status> {
    let next_state = match self.data.state {
        GroupState::Prepare => {
            self.on_prepare().await?;
            GroupState::Freeze
        }
        GroupState::Freeze => {
            self.on_freeze().await?;
            GroupState::UpdateMetadata
        }
        GroupState::UpdateMetadata => {
            self.on_update_metadata().await?;
            GroupState::UpdateRegionRule
        }
        GroupState::UpdateRegionRule => {
            self.on_update_region_rule().await?;
            GroupState::UpdateManifests
        }
        GroupState::UpdateManifests => {
            self.on_update_manifests().await?;
            GroupState::Confirm
        }
        GroupState::Confirm => {
            self.on_confirm().await?;
            GroupState::Cleanup
        }
        GroupState::Cleanup => {
            // Last working state: finish right away instead of scheduling another step.
            self.on_cleanup().await?;
            self.data.state = GroupState::Finished;
            return Ok(Status::done());
        }
        GroupState::Finished => return Ok(Status::done()),
    };

    self.data.state = next_state;
    Ok(Status::executing(true))
}
async fn on_prepare(&mut self) -> MetaResult<()> {
if self.data.prepare_result.is_some() {
return Ok(());
}
let (_, latest_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(self.data.table_id)
.await?;
self.data.route_snapshot = latest_route.region_routes.clone();
let mut source_routes = Vec::with_capacity(self.data.entry.sources.len());
for descriptor in &self.data.entry.sources {
let Some(region_id) = descriptor.region_id else {
return Err(error::UnsupportedSnafu {
operation: format!("group {} lacks source region id", self.data.entry.group_id),
}
.build());
};
let Some(route) = latest_route
.region_routes
.iter()
.find(|route| route.region.id == region_id)
else {
return Err(error::UnsupportedSnafu {
operation: format!(
"group {} source region {region_id} missing from route snapshot",
self.data.entry.group_id
),
}
.build());
};
source_routes.push(route.clone());
}
if source_routes.is_empty() {
return Err(error::UnsupportedSnafu {
operation: format!(
"group {} has no source regions after planning",
self.data.entry.group_id
),
}
.build());
}
let mut target_routes = Vec::with_capacity(self.data.entry.targets.len());
for descriptor in &self.data.entry.targets {
if let Some(region_id) = descriptor.region_id {
let Some(route) = latest_route
.region_routes
.iter()
.find(|route| route.region.id == region_id)
else {
return Err(error::UnsupportedSnafu {
operation: format!(
"group {} target region {region_id} missing from route snapshot",
self.data.entry.group_id
),
}
.build());
};
target_routes.push(Some(route.clone()));
} else {
target_routes.push(None);
}
}
let central_region = source_routes[0].region.id.as_u64();
self.data.prepare_result = Some(GroupPrepareResult {
source_routes,
target_routes,
central_region,
});
Ok(())
}
/// Pauses every source region and every existing target region of the group.
///
/// Skipped when `freeze_completed` is already set; the flag is raised only
/// after all pause calls succeeded. Fails if the prepare result is missing.
async fn on_freeze(&mut self) -> MetaResult<()> {
    if self.data.freeze_completed {
        return Ok(());
    }

    let prepared = self.prepare_result()?;
    // Sources first, then the targets that already have a route.
    let existing_targets = prepared.target_routes.iter().flatten();
    for route in prepared.source_routes.iter().chain(existing_targets) {
        self.pause_region(route).await?;
    }

    self.data.freeze_completed = true;
    Ok(())
}
/// Switches every region touched by this group into staging state.
///
/// Skipped when `metadata_updated` is already set. Regions already listed in
/// `staged_regions` are not switched again. Each region is appended to
/// `staged_regions` as soon as its call succeeds, so that a failure on a
/// later region does not lose track of the ones already switched: the
/// recorded list stays accurate for a later retry and for cleanup.
///
/// On success the matching entries of `route_snapshot` are marked as
/// staging, `staged_regions` is stored sorted and deduplicated, and
/// `metadata_updated` is set.
///
/// # Errors
///
/// Fails if the prepare result is missing or if switching a region's staging
/// state fails; in the latter case the regions switched so far remain
/// recorded in `staged_regions`.
async fn on_update_metadata(&mut self) -> MetaResult<()> {
    if self.data.metadata_updated {
        return Ok(());
    }

    let prepare_result = self.prepare_result()?;
    let region_ids = self.collect_existing_region_ids(prepare_result);
    if region_ids.is_empty() {
        self.data.metadata_updated = true;
        return Ok(());
    }

    let mut staged_set: BTreeSet<RegionId> = self.data.staged_regions.iter().copied().collect();
    let route_manager = self.context.table_metadata_manager.table_route_manager();
    for region_id in &region_ids {
        if staged_set.contains(region_id) {
            continue;
        }
        route_manager
            .set_region_staging_state(*region_id, true)
            .await?;
        // Record progress immediately; an early return on the next region
        // must not forget that this one is already in staging state.
        staged_set.insert(*region_id);
        self.data.staged_regions.push(*region_id);
    }

    Self::mark_regions_staging(&mut self.data.route_snapshot, &region_ids);
    // Normalize to the sorted, deduplicated form.
    self.data.staged_regions = staged_set.into_iter().collect();
    self.data.metadata_updated = true;
    Ok(())
}
/// Handler of the `UpdateRegionRule` state.
///
/// Not implemented yet: it performs no work and always returns `Ok(())`.
async fn on_update_region_rule(&mut self) -> MetaResult<()> {
// TODO: Push new region rules to datanodes and reject outdated writes.
Ok(())
}
/// Handler of the `UpdateManifests` state.
///
/// Not implemented yet: it performs no work and always returns `Ok(())`.
async fn on_update_manifests(&mut self) -> MetaResult<()> {
// TODO: Generate and submit new manifests for target regions.
Ok(())
}
/// Handler of the `Confirm` state.
///
/// Not implemented yet: it performs no work and always returns `Ok(())`.
async fn on_confirm(&mut self) -> MetaResult<()> {
// TODO: Confirm staged writes, resume ingestion, and record success.
Ok(())
}
/// Undoes the temporary effects of the earlier steps.
///
/// Resumes every source and existing target region. If any region was
/// recorded as staged, their staging state is switched off, the matching
/// `route_snapshot` entries are cleared, `staged_regions` is emptied and
/// `metadata_updated` is reset. `freeze_completed` is always reset last.
///
/// # Errors
///
/// Fails if the prepare result is missing, or if resuming a region or
/// clearing a staging state fails.
async fn on_cleanup(&mut self) -> MetaResult<()> {
    let prepared = self.prepare_result()?;
    let existing_targets = prepared.target_routes.iter().flatten();
    for route in prepared.source_routes.iter().chain(existing_targets) {
        self.resume_region(route).await?;
    }

    let has_staged_regions = !self.data.staged_regions.is_empty();
    if has_staged_regions {
        let route_manager = self.context.table_metadata_manager.table_route_manager();
        for &region_id in &self.data.staged_regions {
            route_manager
                .set_region_staging_state(region_id, false)
                .await?;
        }

        Self::clear_regions_staging(&mut self.data.route_snapshot, &self.data.staged_regions);
        self.data.staged_regions.clear();
        self.data.metadata_updated = false;
    }

    self.data.freeze_completed = false;
    Ok(())
}
fn prepare_result(&self) -> MetaResult<&GroupPrepareResult> {
self.data.prepare_result.as_ref().ok_or_else(|| {
error::UnsupportedSnafu {
operation: format!(
"group {} is missing prepare context",
self.data.entry.group_id
),
}
.build()
})
}
/// Pauses the region described by `_route`.
///
/// Not implemented yet: the route is ignored and `Ok(())` is always returned.
async fn pause_region(&self, _route: &RegionRoute) -> MetaResult<()> {
// TODO: invoke datanode RPC to pause compaction and snapshot for the region.
Ok(())
}
/// Resumes the region described by `_route`.
///
/// Not implemented yet: the route is ignored and `Ok(())` is always returned.
async fn resume_region(&self, _route: &RegionRoute) -> MetaResult<()> {
// TODO: invoke datanode RPC to resume compaction and snapshot for the region.
Ok(())
}
/// Calls `set_leader_staging` on the first route of each listed region.
///
/// Region ids that have no route in `routes` are ignored.
fn mark_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) {
    for &wanted in region_ids {
        let first_match = routes.iter().position(|route| route.region.id == wanted);
        if let Some(index) = first_match {
            routes[index].set_leader_staging();
        }
    }
}
/// Calls `clear_leader_staging` on the first route of each listed region.
///
/// Region ids that have no route in `routes` are ignored.
fn clear_regions_staging(routes: &mut [RegionRoute], region_ids: &[RegionId]) {
    for &wanted in region_ids {
        let first_match = routes.iter().position(|route| route.region.id == wanted);
        if let Some(index) = first_match {
            routes[index].clear_leader_staging();
        }
    }
}
/// Returns the ids of all source regions and existing target regions,
/// deduplicated and in ascending order.
fn collect_existing_region_ids(&self, prepare_result: &GroupPrepareResult) -> Vec<RegionId> {
    let sources = prepare_result.source_routes.iter();
    // `flatten` drops the empty slots of targets that have no route yet.
    let existing_targets = prepare_result.target_routes.iter().flatten();

    let unique_ids: BTreeSet<RegionId> = sources
        .chain(existing_targets)
        .map(|route| route.region.id)
        .collect();
    unique_ids.into_iter().collect()
}
}
#[async_trait::async_trait]
impl Procedure for RepartitionGroupProcedure {
    /// Returns [`RepartitionGroupProcedure::TYPE_NAME`].
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs a single `step` of the state machine, converting a metadata
    /// error into a procedure error.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        self.step().await.map_err(map_to_procedure_error)
    }

    /// Serializes the procedure data to a JSON string.
    fn dump(&self) -> ProcedureResult<String> {
        let encoded = serde_json::to_string(&self.data);
        encoded.context(ToJsonSnafu)
    }

    /// Returns the default lock key.
    // NOTE(review): presumably the default key locks nothing, so concurrent
    // procedures on the same table are not serialized here — confirm whether
    // a table-level lock is intended.
    fn lock_key(&self) -> LockKey {
        LockKey::default()
    }
}

View File

@@ -0,0 +1,100 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::key::table_route::PhysicalTableRouteValue;
use partition::subtask::RepartitionSubtask;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
/// Identifier of a plan group; an alias of [`Uuid`].
pub type PlanGroupId = Uuid;
/// Logical description of the repartition plan.
///
/// The plan is persisted by the procedure framework so it must remain
/// serializable/deserializable across versions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlan {
/// Identifier of the table to repartition.
pub table_id: TableId,
/// Hash of the generated plan; empty for a plan built with [`RepartitionPlan::empty`].
// NOTE(review): presumably deterministic and used for idempotence checks — confirm.
pub plan_hash: String,
/// Plan entries to execute.
pub entries: Vec<PlanEntry>,
/// Aggregate resource expectation of all entries.
pub resource_demand: ResourceDemand,
/// Physical table route value stored with the plan.
// NOTE(review): presumably captured when the plan was generated — confirm against the planner.
pub route_snapshot: PhysicalTableRouteValue,
}
impl RepartitionPlan {
pub fn empty(table_id: TableId) -> Self {
Self {
table_id,
plan_hash: String::new(),
entries: Vec::new(),
resource_demand: ResourceDemand::default(),
route_snapshot: PhysicalTableRouteValue::default(),
}
}
}
/// One group of a repartition plan: a subtask together with the regions it
/// reads from and writes to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PlanEntry {
/// Identifier of this plan group.
pub group_id: PlanGroupId,
/// Repartition subtask this entry was built from.
pub subtask: RepartitionSubtask,
/// Descriptors of the source regions.
pub sources: Vec<RegionDescriptor>,
/// Descriptors of the target regions; a target without a region id is
/// counted as a new region by `implied_new_regions`.
pub targets: Vec<RegionDescriptor>,
}
impl PlanEntry {
    /// Creates a plan entry from its parts without any validation.
    pub fn new(
        group_id: PlanGroupId,
        subtask: RepartitionSubtask,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
    ) -> Self {
        Self {
            group_id,
            subtask,
            sources,
            targets,
        }
    }

    /// Number of target descriptors that have no region id yet, i.e. the
    /// regions this entry implies must be created.
    ///
    /// The count saturates at `u32::MAX` instead of silently wrapping,
    /// matching the saturating accumulation in `ResourceDemand::add_entry`.
    pub fn implied_new_regions(&self) -> u32 {
        let missing = self
            .targets
            .iter()
            .filter(|target| target.region_id.is_none())
            .count();
        u32::try_from(missing).unwrap_or(u32::MAX)
    }
}
/// Describes one region referenced by a [`PlanEntry`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
/// Id of the region, or `None` when the descriptor has no region assigned.
pub region_id: Option<RegionId>,
/// Partition expression of the region as a string.
// NOTE(review): presumably the JSON serialization of a partition expression — confirm the exact format.
pub partition_expr_json: String,
}
/// A list of plan group identifiers; empty by default.
// NOTE(review): how this diff is produced and consumed is not visible here — confirm intended semantics.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct PartitionRuleDiff {
/// Identifiers of the plan groups that belong to this diff.
pub entries: Vec<PlanGroupId>,
}
/// Aggregate resources a plan expects to need; zero by default.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ResourceDemand {
/// Total number of new regions implied by the entries added so far.
pub new_regions: u32,
}
impl ResourceDemand {
    /// Adds the new regions implied by `entry` to the running total,
    /// saturating at `u32::MAX`.
    pub fn add_entry(&mut self, entry: &PlanEntry) {
        let additional = entry.implied_new_regions();
        self.new_regions = self.new_regions.saturating_add(additional);
    }
}

View File

@@ -14,12 +14,14 @@
use std::collections::VecDeque;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::expr::PartitionExpr;
use crate::overlap::associate_from_to;
/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RepartitionSubtask {
pub from_expr_indices: Vec<usize>,
pub to_expr_indices: Vec<usize>,