From 72a6b9ff66fcc5722ed5ba44544f749f057cccca Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 28 Oct 2025 04:29:57 +0000 Subject: [PATCH] wip Signed-off-by: Zhenchi --- Cargo.lock | 10 +- Cargo.toml | 3 +- src/datanode/Cargo.toml | 1 + src/datanode/src/error.rs | 10 + src/datanode/src/region_server.rs | 162 ++- src/meta-srv/src/error.rs | 149 +- src/meta-srv/src/procedure/repartition.rs | 528 ++++---- .../src/procedure/repartition/context.rs | 323 ++++- .../src/procedure/repartition/group.rs | 1203 +++++++++++++++-- .../src/procedure/repartition/plan.rs | 43 +- src/store-api/src/region_request.rs | 24 + 11 files changed, 2094 insertions(+), 362 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da0548704c..4250632b5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3934,6 +3934,7 @@ dependencies = [ "mito2", "num_cpus", "object-store", + "partition", "prometheus", "prost 0.13.5", "query", @@ -5324,7 +5325,6 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=14b9dc40bdc8288742b0cefc7bb024303b7429ef#14b9dc40bdc8288742b0cefc7bb024303b7429ef" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", @@ -6922,7 +6922,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9860,7 +9860,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -9906,7 +9906,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.106", @@ -14511,7 +14511,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ebafce51ba..be7ecab8a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,8 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" } +# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "14b9dc40bdc8288742b0cefc7bb024303b7429ef" } +greptime-proto = { path = "../greptime-proto" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 265ede339e..8cd67f44bd 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -53,6 +53,7 @@ metric-engine.workspace = true mito2.workspace = true num_cpus.workspace = true object-store.workspace = true +partition.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a2e6f674e2..da26ce018f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -19,6 +19,7 @@ use common_error::define_into_tonic_status; use common_error::ext::{BoxedError, 
ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use mito2::remap_manifest::Error as RemapManifestError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::error::Error as TableError; @@ -396,6 +397,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to remap manifests: {}", source))] + RemapManifest { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: RemapManifestError, + }, + #[snafu(display("Not yet implemented: {what}"))] NotYetImplemented { what: String }, } @@ -469,6 +478,7 @@ impl ErrorExt for Error { ObjectStore { source, .. } => source.status_code(), BuildCacheStore { .. } => StatusCode::StorageUnavailable, + RemapManifest { .. } => StatusCode::Unexpected, } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 341ee9442c..d2df5a4793 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -24,7 +24,9 @@ use api::region::RegionResponse; use api::v1::meta::TopicStat; use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ - ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request, + ApplyStagedManifestRequest, ListMetadataRequest, PauseRequest, PublishRegionRuleRequest, + RegionResponse as RegionResponseV1, RemapManifestRequest, ResumeRequest, + StageRegionRuleRequest, SyncRequest, region_request, }; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; @@ -84,6 +86,8 @@ use crate::error::{ use crate::event_listener::RegionServerEventListenerRef; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; +const REMAP_STATS_EXTENSION_KEY: &str = "repartition.manifest.stats"; + #[derive(Clone)] pub struct RegionServer { inner: Arc, @@ -370,6 +374,24 @@ impl RegionServer { } } + /// Temporarily pauses compaction and snapshot related activities for the region. + /// + /// Currently a stub; real implementation will coordinate with region worker. + pub async fn pause_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> { + info!("pause_compaction_and_snapshot stub invoked for region {region_id}"); + let _ = region_id; + Ok(()) + } + + /// Resumes compaction and snapshot related activities for the region. + /// + /// Currently a stub; real implementation will coordinate with region worker. + pub async fn resume_compaction_and_snapshot(&self, region_id: RegionId) -> Result<()> { + info!("resume_compaction_and_snapshot stub invoked for region {region_id}"); + let _ = region_id; + Ok(()) + } + /// Stop the region server. 
pub async fn stop(&self) -> Result<()> { self.inner.stop().await @@ -538,6 +560,124 @@ impl RegionServer { Ok(response) } + async fn handle_pause_region_request(&self, request: &PauseRequest) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let tracing_context = TracingContext::from_current_span(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_pause_region_request", + region_id = region_id.to_string() + )); + + self.pause_compaction_and_snapshot(region_id) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) + } + + async fn handle_resume_region_request( + &self, + request: &ResumeRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + let tracing_context = TracingContext::from_current_span(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_resume_region_request", + region_id = region_id.to_string() + )); + + self.resume_compaction_and_snapshot(region_id) + .trace(span) + .await + .map(|_| RegionResponse::new(AffectedRows::default())) + } + + async fn handle_stage_region_rule_request( + &self, + request: &StageRegionRuleRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + info!( + "Stage region rule for region {region_id} with version {}", + request.rule_version + ); + match self + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await? + { + SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => { + Ok(RegionResponse::new(AffectedRows::default())) + } + SetRegionRoleStateResponse::InvalidTransition(err) => { + Err(err).with_context(|_| HandleRegionRequestSnafu { region_id }) + } + } + } + + async fn handle_publish_region_rule_request( + &self, + request: &PublishRegionRuleRequest, + ) -> Result { + let region_id = RegionId::from_u64(request.region_id); + info!( + "Publish region rule for region {region_id} with version {}", + request.rule_version + ); + match self + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader) + .await? + { + SetRegionRoleStateResponse::Success(_) | SetRegionRoleStateResponse::NotFound => { + Ok(RegionResponse::new(AffectedRows::default())) + } + SetRegionRoleStateResponse::InvalidTransition(err) => { + Err(err).with_context(|_| HandleRegionRequestSnafu { region_id }) + } + } + } + + async fn handle_remap_manifest_request( + &self, + request: &RemapManifestRequest, + ) -> Result { + info!( + "received remap manifest request for table {} group {}", + request.table_id, request.group_id + ); + + let stats_json = serde_json::to_vec(&serde_json::json!({ + "files_per_region": HashMap::::new(), + "total_file_refs": 0u64, + "empty_regions": Vec::::new(), + "group_id": &request.group_id, + })) + .context(SerializeJsonSnafu)?; + + let mut extensions = HashMap::new(); + extensions.insert(REMAP_STATS_EXTENSION_KEY.to_string(), stats_json); + + Ok(RegionResponse { + affected_rows: 0, + extensions, + metadata: Vec::new(), + }) + } + + async fn handle_apply_staged_manifest_request( + &self, + request: &ApplyStagedManifestRequest, + ) -> Result { + info!( + "received manifest apply request for table {} group {} publish={} regions {:?}", + request.table_id, request.group_id, request.publish, request.region_ids + ); + + Ok(RegionResponse { + affected_rows: 0, + extensions: HashMap::new(), + metadata: Vec::new(), + }) + } + /// Sync region manifest and registers new opened logical regions. 
pub async fn sync_region( &self, @@ -569,6 +709,26 @@ impl RegionServerHandler for RegionServer { region_request::Body::Sync(sync_request) => { self.handle_sync_region_request(sync_request).await } + region_request::Body::Pause(pause_request) => { + self.handle_pause_region_request(pause_request).await + } + region_request::Body::Resume(resume_request) => { + self.handle_resume_region_request(resume_request).await + } + region_request::Body::StageRegionRule(stage_request) => { + self.handle_stage_region_rule_request(stage_request).await + } + region_request::Body::PublishRegionRule(publish_request) => { + self.handle_publish_region_rule_request(publish_request) + .await + } + region_request::Body::RemapManifest(remap_request) => { + self.handle_remap_manifest_request(remap_request).await + } + region_request::Body::ApplyStagedManifest(apply_request) => { + self.handle_apply_staged_manifest_request(apply_request) + .await + } region_request::Body::ListMetadata(list_metadata_request) => { self.handle_list_metadata_request(list_metadata_request) .await diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 2f4756c2ae..dfdcb79d60 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -17,12 +17,14 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::DatanodeId; +use common_procedure::ProcedureId; use common_runtime::JoinError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use uuid::Uuid; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -774,6 +776,129 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to create repartition subtasks"))] + RepartitionCreateSubtasks { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serialize partition expression"))] + RepartitionSerializePartitionExpr { + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Source partition expression '{}' does not match any existing region", + expr + ))] + RepartitionSourceExprMismatch { + expr: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} is missing a source region id", group_id))] + RepartitionMissingSourceRegionId { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} cannot find route for source region {}", + group_id, + region_id + ))] + RepartitionSourceRegionRouteMissing { + group_id: Uuid, + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} has no source regions after planning", group_id))] + RepartitionNoSourceRegions { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} target {} is missing an allocated region id", + group_id, + target_index + ))] + RepartitionMissingTargetRegionId { + group_id: Uuid, + target_index: usize, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Route for target region {} not found", region_id))] + RepartitionTargetRegionRouteMissing { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} is missing prepare context", group_id))] + RepartitionMissingPrepareContext { + group_id: Uuid, + 
#[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Repartition group {} has no registered subprocedure", group_id))] + RepartitionSubprocedureUnknown { + group_id: Uuid, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Failed to fetch state for repartition group {} subprocedure {}", + group_id, + procedure_id + ))] + RepartitionSubprocedureStateFetch { + group_id: Uuid, + procedure_id: ProcedureId, + #[snafu(source)] + source: common_procedure::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} subprocedure {} state missing", + group_id, + procedure_id + ))] + RepartitionSubprocedureStateMissing { + group_id: Uuid, + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} subprocedure {} failed: {}", + group_id, + procedure_id, + reason + ))] + RepartitionSubprocedureFailed { + group_id: Uuid, + procedure_id: ProcedureId, + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unsupported operation {}", operation))] Unsupported { operation: String, @@ -997,6 +1122,11 @@ impl Error { matches!(self, Error::RetryLater { .. }) || matches!(self, Error::RetryLaterWithSource { .. }) } + + /// Returns `true` if the error requires cleaning poison records. + pub fn need_clean_poisons(&self) -> bool { + false + } } pub type Result = std::result::Result; @@ -1012,6 +1142,8 @@ impl ErrorExt for Error { | Error::TcpBind { .. } | Error::SerializeToJson { .. } | Error::DeserializeFromJson { .. } + | Error::RepartitionCreateSubtasks { .. } + | Error::RepartitionSerializePartitionExpr { .. } | Error::NoLeader { .. } | Error::LeaderLeaseExpired { .. } | Error::LeaderLeaseChanged { .. } @@ -1032,7 +1164,8 @@ impl ErrorExt for Error { | Error::FlowStateHandler { .. } | Error::BuildWalOptionsAllocator { .. } | Error::BuildPartitionClient { .. } - | Error::BuildKafkaClient { .. } => StatusCode::Internal, + | Error::BuildKafkaClient { .. } + | Error::RepartitionSubprocedureStateFetch { .. } => StatusCode::Internal, Error::DeleteRecords { .. } | Error::GetOffset { .. } @@ -1066,7 +1199,14 @@ impl ErrorExt for Error { | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } | Error::HandlerNotFound { .. } - | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments, + | Error::LeaderPeerChanged { .. } + | Error::RepartitionSourceExprMismatch { .. } + | Error::RepartitionMissingSourceRegionId { .. } + | Error::RepartitionSourceRegionRouteMissing { .. } + | Error::RepartitionNoSourceRegions { .. } + | Error::RepartitionMissingTargetRegionId { .. } + | Error::RepartitionTargetRegionRouteMissing { .. } + | Error::RepartitionMissingPrepareContext { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } @@ -1080,7 +1220,10 @@ impl ErrorExt for Error { | Error::RegionRouteNotFound { .. } | Error::MigrationAbort { .. } | Error::MigrationRunning { .. } - | Error::RegionMigrated { .. } => StatusCode::Unexpected, + | Error::RegionMigrated { .. } + | Error::RepartitionSubprocedureUnknown { .. } + | Error::RepartitionSubprocedureStateMissing { .. } + | Error::RepartitionSubprocedureFailed { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::SaveClusterInfo { source, .. } | Error::InvalidClusterInfoFormat { source, .. 
}

diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs
index cc666d1a8b..ee4daa66e7 100644
--- a/src/meta-srv/src/procedure/repartition.rs
+++ b/src/meta-srv/src/procedure/repartition.rs
@@ -16,28 +16,41 @@ mod context;
 mod group;
 mod plan;

-use common_catalog::format_full_table_name;
+use std::collections::HashMap;
+
 use common_meta::ddl::DdlContext;
-use common_meta::ddl::utils::map_to_procedure_error;
-use common_meta::error::{self, Result as MetaResult};
+use common_meta::key::table_route::PhysicalTableRouteValue;
 use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock};
-use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
-use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureWithId, Status};
+use common_procedure::error::{Error as ProcedureError, Result as ProcedureResult, ToJsonSnafu};
+use common_procedure::{
+    Context as ProcedureContext, LockKey, Procedure, ProcedureId, ProcedureWithId, Status,
+};
+use common_telemetry::error;
 use partition::expr::PartitionExpr;
-use partition::subtask;
+use partition::subtask::{self, RepartitionSubtask};
 use serde::{Deserialize, Serialize};
-use serde_json;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::TableId;
 use strum::AsRefStr;
-use table::table_reference::TableReference;
 use uuid::Uuid;

-use self::context::RepartitionContext;
+use self::context::{GroupManifestSummary, RepartitionContext};
 use self::group::RepartitionGroupProcedure;
-use self::plan::{
-    PartitionRuleDiff, PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand,
-};
+use self::plan::{PlanEntry, PlanGroupId, RegionDescriptor, RepartitionPlan, ResourceDemand};
+use crate::error::{self, Result};
+
+/// Task payload passed from the DDL entry point.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RepartitionTask {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub table_name: String,
+    pub table_id: TableId,
+    /// Partition expressions representing the source regions.
+    pub from_exprs: Vec<PartitionExpr>,
+    /// Partition expressions representing the target regions.
+    pub into_exprs: Vec<PartitionExpr>,
+}

 /// Procedure that orchestrates the repartition flow.
 pub struct RepartitionProcedure {
@@ -49,7 +62,8 @@ pub struct RepartitionProcedure {
 impl RepartitionProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

-    pub fn new(task: RepartitionTask, context: DdlContext) -> MetaResult<Self> {
+    /// Constructs a new procedure instance from a task payload.
+    pub fn new(task: RepartitionTask, context: DdlContext) -> Result<Self> {
         let group_context = RepartitionContext::new(&context);
         Ok(Self {
             context,
@@ -58,17 +72,8 @@ impl RepartitionProcedure {
         })
     }

-    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
-        let data: RepartitionData = serde_json::from_str(json).context(FromJsonSnafu)?;
-        let group_context = RepartitionContext::new(&context);
-        Ok(Self {
-            context,
-            group_context,
-            data,
-        })
-    }
-
-    async fn on_prepare(&mut self) -> MetaResult<Status> {
+    /// Builds the repartition plan if we have not done so yet.
+    async fn on_prepare(&mut self) -> Result<Status> {
         if self.data.plan.is_none() {
             self.build_plan().await?;
         }
@@ -77,15 +82,14 @@ impl RepartitionProcedure {
         Ok(Status::executing(true))
     }

-    async fn on_allocate_resources(&mut self) -> MetaResult<Status> {
+    /// Allocates target regions and decides whether the procedure can proceed.
+ async fn on_allocate_resources(&mut self) -> Result { if !self.data.resource_allocated { - let demand = self.data.resource_demand.unwrap_or_default(); - let allocated = self.allocate_resources(demand).await?; + let allocated = self.allocate_resources().await?; if !allocated { if let Some(plan) = &self.data.plan { - self.data - .failed_groups - .extend(plan.entries.iter().map(|entry| entry.group_id)); + let failed_groups = plan.entries.iter().map(|entry| entry.group_id); + self.data.failed_groups.extend(failed_groups); } self.data.state = RepartitionState::Finalize; return Ok(Status::executing(true)); @@ -97,7 +101,8 @@ impl RepartitionProcedure { Ok(Status::executing(true)) } - async fn on_dispatch_subprocedures(&mut self) -> MetaResult { + /// Spawns group subprocedures for every pending plan entry. + async fn on_dispatch_subprocedures(&mut self) -> Result { let plan = match self.data.plan.as_ref() { Some(plan) => plan, None => { @@ -106,243 +111,289 @@ impl RepartitionProcedure { } }; - let groups_to_schedule: Vec = plan + let entries_to_schedule: Vec = plan .entries .iter() .filter(|entry| { !self.data.succeeded_groups.contains(&entry.group_id) && !self.data.failed_groups.contains(&entry.group_id) }) - .map(|entry| entry.group_id) + .cloned() .collect(); - if groups_to_schedule.is_empty() { + if entries_to_schedule.is_empty() { self.data.state = RepartitionState::Finalize; return Ok(Status::executing(true)); } - let subprocedures = self.spawn_group_procedures(plan, &groups_to_schedule); + let groups_to_schedule: Vec = entries_to_schedule + .iter() + .map(|entry| entry.group_id) + .collect(); + + let subprocedures = self.spawn_group_procedures( + plan.table_id, + plan.route_snapshot.clone(), + entries_to_schedule, + ); self.data.pending_groups = groups_to_schedule; self.data.state = RepartitionState::CollectSubprocedures; Ok(Status::suspended(subprocedures, true)) } - async fn on_collect_subprocedures(&mut self, _ctx: &ProcedureContext) -> MetaResult { - self.data - .succeeded_groups - .append(&mut self.data.pending_groups); + /// Records the list of subprocedures that finished and move to finalisation. 
+ async fn on_collect_subprocedures(&mut self, ctx: &ProcedureContext) -> Result { + let pending = std::mem::take(&mut self.data.pending_groups); + let mut first_error: Option = None; + let mut succeeded = Vec::new(); + for group_id in pending { + let procedure_id = match self.data.group_subprocedures.remove(&group_id) { + Some(id) => id, + None => { + let err = error::RepartitionSubprocedureUnknownSnafu { group_id }.build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + continue; + } + }; + + let state_opt = ctx.provider.procedure_state(procedure_id).await.context( + error::RepartitionSubprocedureStateFetchSnafu { + group_id, + procedure_id, + }, + )?; + + let state = match state_opt { + Some(state) => state, + None => { + let err = error::RepartitionSubprocedureStateMissingSnafu { + group_id, + procedure_id, + } + .build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + continue; + } + }; + + if state.is_done() { + succeeded.push(group_id); + continue; + } + + let reason = state + .error() + .map(|err| err.to_string()) + .unwrap_or_else(|| format!("subprocedure state {}", state.as_str_name())); + let err = error::RepartitionSubprocedureFailedSnafu { + group_id, + procedure_id, + reason, + } + .build(); + self.data.failed_groups.push(group_id); + if first_error.is_none() { + first_error = Some(err); + } + } + + self.data.succeeded_groups.extend(succeeded); self.data.state = RepartitionState::Finalize; + + if let Some(err) = first_error { + return Err(err); + } + Ok(Status::executing(true)) } - async fn on_finalize(&mut self) -> MetaResult { + /// Builds the summary that will be returned to the caller. + async fn on_finalize(&mut self) -> Result { + self.deallocate_resources().await?; + self.data.summary = Some(RepartitionSummary { succeeded_groups: self.data.succeeded_groups.clone(), failed_groups: self.data.failed_groups.clone(), + manifest_summaries: self.group_context.manifest_summaries(), }); + self.group_context.clear_group_records(); self.data.state = RepartitionState::Finished; Ok(Status::done()) } - async fn build_plan(&mut self) -> MetaResult<()> { + /// Constructs the repartition plan from the task specification. + async fn build_plan(&mut self) -> Result<()> { let table_id = self.data.task.table_id; - let table_ref = self.data.task.table_ref(); - let table_name_str = - format_full_table_name(table_ref.catalog, table_ref.schema, table_ref.table); - - self.context - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await? 
- .ok_or_else(|| { - error::TableNotFoundSnafu { - table_name: table_name_str.clone(), - } - .build() - })?; + let from_exprs = &self.data.task.from_exprs; + let into_exprs = &self.data.task.into_exprs; let (physical_table_id, physical_route) = self .context .table_metadata_manager .table_route_manager() .get_physical_table_route(table_id) - .await?; + .await + .context(error::TableMetadataManagerSnafu)?; - let from_exprs_json = self.data.task.from_exprs_json.clone(); - let into_exprs_json = self.data.task.into_exprs_json.clone(); + let src_descriptors = Self::source_region_descriptors(from_exprs, &physical_route)?; + let subtasks = subtask::create_subtasks(from_exprs, into_exprs) + .context(error::RepartitionCreateSubtasksSnafu)?; + let entries = Self::build_plan_entries(subtasks, &src_descriptors, into_exprs); - let from_exprs = Self::deserialize_partition_exprs(&from_exprs_json)?; - let into_exprs = Self::deserialize_partition_exprs(&into_exprs_json)?; + let demand = ResourceDemand::from_plan_entries(&entries); + let plan = RepartitionPlan::new(physical_table_id, entries, demand, physical_route.clone()); + self.data.plan = Some(plan); + Ok(()) + } + + fn source_region_descriptors( + from_exprs: &[PartitionExpr], + physical_route: &PhysicalTableRouteValue, + ) -> Result> { let existing_regions = physical_route .region_routes .iter() .map(|route| (route.region.id, route.region.partition_expr())) .collect::>(); - let mut used_regions = vec![false; existing_regions.len()]; - let mut source_descriptor_by_index = Vec::with_capacity(from_exprs_json.len()); - for expr_json in &from_exprs_json { - let mut found = None; - for (idx, (region_id, expr)) in existing_regions.iter().enumerate() { - if !used_regions[idx] && expr == expr_json { - found = Some((*region_id, expr.clone())); - used_regions[idx] = true; - break; - } - } - - let (region_id, partition_expr_json) = found.ok_or_else(|| { - error::UnsupportedSnafu { - operation: format!( - "repartition source expression '{}' does not match any existing region", - expr_json - ), - } - .build() - })?; - - source_descriptor_by_index.push(RegionDescriptor { - region_id: Some(region_id), - partition_expr_json, - }); - } - - let subtasks = subtask::create_subtasks(&from_exprs, &into_exprs).map_err(|err| { - error::UnsupportedSnafu { - operation: format!("create_subtasks failed: {err}"), - } - .build() - })?; - - let mut plan = RepartitionPlan::empty(physical_table_id); - let mut diff = PartitionRuleDiff::default(); - let mut demand = ResourceDemand::default(); - - for subtask in subtasks { - let group_id = Uuid::new_v4(); - - let sources = subtask - .from_expr_indices - .iter() - .map(|&idx| source_descriptor_by_index[idx].clone()) - .collect::>(); - - let targets = subtask - .to_expr_indices - .iter() - .enumerate() - .map(|(position, &idx)| { - let reused_region = if subtask.from_expr_indices.len() == 1 { - if position == 0 { - sources.get(0).and_then(|descriptor| descriptor.region_id) - } else { - None - } - } else if subtask.to_expr_indices.len() == 1 { - sources.first().and_then(|descriptor| descriptor.region_id) - } else { - sources - .get(position) - .and_then(|descriptor| descriptor.region_id) - }; - - RegionDescriptor { - region_id: reused_region, - partition_expr_json: into_exprs_json[idx].clone(), - } - }) - .collect::>(); - - let entry = PlanEntry::new(group_id, subtask, sources, targets); - demand.add_entry(&entry); - diff.entries.push(group_id); - plan.entries.push(entry); - } - - plan.resource_demand = demand; - plan.route_snapshot = 
physical_route.clone(); - plan.plan_hash = format!( - "{}:{}->{}", - physical_table_id, - Self::expr_signature(&from_exprs_json), - Self::expr_signature(&into_exprs_json) - ); - - self.data.plan = Some(plan); - self.data.rule_diff = Some(diff); - self.data.resource_demand = Some(demand); - - Ok(()) - } - - async fn allocate_resources(&self, _demand: ResourceDemand) -> MetaResult { - Ok(true) - } - - fn spawn_group_procedures( - &self, - plan: &RepartitionPlan, - group_ids: &[PlanGroupId], - ) -> Vec { - group_ids + let descriptors = from_exprs .iter() - .filter_map(|group_id| { - plan.entries + .map(|expr| { + let expr_json = expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + + let matched_region_id = existing_regions .iter() - .find(|entry| entry.group_id == *group_id) - .cloned() + .find_map(|(region_id, existing_expr)| { + (existing_expr == &expr_json).then_some(*region_id) + }) + .with_context(|| error::RepartitionSourceExprMismatchSnafu { + expr: expr_json, + })?; + + Ok(RegionDescriptor { + region_id: Some(matched_region_id), + partition_expr: expr.clone(), + }) }) - .map(|entry| { - let route_snapshot = plan.route_snapshot.region_routes.clone(); - ProcedureWithId::with_random_id(Box::new(RepartitionGroupProcedure::new( - entry, - plan.table_id, - route_snapshot, - self.group_context.clone(), - ))) - }) - .collect() + .collect::>>()?; + + Ok(descriptors) } + fn build_plan_entries( + subtasks: Vec, + source_index: &[RegionDescriptor], + target_exprs: &[PartitionExpr], + ) -> Vec { + let plan_entries = subtasks + .into_iter() + .map(|subtask| { + let group_id = Uuid::new_v4(); + let sources = subtask + .from_expr_indices + .iter() + .map(|&idx| source_index[idx].clone()) + .collect::>(); + + let targets = subtask + .to_expr_indices + .iter() + .map(|&idx| RegionDescriptor { + region_id: None, // will be assigned later + partition_expr: target_exprs[idx].clone(), + }) + .collect::>(); + + PlanEntry::new(group_id, subtask, sources, targets) + }) + .collect::>(); + + plan_entries + } + + /// Allocates resources required by the plan. Returning `false` + /// indicates that the procedure should abort. + async fn allocate_resources(&mut self) -> Result { + todo!("allocate resources"); + } + + async fn deallocate_resources(&mut self) -> Result<()> { + if !self.data.resource_allocated { + return Ok(()); + } + self.data.resource_allocated = false; + + todo!("deallocate resources"); + } + + /// Builds the child procedure list for the provided plan groups. + fn spawn_group_procedures( + &mut self, + table_id: TableId, + route_snapshot: PhysicalTableRouteValue, + entries: Vec, + ) -> Vec { + let mut id_map = HashMap::new(); + + let procedures = entries + .into_iter() + .map(|entry| { + let group_id = entry.group_id; + let group_procedure = RepartitionGroupProcedure::new( + entry, + table_id, + route_snapshot.clone(), + self.data.task.catalog_name.clone(), + self.data.task.schema_name.clone(), + self.group_context.clone(), + ); + let procedure = ProcedureWithId::with_random_id(Box::new(group_procedure)); + id_map.insert(group_id, procedure.id); + procedure + }) + .collect::>(); + + self.data.group_subprocedures = id_map; + procedures + } + + /// Composes the set of locks required to safely mutate table metadata. 
     fn table_lock_key(&self) -> Vec<StringKey> {
         let mut lock_key = Vec::with_capacity(3);
-        let table_ref = self.data.task.table_ref();
-        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
-        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
+        let catalog = self.data.task.catalog_name.as_str();
+        let schema = self.data.task.schema_name.as_str();
+        lock_key.push(CatalogLock::Read(catalog).into());
+        lock_key.push(SchemaLock::read(catalog, schema).into());
         lock_key.push(TableLock::Write(self.data.task.table_id).into());
         lock_key
     }

-    fn deserialize_partition_exprs(exprs_json: &[String]) -> MetaResult<Vec<PartitionExpr>> {
-        exprs_json
-            .iter()
-            .map(|json| {
-                let expr = PartitionExpr::from_json_str(json).map_err(|err| {
-                    error::UnsupportedSnafu {
-                        operation: format!(
-                            "deserialize partition expression '{json}' failed: {err}"
-                        ),
-                    }
-                    .build()
-                })?;
+    async fn trigger_group_rollbacks(&mut self) {
+        if self.data.rollback_triggered {
+            return;
+        }

-                expr.ok_or_else(|| {
-                    error::UnsupportedSnafu {
-                        operation: format!("empty partition expression '{json}'"),
-                    }
-                    .build()
-                })
-            })
-            .collect()
-    }
-
-    fn expr_signature(exprs: &[String]) -> String {
-        exprs.join("|")
+        match self.group_context.rollback_registered_groups().await {
+            Ok(_) => {
+                self.data.rollback_triggered = true;
+            }
+            Err(err) => {
+                error!(err; "repartition: rollback of successful groups failed");
+                self.data.rollback_triggered = true;
+            }
+        }
     }
 }
@@ -363,7 +414,16 @@ impl Procedure for RepartitionProcedure {
             RepartitionState::Finished => Ok(Status::done()),
         };

-        status.map_err(map_to_procedure_error)
+        match status {
+            Ok(status) => Ok(status),
+            Err(err) => {
+                self.trigger_group_rollbacks().await;
+                if let Err(dealloc_err) = self.deallocate_resources().await {
+                    error!(dealloc_err; "repartition: deallocating resources after failure failed");
+                }
+                Err(map_repartition_error(err))
+            }
+        }
     }

     fn dump(&self) -> ProcedureResult<String> {
@@ -383,10 +443,6 @@ struct RepartitionData {
     #[serde(default)]
     plan: Option<RepartitionPlan>,
     #[serde(default)]
-    rule_diff: Option<PartitionRuleDiff>,
-    #[serde(default)]
-    resource_demand: Option<ResourceDemand>,
-    #[serde(default)]
     resource_allocated: bool,
     #[serde(default)]
     pending_groups: Vec<PlanGroupId>,
     #[serde(default)]
@@ -396,25 +452,39 @@ struct RepartitionData {
     failed_groups: Vec<PlanGroupId>,
     #[serde(default)]
     summary: Option<RepartitionSummary>,
+    #[serde(default)]
+    rollback_triggered: bool,
+    #[serde(default)]
+    group_subprocedures: HashMap<PlanGroupId, ProcedureId>,
 }

 impl RepartitionData {
+    /// Initialise the procedure data for a fresh run.
     fn new(task: RepartitionTask) -> Self {
         Self {
             state: RepartitionState::Prepare,
             task,
             plan: None,
-            rule_diff: None,
-            resource_demand: None,
             resource_allocated: false,
             pending_groups: Vec::new(),
             succeeded_groups: Vec::new(),
             failed_groups: Vec::new(),
             summary: None,
+            rollback_triggered: false,
+            group_subprocedures: HashMap::new(),
         }
     }
 }

+pub(super) fn map_repartition_error(err: error::Error) -> ProcedureError {
+    match (err.is_retryable(), err.need_clean_poisons()) {
+        (true, true) => ProcedureError::retry_later_and_clean_poisons(err),
+        (true, false) => ProcedureError::retry_later(err),
+        (false, true) => ProcedureError::external_and_clean_poisons(err),
+        (false, false) => ProcedureError::external(err),
+    }
+}
+
 /// High level states of the repartition procedure.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)]
 enum RepartitionState {
@@ -426,31 +496,11 @@ enum RepartitionState {
     Finished,
 }

+/// Information returned to the caller after the procedure finishes.
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 struct RepartitionSummary {
     succeeded_groups: Vec<PlanGroupId>,
     failed_groups: Vec<PlanGroupId>,
-}
-
-/// Task payload passed from the DDL entry point.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RepartitionTask {
-    pub catalog_name: String,
-    pub schema_name: String,
-    pub table_name: String,
-    pub table_id: TableId,
-    /// Partition expressions of source regions (JSON encoded `PartitionExpr`).
-    pub from_exprs_json: Vec<String>,
-    /// Partition expressions of target regions (JSON encoded `PartitionExpr`).
-    pub into_exprs_json: Vec<String>,
-}
-
-impl RepartitionTask {
-    fn table_ref(&self) -> TableReference<'_> {
-        TableReference {
-            catalog: &self.catalog_name,
-            schema: &self.schema_name,
-            table: &self.table_name,
-        }
-    }
+    #[serde(default)]
+    manifest_summaries: Vec<GroupManifestSummary>,
 }
diff --git a/src/meta-srv/src/procedure/repartition/context.rs b/src/meta-srv/src/procedure/repartition/context.rs
index f36748c62f..179ce74d36 100644
--- a/src/meta-srv/src/procedure/repartition/context.rs
+++ b/src/meta-srv/src/procedure/repartition/context.rs
@@ -12,21 +12,340 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use api::region::RegionResponse;
+use api::v1::region::{
+    ApplyStagedManifestRequest, PauseRequest, PublishRegionRuleRequest, RegionRequest,
+    RegionRequestHeader, RemapManifestRequest, ResumeRequest, StageRegionRuleRequest,
+    region_request,
+};
+use common_error::ext::BoxedError;
 use common_meta::ddl::DdlContext;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::node_manager::NodeManagerRef;
+use common_meta::peer::Peer;
+use common_telemetry::{error, info};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use snafu::ResultExt;
+use store_api::storage::RegionId;
+use crate::error::{self, Result};
+
+pub const REMAP_MANIFEST_STATS_EXTENSION: &str = "repartition.manifest.stats";
+
+use super::group::{GroupRollbackRecord, RepartitionGroupProcedure};
+use crate::procedure::repartition::plan::PlanGroupId;
+
+/// Track the overall manifest stage for a repartition group.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+pub enum ManifestStatus {
+    #[default]
+    NotStarted,
+    Staged,
+    Published,
+    Discarded,
+    Skipped,
+    Failed,
+}
+
+/// Per-group status record that is collected by the top-level procedure.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GroupManifestSummary {
+    pub group_id: PlanGroupId,
+    pub status: ManifestStatus,
+    pub staged_region_count: u64,
+    pub stats: Option<Value>,
+    pub error: Option<String>,
+}
+
+/// Shared context that allows group procedures to interact with metadata and
+/// datanodes. It also aggregates per-group manifest summaries.
 #[derive(Clone)]
 pub struct RepartitionContext {
     pub table_metadata_manager: TableMetadataManagerRef,
-    pub _node_manager: NodeManagerRef,
+    pub node_manager: NodeManagerRef,
+    manifest_records: Arc<Mutex<HashMap<PlanGroupId, GroupManifestSummary>>>,
+    rollback_records: Arc<Mutex<HashMap<PlanGroupId, GroupRollbackRecord>>>,
 }

 impl RepartitionContext {
     pub fn new(context: &DdlContext) -> Self {
         Self {
             table_metadata_manager: context.table_metadata_manager.clone(),
-            _node_manager: context.node_manager.clone(),
+            node_manager: context.node_manager.clone(),
+            manifest_records: Arc::new(Mutex::new(HashMap::new())),
+            rollback_records: Arc::new(Mutex::new(HashMap::new())),
         }
     }
+
+    /// Send a pause request to the region leader so that local IO is quiesced.
+ pub async fn pause_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> { + info!( + "requesting pause to datanode {} for region {}", + peer.id, region_id + ); + let datanode = self.node_manager.datanode(peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::Pause(PauseRequest { + region_id: region_id.as_u64(), + })), + }; + datanode + .handle(request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to pause region {} on datanode {}", + region_id, peer.id + ), + })?; + Ok(()) + } + + /// Resume a previously paused region. + pub async fn resume_region_on_datanode(&self, peer: &Peer, region_id: RegionId) -> Result<()> { + info!( + "requesting resume to datanode {} for region {}", + peer.id, region_id + ); + let datanode = self.node_manager.datanode(peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::Resume(ResumeRequest { + region_id: region_id.as_u64(), + rule_version: String::new(), + })), + }; + datanode + .handle(request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to resume region {} on datanode {}", + region_id, peer.id + ), + })?; + Ok(()) + } + + /// Stage the provided rule version on the datanode. + pub async fn stage_region_rule_on_datanode( + &self, + peer: &Peer, + region_id: RegionId, + rule_version: &str, + ) -> Result<()> { + info!( + "requesting region rule staging to datanode {} for region {}", + peer.id, region_id + ); + let datanode = self.node_manager.datanode(peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::StageRegionRule( + StageRegionRuleRequest { + region_id: region_id.as_u64(), + rule_version: rule_version.to_string(), + }, + )), + }; + datanode + .handle(request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to stage region rule for region {} on datanode {}", + region_id, peer.id + ), + })?; + Ok(()) + } + + /// Publish the staged rule version to make it active. + pub async fn publish_region_rule_on_datanode( + &self, + peer: &Peer, + region_id: RegionId, + rule_version: &str, + ) -> Result<()> { + info!( + "requesting region rule publish to datanode {} for region {}", + peer.id, region_id + ); + let datanode = self.node_manager.datanode(peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::PublishRegionRule( + PublishRegionRuleRequest { + region_id: region_id.as_u64(), + rule_version: rule_version.to_string(), + }, + )), + }; + datanode + .handle(request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to publish region rule for region {} on datanode {}", + region_id, peer.id + ), + })?; + Ok(()) + } + + /// Drop the staged rule version during rollback. 
+ pub async fn clear_region_rule_stage_on_datanode( + &self, + peer: &Peer, + region_id: RegionId, + ) -> Result<()> { + info!( + "requesting region rule stage clear to datanode {} for region {}", + peer.id, region_id + ); + let datanode = self.node_manager.datanode(peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::StageRegionRule( + StageRegionRuleRequest { + region_id: region_id.as_u64(), + rule_version: String::new(), + }, + )), + }; + datanode + .handle(request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to clear staged region rule for region {} on datanode {}", + region_id, peer.id + ), + })?; + Ok(()) + } + + /// Instruct the datanode to remap manifests for this group. + pub async fn remap_manifests_on_datanode( + &self, + peer: &Peer, + manifest_request: RemapManifestRequest, + ) -> Result { + let table_id = manifest_request.table_id; + let group_id = manifest_request.group_id.clone(); + info!( + "requesting manifest remap to datanode {} for table {} in group {}", + peer.id, table_id, group_id + ); + let datanode = self.node_manager.datanode(peer).await; + let region_request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::RemapManifest(manifest_request)), + }; + let response = datanode + .handle(region_request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to remap manifests for group {} on datanode {}", + group_id, peer.id + ), + })?; + Ok(response) + } + + /// Publish or discard staged manifests. + pub async fn apply_staged_manifests_on_datanode( + &self, + peer: &Peer, + manifest_request: ApplyStagedManifestRequest, + ) -> Result { + let publish = manifest_request.publish; + let table_id = manifest_request.table_id; + let group_id = manifest_request.group_id.clone(); + info!( + "requesting manifest {} on datanode {} for table {} in group {}", + if publish { "publish" } else { "discard" }, + peer.id, + table_id, + group_id + ); + let datanode = self.node_manager.datanode(peer).await; + let region_request = RegionRequest { + header: Some(RegionRequestHeader::default()), + body: Some(region_request::Body::ApplyStagedManifest(manifest_request)), + }; + let response = datanode + .handle(region_request) + .await + .map_err(BoxedError::new) + .context(error::RetryLaterWithSourceSnafu { + reason: format!( + "failed to {} staged manifests for group {} on datanode {}", + if publish { "publish" } else { "discard" }, + group_id, + peer.id + ), + })?; + Ok(response) + } + + /// Store the latest manifest summary for a group. 
+ pub fn record_manifest_summary(&self, summary: GroupManifestSummary) { + let mut records = self.manifest_records.lock().unwrap(); + records.insert(summary.group_id, summary); + } + + pub fn register_group_success(&self, record: GroupRollbackRecord) { + let mut records = self.rollback_records.lock().unwrap(); + let group_id = record.group_id; + records.insert(group_id, record); + } + + pub async fn rollback_registered_groups(&self) -> Result<()> { + let records: Vec = { + let mut map = self.rollback_records.lock().unwrap(); + map.drain().map(|(_, record)| record).collect() + }; + + let mut first_err: Option = None; + for record in records { + let group_id = record.group_id; + if let Err(err) = + RepartitionGroupProcedure::execute_rollback(self.clone(), record).await + { + error!(err; "repartition: rollback of group {:?} failed", group_id); + if first_err.is_none() { + first_err = Some(err); + } + } + } + + if let Some(err) = first_err { + return Err(err); + } + + Ok(()) + } + + pub fn clear_group_records(&self) { + self.rollback_records.lock().unwrap().clear(); + } + + /// Collect all manifest summaries recorded so far. + pub fn manifest_summaries(&self) -> Vec { + let records = self.manifest_records.lock().unwrap(); + records.values().cloned().collect() + } } diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index cc5dfe1d91..e9cd13ac52 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -12,21 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; -use common_meta::ddl::utils::map_to_procedure_error; -use common_meta::error::{self, Result as MetaResult}; -use common_meta::rpc::router::RegionRoute; +use api::region::RegionResponse; +use api::v1::region::{ + ApplyStagedManifestRequest, RemapManifestRequest, RemapManifestSource, RemapManifestTarget, +}; +use common_meta::key::DeserializedValueWithBytes; +use common_meta::key::datanode_table::{DatanodeTableKey, RegionInfo}; +use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock}; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; use common_procedure::error::{Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use serde_json::{self, Value}; +use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::{RegionId, TableId}; use strum::AsRefStr; -use crate::procedure::repartition::context::RepartitionContext; -use crate::procedure::repartition::plan::PlanEntry; +use crate::error::{self, Result}; +use crate::procedure::repartition::context::{ + GroupManifestSummary, ManifestStatus, REMAP_MANIFEST_STATS_EXTENSION, RepartitionContext, +}; +use crate::procedure::repartition::plan::{PlanEntry, PlanGroupId, RegionDescriptor}; +/// Logical states executed by the group procedure state machine. #[derive(Debug, Clone, Copy, Serialize, Deserialize, AsRefStr)] pub enum GroupState { Prepare, @@ -39,12 +52,18 @@ pub enum GroupState { Finished, } +/// Persisted snapshot of group execution, replayed by the procedure framework +/// when the workflow is resumed. 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct GroupProcedureData {
     table_id: TableId,
     entry: PlanEntry,
     state: GroupState,
-    route_snapshot: Vec<RegionRoute>,
+    route_snapshot: PhysicalTableRouteValue,
+    #[serde(skip)]
+    route_raw: Option<DeserializedValueWithBytes<TableRouteValue>>,
+    #[serde(skip)]
+    region_info: Option<RegionInfo>,
     #[serde(default)]
     prepare_result: Option<GroupPrepareResult>,
     #[serde(default)]
@@ -53,29 +72,175 @@ struct GroupProcedureData {
     metadata_updated: bool,
     #[serde(default)]
     staged_regions: Vec<RegionId>,
+    #[serde(default)]
+    region_rule_updated: bool,
+    #[serde(default)]
+    region_rule_version: Option<String>,
+    #[serde(default)]
+    manifests_generated: bool,
+    #[serde(default)]
+    manifest_stats: Option<Value>,
+    #[serde(default)]
+    catalog_name: String,
+    #[serde(default)]
+    schema_name: String,
+    #[serde(default)]
+    manifest_summary: GroupManifestSummary,
+    #[serde(default)]
+    route_committed: bool,
+    #[serde(default)]
+    rollback_registered: bool,
+}
+
+impl GroupProcedureData {
+    /// Record the latest manifest status and return a cloned summary for
+    /// immediate use by the caller.
+    fn record_manifest_summary(
+        &mut self,
+        status: ManifestStatus,
+        staged_region_count: u64,
+        error: Option<String>,
+    ) -> GroupManifestSummary {
+        self.manifest_summary.status = status;
+        self.manifest_summary.staged_region_count = staged_region_count;
+        self.manifest_summary.stats = self.manifest_stats.clone();
+        self.manifest_summary.error = error;
+        self.manifests_generated =
+            matches!(status, ManifestStatus::Staged) && staged_region_count > 0;
+        self.manifest_summary.clone()
+    }
+
+    /// Drop manifest tracking state – typically called around rollback paths.
+    fn reset_manifest_state(&mut self) {
+        self.manifest_stats = None;
+        self.manifests_generated = false;
+    }
+
+    /// Decode manifest stats from the RPC response and update bookkeeping.
+    fn note_remap_stats(&mut self, response: &RegionResponse) {
+        if let Some(payload) = response.extensions.get(REMAP_MANIFEST_STATS_EXTENSION) {
+            match serde_json::from_slice::<Value>(payload) {
+                Ok(value) => {
+                    self.manifest_stats = Some(value.clone());
+                    let total_refs = value
+                        .get("total_file_refs")
+                        .and_then(Value::as_u64)
+                        .unwrap_or_default();
+                    info!(
+                        "repartition group {:?}: staged manifests for {} regions (total file refs {})",
+                        self.entry.group_id, response.affected_rows, total_refs
+                    );
+                    debug!(
+                        "repartition group {:?}: manifest remap detail {:?}",
+                        self.entry.group_id, value
+                    );
+                }
+                Err(err) => {
+                    debug!(
+                        error = ?err,
+                        "repartition group {:?}: failed to decode manifest remap stats",
+                        self.entry.group_id
+                    );
+                    self.reset_manifest_state();
+                }
+            }
+        } else {
+            debug!(
+                "repartition group {:?}: manifest remap response missing stats extension",
+                self.entry.group_id
+            );
+            self.reset_manifest_state();
+        }
+    }
+
+    /// Update manifest bookkeeping after publishing or discarding staged files.
+    fn note_manifest_application(
+        &mut self,
+        publish: bool,
+        response: Option<&RegionResponse>,
+    ) -> GroupManifestSummary {
+        match response {
+            Some(resp) => {
+                self.note_remap_stats(resp);
+                let status = if publish {
+                    ManifestStatus::Published
+                } else {
+                    ManifestStatus::Discarded
+                };
+                let summary = self.record_manifest_summary(status, resp.affected_rows as u64, None);
+                if !publish {
+                    self.reset_manifest_state();
+                }
+                summary
+            }
+            None => {
+                self.reset_manifest_state();
+                self.record_manifest_summary(ManifestStatus::Skipped, 0, None)
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct GroupRollbackRecord {
+    pub(crate) group_id: PlanGroupId,
+    data: GroupProcedureData,
+}
+
+impl GroupRollbackRecord {
+    fn new(data: &GroupProcedureData) -> Option<Self> {
+        if data.route_raw.is_none() {
+            return None;
+        }
+
+        let mut snapshot = data.clone();
+        snapshot.route_committed = true;
+        Some(Self {
+            group_id: data.entry.group_id,
+            data: snapshot,
+        })
+    }
+
+    fn into_inner(self) -> (PlanGroupId, GroupProcedureData) {
+        (self.group_id, self.data)
+    }
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 struct GroupPrepareResult {
     source_routes: Vec<RegionRoute>,
     target_routes: Vec<Option<RegionRoute>>,
-    central_region: u64,
+    central_region: RegionId,
 }

+/// Stateful executor that drives a single plan group through the repartition
+/// lifecycle. It is scheduled by the parent procedure and persisted/resumed by
+/// the procedure framework.
 pub struct RepartitionGroupProcedure {
     context: RepartitionContext,
     data: GroupProcedureData,
 }

+/// Lazy payload describing the table route delta that the confirm stage writes.
+#[allow(dead_code)]
+pub struct RouteMetadataPayload<'a> {
+    pub table_id: TableId,
+    pub original: &'a DeserializedValueWithBytes<TableRouteValue>,
+    pub new_routes: Vec<RegionRoute>,
+}
+
 impl RepartitionGroupProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";

     pub fn new(
         entry: PlanEntry,
         table_id: TableId,
-        route_snapshot: Vec<RegionRoute>,
+        route_snapshot: PhysicalTableRouteValue,
+        catalog_name: String,
+        schema_name: String,
         context: RepartitionContext,
     ) -> Self {
+        let group_id = entry.group_id;
         Self {
             context,
             data: GroupProcedureData {
@@ -83,15 +248,214 @@ impl RepartitionGroupProcedure {
                 entry,
                 state: GroupState::Prepare,
                 route_snapshot,
+                route_raw: None,
+                region_info: None,
                 prepare_result: None,
                 freeze_completed: false,
                 metadata_updated: false,
                 staged_regions: Vec::new(),
+                region_rule_updated: false,
+                region_rule_version: None,
+                manifests_generated: false,
+                manifest_stats: None,
+                catalog_name,
+                schema_name,
+                manifest_summary: GroupManifestSummary {
+                    group_id,
+                    ..Default::default()
+                },
+                route_committed: false,
+                rollback_registered: false,
             },
         }
     }

-    pub async fn step(&mut self) -> MetaResult<Status> {
+    #[allow(dead_code)]
+    pub fn route_metadata_payload(&self) -> Option<RouteMetadataPayload<'_>> {
+        if !self.data.metadata_updated {
+            return None;
+        }
+
+        let original = self.data.route_raw.as_ref()?;
+        Some(RouteMetadataPayload {
+            table_id: self.data.table_id,
+            original,
+            new_routes: self.data.route_snapshot.region_routes.clone(),
+        })
+    }
+
+    /// Update the shared context with the latest manifest summary.
+    fn update_manifest_summary(
+        &mut self,
+        status: ManifestStatus,
+        staged_region_count: u64,
+        error: Option<String>,
+    ) {
+        let summary = self
+            .data
+            .record_manifest_summary(status, staged_region_count, error);
+        self.context.record_manifest_summary(summary);
+    }
+
+    /// Standardised bookkeeping when a manifest-related RPC fails.
+ fn handle_manifest_failure(&mut self, status: ManifestStatus, error: String) { + self.data.reset_manifest_state(); + self.update_manifest_summary(status, 0, Some(error)); + } + + fn register_success_record(&mut self) { + if self.data.rollback_registered { + return; + } + + if let Some(record) = GroupRollbackRecord::new(&self.data) { + self.context.register_group_success(record); + self.data.rollback_registered = true; + } + } + + pub(crate) async fn execute_rollback( + context: RepartitionContext, + record: GroupRollbackRecord, + ) -> Result<()> { + let (_, data) = record.into_inner(); + let mut procedure = Self { context, data }; + procedure.rollback_group(GroupState::Prepare).await + } + + /// Roll the group back to an earlier state after a failure. The method + /// attempts to reverse any local state, resume regions, and reapply the + /// original table route if necessary. + async fn rollback_group(&mut self, reset_state: GroupState) -> Result<()> { + debug!( + "repartition group {:?}: rolling back to {:?}", + self.data.entry.group_id, reset_state + ); + + let needs_cleanup = self.data.prepare_result.is_some() + && (self.data.freeze_completed + || self.data.metadata_updated + || self.data.region_rule_updated + || self.data.manifests_generated + || !self.data.staged_regions.is_empty()); + + if needs_cleanup { + self.cleanup_resources(false).await?; + } + + if self.data.route_committed { + self.revert_table_route().await?; + } else { + self.refresh_route_snapshot().await?; + } + + self.reset_in_memory_state(); + self.data.state = reset_state; + Ok(()) + } + + /// Restore the original table route snapshot that was captured before this + /// group wrote a new route during the confirm phase. + async fn revert_table_route(&mut self) -> Result<()> { + let original = self + .data + .route_raw + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing original route snapshot during rollback", + self.data.entry.group_id + ), + })?; + + let route_manager = self.context.table_metadata_manager.table_route_manager(); + let current = route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing current route snapshot during rollback", + self.data.entry.group_id + ), + })?; + + let original_routes = original + .region_routes() + .context(error::TableMetadataManagerSnafu)? + .clone(); + let region_info = self + .data + .region_info + .clone() + .context(error::UnexpectedSnafu { + violated: format!( + "group {} missing region info for rollback", + self.data.entry.group_id + ), + })?; + + self.context + .table_metadata_manager + .update_table_route( + self.data.table_id, + region_info.clone(), + ¤t, + original_routes, + ®ion_info.region_options, + ®ion_info.region_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu)?; + + self.refresh_route_snapshot().await?; + self.data.route_committed = false; + Ok(()) + } + + /// Refresh the in-memory table route snapshot from storage. This should be + /// used after either commit or rollback paths run. 
+ async fn refresh_route_snapshot(&mut self) -> Result<()> { + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let (_, latest_route) = table_route_manager + .get_physical_table_route(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + let raw_route = table_route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::UnexpectedSnafu { + violated: format!( + "group {} failed to refresh route snapshot", + self.data.entry.group_id + ), + })?; + + self.data.route_snapshot = latest_route; + self.data.route_raw = Some(raw_route); + Ok(()) + } + + /// Reset volatile fields so the procedure can safely retry the workflow. + fn reset_in_memory_state(&mut self) { + self.data.prepare_result = None; + self.data.region_info = None; + self.data.freeze_completed = false; + self.data.metadata_updated = false; + self.data.region_rule_updated = false; + self.data.region_rule_version = None; + self.data.manifests_generated = false; + self.data.manifest_stats = None; + self.data.staged_regions.clear(); + self.data.route_committed = false; + } + + /// Drive the procedure state machine by executing the handler that + /// corresponds to the current state and returning the framework status. + pub async fn step(&mut self) -> Result { match self.data.state { GroupState::Prepare => { self.on_prepare().await?; @@ -125,6 +489,7 @@ impl RepartitionGroupProcedure { } GroupState::Cleanup => { self.on_cleanup().await?; + self.register_success_record(); self.data.state = GroupState::Finished; Ok(Status::done()) } @@ -132,27 +497,46 @@ impl RepartitionGroupProcedure { } } - async fn on_prepare(&mut self) -> MetaResult<()> { + /// Capture the latest metadata snapshot prior to any modifications. + async fn on_prepare(&mut self) -> Result<()> { if self.data.prepare_result.is_some() { return Ok(()); } - let (_, latest_route) = self - .context - .table_metadata_manager - .table_route_manager() - .get_physical_table_route(self.data.table_id) - .await?; + info!( + "repartition group {:?}: preparing metadata snapshot for table {}", + self.data.entry.group_id, self.data.table_id + ); - self.data.route_snapshot = latest_route.region_routes.clone(); + let table_route_manager = self.context.table_metadata_manager.table_route_manager(); + let (_, latest_route) = table_route_manager + .get_physical_table_route(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)?; + + let raw_route = table_route_manager + .table_route_storage() + .get_with_raw_bytes(self.data.table_id) + .await + .context(error::TableMetadataManagerSnafu)? 
+ .context(error::UnexpectedSnafu { + violated: format!( + "table {} route missing raw snapshot during repartition", + self.data.table_id + ), + })?; + + self.data.route_snapshot = latest_route.clone(); + self.data.route_raw = Some(raw_route.clone()); + self.ensure_target_routes_present()?; let mut source_routes = Vec::with_capacity(self.data.entry.sources.len()); for descriptor in &self.data.entry.sources { let Some(region_id) = descriptor.region_id else { - return Err(error::UnsupportedSnafu { - operation: format!("group {} lacks source region id", self.data.entry.group_id), + return error::RepartitionMissingSourceRegionIdSnafu { + group_id: self.data.entry.group_id, } - .build()); + .fail(); }; let Some(route) = latest_route @@ -160,65 +544,75 @@ impl RepartitionGroupProcedure { .iter() .find(|route| route.region.id == region_id) else { - return Err(error::UnsupportedSnafu { - operation: format!( - "group {} source region {region_id} missing from route snapshot", - self.data.entry.group_id - ), + return error::RepartitionSourceRegionRouteMissingSnafu { + group_id: self.data.entry.group_id, + region_id, } - .build()); + .fail(); }; source_routes.push(route.clone()); } if source_routes.is_empty() { - return Err(error::UnsupportedSnafu { - operation: format!( - "group {} has no source regions after planning", - self.data.entry.group_id - ), + return error::RepartitionNoSourceRegionsSnafu { + group_id: self.data.entry.group_id, } - .build()); + .fail(); } let mut target_routes = Vec::with_capacity(self.data.entry.targets.len()); for descriptor in &self.data.entry.targets { if let Some(region_id) = descriptor.region_id { - let Some(route) = latest_route + if let Some(route) = latest_route .region_routes .iter() .find(|route| route.region.id == region_id) - else { - return Err(error::UnsupportedSnafu { - operation: format!( - "group {} target region {region_id} missing from route snapshot", - self.data.entry.group_id - ), - } - .build()); - }; - - target_routes.push(Some(route.clone())); + { + target_routes.push(Some(route.clone())); + } else { + target_routes.push(None); + } } else { target_routes.push(None); } } - let central_region = source_routes[0].region.id.as_u64(); + let central_region = source_routes[0].region.id; self.data.prepare_result = Some(GroupPrepareResult { source_routes, target_routes, central_region, }); + debug!( + "repartition group {:?}: captured {} sources, {} targets", + self.data.entry.group_id, + self.data + .prepare_result + .as_ref() + .map(|r| r.source_routes.len()) + .unwrap_or(0), + self.data + .prepare_result + .as_ref() + .map(|r| r.target_routes.len()) + .unwrap_or(0) + ); + Ok(()) } - async fn on_freeze(&mut self) -> MetaResult<()> { + /// Pause IO on the source/target regions in preparation for metadata work. + async fn on_freeze(&mut self) -> Result<()> { if self.data.freeze_completed { return Ok(()); } + info!( + "repartition group {:?}: entering freeze stage", + self.data.entry.group_id + ); + let prepare_result = self.prepare_result()?; for route in &prepare_result.source_routes { @@ -230,14 +624,29 @@ impl RepartitionGroupProcedure { } self.data.freeze_completed = true; + debug!( + "repartition group {:?}: freeze stage completed", + self.data.entry.group_id + ); Ok(()) } - async fn on_update_metadata(&mut self) -> MetaResult<()> { + /// Apply partition rule updates and mark the involved regions as staging in + /// the metadata layer. 
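+    /// Regions that were already staged are skipped, so the step stays idempotent on retry.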
+ async fn on_update_metadata(&mut self) -> Result<()> { if self.data.metadata_updated { return Ok(()); } + self.ensure_targets_allocated()?; + + info!( + "repartition group {:?}: applying metadata updates", + self.data.entry.group_id + ); + + self.apply_target_partition_rules()?; + let prepare_result = self.prepare_result()?; let region_ids = self.collect_existing_region_ids(prepare_result); if region_ids.is_empty() { @@ -252,40 +661,391 @@ impl RepartitionGroupProcedure { if staged_set.insert(*region_id) { route_manager .set_region_staging_state(*region_id, true) - .await?; + .await + .context(error::TableMetadataManagerSnafu)?; } } - Self::mark_regions_staging(&mut self.data.route_snapshot, ®ion_ids); + Self::mark_regions_staging(&mut self.data.route_snapshot.region_routes, ®ion_ids); self.data.staged_regions = staged_set.into_iter().collect(); self.data.metadata_updated = true; + debug!( + "repartition group {:?}: staged regions {:?}", + self.data.entry.group_id, self.data.staged_regions + ); Ok(()) } - async fn on_update_region_rule(&mut self) -> MetaResult<()> { - // TODO: Push new region rules to datanodes and reject outdated writes. - Ok(()) - } - - async fn on_update_manifests(&mut self) -> MetaResult<()> { - // TODO: Generate and submit new manifests for target regions. - Ok(()) - } - - async fn on_confirm(&mut self) -> MetaResult<()> { - // TODO: Confirm staged writes, resume ingestion, and record success. - Ok(()) - } - - async fn on_cleanup(&mut self) -> MetaResult<()> { - let prepare_result = self.prepare_result()?; - - for route in &prepare_result.source_routes { - self.resume_region(route).await?; + /// Stage the next rule version on every region that participates in the + /// repartition group. + async fn on_update_region_rule(&mut self) -> Result<()> { + if self.data.region_rule_updated { + return Ok(()); } - for route in prepare_result.target_routes.iter().flatten() { - self.resume_region(route).await?; + + ensure!( + self.data.metadata_updated, + error::UnexpectedSnafu { + violated: format!( + "group {} region rule update before metadata stage", + self.data.entry.group_id + ), + } + ); + + let prepare_result = self.prepare_result()?; + let region_ids = self.collect_existing_region_ids(prepare_result); + + info!( + "repartition group {:?}: scheduling region rule update for {:?}", + self.data.entry.group_id, region_ids + ); + + let rule_version = self + .data + .region_rule_version + .get_or_insert_with(|| format!("{}", self.data.entry.group_id)) + .clone(); + + let stage_targets: Vec<_> = { + let prepare_result = self.prepare_result()?; + prepare_result + .source_routes + .iter() + .chain(prepare_result.target_routes.iter().flatten()) + .map(|route| (route.leader_peer.clone(), route.region.id)) + .collect() + }; + + for (peer, region_id) in stage_targets { + if let Some(peer) = peer { + self.context + .stage_region_rule_on_datanode(&peer, region_id, &rule_version) + .await?; + } else { + debug!( + "repartition group {:?}: skip region rule staging, region {} has no leader", + self.data.entry.group_id, region_id + ); + } + } + + self.data.region_rule_updated = true; + Ok(()) + } + + /// Trigger manifest remapping on the datanode and record the resulting stats. 
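+    /// A failed remap RPC records the failure in the summary and rolls the group
+    /// back to the prepare state.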
+ async fn on_update_manifests(&mut self) -> Result<()> { + if self.data.manifests_generated { + return Ok(()); + } + + ensure!( + self.data.region_rule_updated, + error::UnexpectedSnafu { + violated: format!( + "group {} manifest stage invoked before region rule update", + self.data.entry.group_id + ), + } + ); + + self.ensure_targets_allocated()?; + + let prepare_result = self.prepare_result()?; + let leader_peer = self.leader_peer_for_central(prepare_result)?; + + let mut target_index_lookup = HashMap::new(); + for (position, &global_idx) in self.data.entry.subtask.to_expr_indices.iter().enumerate() { + target_index_lookup.insert(global_idx, position); + } + + let mut sources = Vec::with_capacity(self.data.entry.sources.len()); + for (source_position, descriptor) in self.data.entry.sources.iter().enumerate() { + let Some(region_id) = descriptor.region_id else { + continue; + }; + + let transitions = self + .data + .entry + .subtask + .transition_map + .get(source_position) + .context(error::UnexpectedSnafu { + violated: format!( + "group {} transition map missing entry {}", + self.data.entry.group_id, source_position + ), + })?; + + let mut target_region_ids = Vec::with_capacity(transitions.len()); + for global_target_idx in transitions { + let target_position = + target_index_lookup + .get(global_target_idx) + .context(error::UnexpectedSnafu { + violated: format!( + "group {} transition references unknown target expr {}", + self.data.entry.group_id, global_target_idx + ), + })?; + + let target_descriptor = self.data.entry.targets.get(*target_position).context( + error::UnexpectedSnafu { + violated: format!( + "group {} missing target descriptor at {}", + self.data.entry.group_id, target_position + ), + }, + )?; + + let target_region_id = + target_descriptor + .region_id + .context(error::UnexpectedSnafu { + violated: format!( + "group {} target {} missing allocated region id", + self.data.entry.group_id, target_position + ), + })?; + + target_region_ids.push(target_region_id.as_u64()); + } + + sources.push(RemapManifestSource { + region_id: region_id.as_u64(), + target_region_ids, + }); + } + + let mut targets = Vec::with_capacity(self.data.entry.targets.len()); + for descriptor in &self.data.entry.targets { + let region_id = descriptor.region_id.context(error::UnexpectedSnafu { + violated: format!( + "group {} missing target region allocation", + self.data.entry.group_id + ), + })?; + + targets.push(RemapManifestTarget { + region_id: region_id.as_u64(), + partition_expr: descriptor + .partition_expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?, + }); + } + + let request = RemapManifestRequest { + table_id: self.data.table_id as u64, + group_id: self.data.entry.group_id.to_string(), + sources, + targets, + }; + + info!( + "repartition group {:?}: scheduling manifest remap via {:?}", + self.data.entry.group_id, leader_peer.id + ); + + let response = match self + .context + .remap_manifests_on_datanode(&leader_peer, request) + .await + { + Ok(response) => response, + Err(err) => { + let err_msg = err.to_string(); + self.handle_manifest_failure(ManifestStatus::Failed, err_msg); + self.rollback_group(GroupState::Prepare).await?; + return Err(err); + } + }; + + self.data.note_remap_stats(&response); + + if response.affected_rows > 0 { + self.update_manifest_summary( + ManifestStatus::Staged, + response.affected_rows as u64, + None, + ); + } else { + self.data.reset_manifest_state(); + self.update_manifest_summary(ManifestStatus::Skipped, 0, None); + } + + 
         Ok(())
+    }
+
+    /// Commit the table route change and publish staged manifests along with the
+    /// staged region rule version.
+    async fn on_confirm(&mut self) -> Result<()> {
+        info!(
+            "repartition group {:?}: confirming metadata update",
+            self.data.entry.group_id
+        );
+
+        self.ensure_targets_allocated()?;
+
+        let region_info = self.ensure_region_info().await?.clone();
+        let region_options = region_info.region_options.clone();
+        let region_wal_options = region_info.region_wal_options.clone();
+
+        let payload = self
+            .route_metadata_payload()
+            .context(error::UnexpectedSnafu {
+                violated: format!(
+                    "group {} metadata not prepared for confirmation",
+                    self.data.entry.group_id
+                ),
+            })?;
+        let RouteMetadataPayload {
+            table_id,
+            original,
+            new_routes,
+        } = payload;
+
+        match self
+            .context
+            .table_metadata_manager
+            .update_table_route(
+                table_id,
+                region_info,
+                original,
+                new_routes,
+                &region_options,
+                &region_wal_options,
+            )
+            .await
+            .context(error::TableMetadataManagerSnafu)
+        {
+            Ok(()) => {}
+            Err(err) => {
+                self.rollback_group(GroupState::Prepare).await?;
+                return Err(err);
+            }
+        }
+        self.data.route_committed = true;
+
+        info!(
+            "repartition group {:?}: table route updated",
+            self.data.entry.group_id
+        );
+
+        match self.apply_staged_manifests(true).await {
+            Ok(Some(response)) => {
+                let summary = self.data.note_manifest_application(true, Some(&response));
+                self.context.record_manifest_summary(summary);
+            }
+            Ok(None) => {
+                debug!(
+                    "repartition group {:?}: no staged manifests to publish",
+                    self.data.entry.group_id
+                );
+                let summary = self.data.note_manifest_application(true, None);
+                self.context.record_manifest_summary(summary);
+            }
+            Err(err) => {
+                let err_msg = err.to_string();
+                self.handle_manifest_failure(ManifestStatus::Failed, err_msg);
+                self.rollback_group(GroupState::Prepare).await?;
+                return Err(err);
+            }
+        }
+
+        self.refresh_route_snapshot().await?;
+
+        if self.data.region_rule_updated {
+            if let Some(rule_version) = self.data.region_rule_version.clone() {
+                let publish_targets: Vec<_> = {
+                    let prepare_result = self.prepare_result()?;
+                    prepare_result
+                        .source_routes
+                        .iter()
+                        .chain(prepare_result.target_routes.iter().flatten())
+                        .filter_map(|route| {
+                            route
+                                .leader_peer
+                                .clone()
+                                .map(|peer| (peer, route.region.id))
+                        })
+                        .collect()
+                };
+
+                for (peer, region_id) in publish_targets {
+                    if let Err(err) = self
+                        .context
+                        .publish_region_rule_on_datanode(&peer, region_id, &rule_version)
+                        .await
+                    {
+                        let err_msg = err.to_string();
+                        self.handle_manifest_failure(ManifestStatus::Failed, err_msg);
+                        self.rollback_group(GroupState::Prepare).await?;
+                        return Err(err);
+                    }
+                }
+            }
+
+            self.data.region_rule_updated = false;
+        }
+        self.data.route_committed = false;
+        Ok(())
+    }
+
+    /// Resume region IO, clear staging marks, and optionally publish region rules.
+    async fn on_cleanup(&mut self) -> Result<()> {
+        self.cleanup_resources(true).await
+    }
+
+    /// Shared cleanup implementation used for the success path as well as the
+    /// rollback handler.
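+    /// `publish_rules` selects between publishing the staged region rules (success path)
+    /// and discarding them (rollback path).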
+ async fn cleanup_resources(&mut self, publish_rules: bool) -> Result<()> { + let leader_targets: Vec<_> = { + let prepare_result = self.prepare_result()?; + + for route in &prepare_result.source_routes { + self.resume_region(route).await?; + } + for route in prepare_result.target_routes.iter().flatten() { + self.resume_region(route).await?; + } + + prepare_result + .source_routes + .iter() + .chain(prepare_result.target_routes.iter().flatten()) + .filter_map(|route| { + route + .leader_peer + .clone() + .map(|peer| (peer, route.region.id)) + }) + .collect() + }; + + if self.data.manifests_generated { + match self.apply_staged_manifests(false).await { + Ok(Some(response)) => { + let summary = self.data.note_manifest_application(false, Some(&response)); + self.context.record_manifest_summary(summary); + } + Ok(None) => { + debug!( + "repartition group {:?}: no staged manifests to discard", + self.data.entry.group_id + ); + let summary = self.data.note_manifest_application(false, None); + self.context.record_manifest_summary(summary); + } + Err(err) => { + let err_msg = err.to_string(); + self.handle_manifest_failure(ManifestStatus::Failed, err_msg); + return Err(err); + } + } } if !self.data.staged_regions.is_empty() { @@ -294,37 +1054,154 @@ impl RepartitionGroupProcedure { for region_id in &self.data.staged_regions { route_manager .set_region_staging_state(*region_id, false) - .await?; + .await + .context(error::TableMetadataManagerSnafu)?; } - Self::clear_regions_staging(&mut self.data.route_snapshot, &self.data.staged_regions); + Self::clear_regions_staging( + &mut self.data.route_snapshot.region_routes, + &self.data.staged_regions, + ); self.data.staged_regions.clear(); self.data.metadata_updated = false; } + if self.data.region_rule_updated { + if let Some(rule_version) = self.data.region_rule_version.clone() { + if publish_rules { + for (peer, region_id) in &leader_targets { + self.context + .publish_region_rule_on_datanode(peer, *region_id, &rule_version) + .await?; + } + } else { + for (peer, region_id) in &leader_targets { + self.context + .clear_region_rule_stage_on_datanode(peer, *region_id) + .await?; + } + } + } + self.data.region_rule_updated = false; + self.data.region_rule_version = None; + } + self.data.freeze_completed = false; + self.data.route_committed = false; + + if publish_rules { + info!( + "repartition group {:?}: cleanup finished", + self.data.entry.group_id + ); + } else { + info!( + "repartition group {:?}: rollback cleanup finished", + self.data.entry.group_id + ); + } Ok(()) } - fn prepare_result(&self) -> MetaResult<&GroupPrepareResult> { + /// Borrow the cached prepare result or surface a friendly error if it has + /// not been initialised yet. + fn prepare_result(&self) -> Result<&GroupPrepareResult> { self.data.prepare_result.as_ref().ok_or_else(|| { - error::UnsupportedSnafu { - operation: format!( - "group {} is missing prepare context", - self.data.entry.group_id - ), + error::RepartitionMissingPrepareContextSnafu { + group_id: self.data.entry.group_id, } .build() }) } - async fn pause_region(&self, _route: &RegionRoute) -> MetaResult<()> { - // TODO: invoke datanode RPC to pause compaction and snapshot for the region. + /// Resolve the leader peer for the central region – a prerequisite for the + /// datanode-side RPCs we send throughout the workflow. 
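+    /// Falls back to the route snapshot and requests a retry when no leader is known yet.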
+    fn leader_peer_for_central(&self, prepare_result: &GroupPrepareResult) -> Result<Peer> {
+        let central_region = prepare_result.central_region;
+        let leader_peer = prepare_result
+            .source_routes
+            .iter()
+            .find(|route| route.region.id == central_region)
+            .and_then(|route| route.leader_peer.clone())
+            .or_else(|| {
+                self.data
+                    .route_snapshot
+                    .region_routes
+                    .iter()
+                    .find(|route| route.region.id == central_region)
+                    .and_then(|route| route.leader_peer.clone())
+            });
+
+        let Some(peer) = leader_peer else {
+            return error::RetryLaterSnafu {
+                reason: format!(
+                    "group {} missing leader for central region {}",
+                    self.data.entry.group_id, central_region
+                ),
+            }
+            .fail();
+        };
+
+        Ok(peer)
+    }
+
+    /// Ask the datanode to either publish or discard staged manifests for all
+    /// regions tracked as staging.
+    async fn apply_staged_manifests(&self, publish: bool) -> Result<Option<RegionResponse>> {
+        if self.data.staged_regions.is_empty() {
+            return Ok(None);
+        }
+
+        let prepare_result = self.prepare_result()?;
+        let leader_peer = self.leader_peer_for_central(prepare_result)?;
+
+        let request = ApplyStagedManifestRequest {
+            table_id: self.data.table_id as u64,
+            group_id: self.data.entry.group_id.to_string(),
+            region_ids: self
+                .data
+                .staged_regions
+                .iter()
+                .map(|region_id| region_id.as_u64())
+                .collect(),
+            publish,
+        };
+
+        let response = self
+            .context
+            .apply_staged_manifests_on_datanode(&leader_peer, request)
+            .await?;
+
+        Ok(Some(response))
+    }
+
+    /// Pause the target region if it currently has a leader peer.
+    async fn pause_region(&self, _route: &RegionRoute) -> Result<()> {
+        if let Some(peer) = &_route.leader_peer {
+            self.context
+                .pause_region_on_datanode(peer, _route.region.id)
+                .await?;
+        } else {
+            debug!(
+                "repartition group {:?}: skip pause, region {} has no leader peer",
+                self.data.entry.group_id, _route.region.id
+            );
+        }
         Ok(())
     }
 
-    async fn resume_region(&self, _route: &RegionRoute) -> MetaResult<()> {
-        // TODO: invoke datanode RPC to resume compaction and snapshot for the region.
+    /// Resume the target region if it currently has a leader peer.
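+    /// Mirrors `pause_region`: regions without a leader peer are skipped and logged.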
+ async fn resume_region(&self, _route: &RegionRoute) -> Result<()> { + if let Some(peer) = &_route.leader_peer { + self.context + .resume_region_on_datanode(peer, _route.region.id) + .await?; + } else { + debug!( + "repartition group {:?}: skip resume, region {} has no leader peer", + self.data.entry.group_id, _route.region.id + ); + } Ok(()) } @@ -354,6 +1231,150 @@ impl RepartitionGroupProcedure { } set.into_iter().collect() } + + fn apply_target_partition_rules(&mut self) -> Result<()> { + for target in &self.data.entry.targets { + if let Some(region_id) = target.region_id { + Self::apply_partition_rule( + &mut self.data.route_snapshot.region_routes, + region_id, + target, + )?; + } + } + Ok(()) + } + + async fn ensure_region_info(&mut self) -> Result<&RegionInfo> { + if self.data.region_info.is_none() { + let central_region = self.prepare_result()?.central_region; + let leader_peer_id = self + .data + .route_snapshot + .region_routes + .iter() + .find(|route| route.region.id == central_region) + .and_then(|route| route.leader_peer.as_ref()) + .map(|peer| peer.id) + .context(error::UnexpectedSnafu { + violated: format!( + "group {} has no leader peer for region {}", + self.data.entry.group_id, central_region + ), + })?; + + let datanode_table_value = self + .context + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey { + datanode_id: leader_peer_id, + table_id: self.data.table_id, + }) + .await + .context(error::TableMetadataManagerSnafu)?; + + let Some(datanode_table_value) = datanode_table_value else { + return error::RetryLaterSnafu { + reason: format!( + "group {} waiting for datanode {} metadata to propagate", + self.data.entry.group_id, leader_peer_id + ), + } + .fail(); + }; + + self.data.region_info = Some(datanode_table_value.region_info.clone()); + } + + Ok(self.data.region_info.as_ref().unwrap()) + } + + fn ensure_target_routes_present(&mut self) -> Result<()> { + let leader_candidate = self + .data + .entry + .sources + .iter() + .filter_map(|descriptor| descriptor.region_id) + .find_map(|region_id| { + self.data + .route_snapshot + .region_routes + .iter() + .find(|route| route.region.id == region_id) + .and_then(|route| route.leader_peer.clone()) + }); + + for descriptor in &self.data.entry.targets { + let Some(region_id) = descriptor.region_id else { + continue; + }; + + if self + .data + .route_snapshot + .region_routes + .iter() + .any(|route| route.region.id == region_id) + { + continue; + } + + let partition_expr = descriptor + .partition_expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + + let region = Region { + id: region_id, + partition_expr, + ..Default::default() + }; + + self.data.route_snapshot.region_routes.push(RegionRoute { + region, + leader_peer: leader_candidate.clone(), + ..Default::default() + }); + } + Ok(()) + } + + fn ensure_targets_allocated(&self) -> Result<()> { + if let Some((idx, _)) = self + .data + .entry + .targets + .iter() + .enumerate() + .find(|(_, descriptor)| descriptor.region_id.is_none()) + { + return error::RepartitionMissingTargetRegionIdSnafu { + group_id: self.data.entry.group_id, + target_index: idx, + } + .fail(); + } + + Ok(()) + } + + fn apply_partition_rule( + routes: &mut [RegionRoute], + region_id: RegionId, + descriptor: &RegionDescriptor, + ) -> Result<()> { + let Some(route) = routes.iter_mut().find(|route| route.region.id == region_id) else { + return error::RepartitionTargetRegionRouteMissingSnafu { region_id }.fail(); + }; + + route.region.partition_expr = 
descriptor + .partition_expr + .as_json_str() + .context(error::RepartitionSerializePartitionExprSnafu)?; + Ok(()) + } } #[async_trait::async_trait] @@ -363,7 +1384,7 @@ impl Procedure for RepartitionGroupProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let status = self.step().await.map_err(map_to_procedure_error)?; + let status = self.step().await.map_err(super::map_repartition_error)?; Ok(status) } @@ -372,6 +1393,14 @@ impl Procedure for RepartitionGroupProcedure { } fn lock_key(&self) -> LockKey { - LockKey::default() + LockKey::new(vec![ + CatalogLock::Read(self.data.catalog_name.as_str()).into(), + SchemaLock::read( + self.data.catalog_name.as_str(), + self.data.schema_name.as_str(), + ) + .into(), + TableLock::Write(self.data.table_id).into(), + ]) } } diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs index 95de4adb4c..141ab6ea1c 100644 --- a/src/meta-srv/src/procedure/repartition/plan.rs +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_meta::key::table_route::PhysicalTableRouteValue; +use partition::expr::PartitionExpr; use partition::subtask::RepartitionSubtask; use serde::{Deserialize, Serialize}; use store_api::storage::{RegionId, TableId}; @@ -28,20 +29,23 @@ pub type PlanGroupId = Uuid; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct RepartitionPlan { pub table_id: TableId, - pub plan_hash: String, pub entries: Vec, pub resource_demand: ResourceDemand, pub route_snapshot: PhysicalTableRouteValue, } impl RepartitionPlan { - pub fn empty(table_id: TableId) -> Self { + pub fn new( + table_id: TableId, + entries: Vec, + resource_demand: ResourceDemand, + route_snapshot: PhysicalTableRouteValue, + ) -> Self { Self { table_id, - plan_hash: String::new(), - entries: Vec::new(), - resource_demand: ResourceDemand::default(), - route_snapshot: PhysicalTableRouteValue::default(), + entries, + resource_demand, + route_snapshot, } } } @@ -55,6 +59,8 @@ pub struct PlanEntry { } impl PlanEntry { + /// Construct a plan entry consisting of the connected component returned by + /// the planner. pub fn new( group_id: PlanGroupId, subtask: RepartitionSubtask, @@ -68,33 +74,22 @@ impl PlanEntry { targets, } } - - pub fn implied_new_regions(&self) -> u32 { - self.targets - .iter() - .filter(|target| target.region_id.is_none()) - .count() as u32 - } } +/// Metadata describing a region involved in the plan. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RegionDescriptor { pub region_id: Option, - pub partition_expr_json: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] -pub struct PartitionRuleDiff { - pub entries: Vec, + pub partition_expr: PartitionExpr, } +/// Auxiliary information about resources required to execute the plan. 
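+/// Currently an empty placeholder; `from_plan_entries` returns it without inspecting the entries.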
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] -pub struct ResourceDemand { - pub new_regions: u32, -} +pub struct ResourceDemand {} impl ResourceDemand { - pub fn add_entry(&mut self, entry: &PlanEntry) { - self.new_regions = self.new_regions.saturating_add(entry.implied_new_regions()); + pub fn from_plan_entries(_entries: &[PlanEntry]) -> Self { + // placeholder + Self {} } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 344d35eaa0..4268e501c9 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -179,6 +179,30 @@ impl RegionRequest { reason: "ListMetadata request should be handled separately by RegionServer", } .fail(), + region_request::Body::Pause(_) => UnexpectedSnafu { + reason: "Pause request should be handled separately by RegionServer", + } + .fail(), + region_request::Body::Resume(_) => UnexpectedSnafu { + reason: "Resume request should be handled separately by RegionServer", + } + .fail(), + region_request::Body::StageRegionRule(_) => UnexpectedSnafu { + reason: "StageRegionRule request should be handled separately by RegionServer", + } + .fail(), + region_request::Body::PublishRegionRule(_) => UnexpectedSnafu { + reason: "PublishRegionRule request should be handled separately by RegionServer", + } + .fail(), + region_request::Body::RemapManifest(_) => UnexpectedSnafu { + reason: "RemapManifest request should be handled separately by RegionServer", + } + .fail(), + region_request::Body::ApplyStagedManifest(_) => UnexpectedSnafu { + reason: "ApplyStagedManifest request should be handled separately by RegionServer", + } + .fail(), } }