Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-10-13 03:40:18 +00:00
parent d0877997a2
commit 15935ee89a
4 changed files with 15 additions and 19 deletions

View File

@@ -43,8 +43,8 @@ pub mod drop_flow;
pub mod drop_table;
pub mod drop_view;
pub mod flow_meta;
pub mod table_meta;
pub mod repartition;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -101,15 +101,9 @@ impl PartitionRuleDiff {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PartitionChange {
/// Split one region into multiple target regions.
Split {
from: RegionId,
to: Vec<RegionId>,
},
Split { from: RegionId, to: Vec<RegionId> },
/// Merge multiple regions into one target region.
Merge {
from: Vec<RegionId>,
to: RegionId,
},
Merge { from: Vec<RegionId>, to: RegionId },
/// No-op placeholder for future operations (e.g. rule rewrite).
Unsupported,
}

View File

@@ -40,9 +40,11 @@ use strum::AsRefStr;
use table::table_reference::TableReference;
use uuid::Uuid;
use crate::ddl::repartition::{PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand};
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::ddl::repartition::{
PartitionRuleDiff, PlanGroup, PlanGroupId, RepartitionPlan, ResourceDemand,
};
use crate::ddl::utils::map_to_procedure_error;
use crate::error::Result;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
@@ -138,7 +140,7 @@ impl RepartitionProcedure {
async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> Result<Status> {
self.data
.succeeded_groups
.extend(self.data.pending_groups.drain(..));
.append(&mut self.data.pending_groups);
self.data.state = RepartitionState::Finalize;
Ok(Status::executing(true))
@@ -162,8 +164,10 @@ impl RepartitionProcedure {
plan.groups.push(PlanGroup::new(group_id));
}
let mut demand = ResourceDemand::default();
demand.new_regions = plan.groups.len() as u32;
let demand = ResourceDemand {
new_regions: plan.groups.len() as u32,
..Default::default()
};
plan.resource_demand = demand;
Ok((plan, diff, demand))
@@ -207,9 +211,7 @@ impl Procedure for RepartitionProcedure {
RepartitionState::Prepare => self.on_prepare().await,
RepartitionState::AllocateResources => self.on_allocate_resources().await,
RepartitionState::DispatchSubprocedures => self.on_dispatch_subprocedures().await,
RepartitionState::CollectSubprocedures => {
self.on_collect_subprocedures(ctx).await
}
RepartitionState::CollectSubprocedures => self.on_collect_subprocedures(ctx).await,
RepartitionState::Finalize => self.on_finalize().await,
RepartitionState::Finished => Ok(Status::done()),
};
@@ -298,7 +300,7 @@ pub struct RepartitionTask {
}
impl RepartitionTask {
fn table_ref(&self) -> TableReference {
fn table_ref(&self) -> TableReference<'_> {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,

View File

@@ -31,11 +31,11 @@ use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::create_view::CreateViewProcedure;
use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::repartition::{RepartitionProcedure, RepartitionTask};
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, utils};
use crate::error::{