diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index eadb7cdc75..51d2b4d37b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -24,7 +24,6 @@ use common_base::Plugins; use common_config::Configurable; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use common_meta::distributed_time_constants::META_LEASE_SECS; -use common_meta::election::CANDIDATE_LEASE_SECS; use common_meta::election::etcd::EtcdElection; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; @@ -290,6 +289,7 @@ pub async fn metasrv_builder( use std::time::Duration; use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS; + use common_meta::election::CANDIDATE_LEASE_SECS; use common_meta::election::rds::postgres::{ElectionPgClient, PgElection}; use common_meta::kv_backend::rds::PgStore; use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod}; @@ -354,6 +354,7 @@ pub async fn metasrv_builder( (None, BackendImpl::MysqlStore) => { use std::time::Duration; + use common_meta::election::CANDIDATE_LEASE_SECS; use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection}; use common_meta::kv_backend::rds::MySqlStore; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7b7983b1ba..a0f800f981 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -1136,6 +1136,12 @@ impl Error { Error::RetryLater { .. } | Error::RetryLaterWithSource { .. } | Error::MailboxTimeout { .. } + ) || matches!( + self, + Error::AllocateRegions { source, .. } if source.is_retry_later() + ) || matches!( + self, + Error::DeallocateRegions { source, .. } if source.is_retry_later() ) } } @@ -1324,3 +1330,35 @@ pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io: err = err.source()?; } } + +#[cfg(test)] +mod tests { + use common_error::mock::MockError; + use common_error::status_code::StatusCode; + use snafu::ResultExt; + + use super::DeallocateRegionsSnafu; + + #[test] + fn test_deallocate_regions_is_retryable_when_source_is_retry_later() { + let source = common_meta::error::Error::retry_later(MockError::new(StatusCode::Internal)); + let err = Err::<(), _>(source) + .context(DeallocateRegionsSnafu { table_id: 1024_u32 }) + .unwrap_err(); + + assert!(err.is_retryable()); + } + + #[test] + fn test_deallocate_regions_is_not_retryable_when_source_is_not_retry_later() { + let source = common_meta::error::UnexpectedSnafu { + err_msg: "mock error", + } + .build(); + let err = Err::<(), _>(source) + .context(DeallocateRegionsSnafu { table_id: 1024_u32 }) + .unwrap_err(); + + assert!(!err.is_retryable()); + } +} diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index 37c7745ae5..db8bfeadc5 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -23,7 +23,7 @@ pub mod repartition_start; pub mod utils; use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display}; use std::time::{Duration, Instant}; @@ -40,15 +40,15 @@ use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use common_meta::node_manager::NodeManagerRef; -use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_meta::region_registry::LeaderRegionRegistryRef; -use common_meta::rpc::router::RegionRoute; +use common_meta::rpc::router::{RegionRoute, operating_leader_regions}; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata, }; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -56,6 +56,8 @@ use store_api::storage::{RegionNumber, TableId}; use table::table_name::TableName; use crate::error::{self, Result}; +use crate::procedure::repartition::collect::ProcedureMeta; +use crate::procedure::repartition::deallocate_region::DeallocateRegion; use crate::procedure::repartition::group::{ Context as RepartitionGroupContext, RepartitionGroupProcedure, }; @@ -74,6 +76,12 @@ pub struct PersistentContext { pub table_name: String, pub table_id: TableId, pub plans: Vec, + /// Records failed sub-procedures for metadata rollback. + #[serde(default)] + pub failed_procedures: Vec, + #[serde(default)] + /// Records unknown sub-procedures for metadata rollback. + pub unknown_procedures: Vec, /// The timeout for repartition operations. #[serde(with = "humantime_serde", default = "default_timeout")] pub timeout: Duration, @@ -102,6 +110,8 @@ impl PersistentContext { table_name, table_id, plans: vec![], + failed_procedures: vec![], + unknown_procedures: vec![], timeout: timeout.unwrap_or_else(default_timeout), } } @@ -393,6 +403,23 @@ impl Context { .await; Ok(()) } + + pub fn register_operating_regions( + memory_region_keeper: &MemoryRegionKeeperRef, + region_routes: &[RegionRoute], + ) -> Result> { + let mut operating_guards = Vec::with_capacity(region_routes.len()); + for (region_id, datanode_id) in operating_leader_regions(region_routes) { + let guard = memory_region_keeper + .register(datanode_id, region_id) + .context(error::RegionOperatingRaceSnafu { + peer_id: datanode_id, + region_id, + })?; + operating_guards.push(guard); + } + Ok(operating_guards) + } } #[async_trait::async_trait] @@ -456,6 +483,131 @@ impl RepartitionProcedure { Ok(Self { state, context }) } + + /// Returns whether parent rollback should remove this repartition's allocated regions. + /// + /// This uses an "after AllocateRegion" semantic: once execution reaches + /// `AllocateRegion` or any later state, rollback must try to remove this round's + /// `allocated_region_ids` from table-route metadata when they exist. + /// + /// State flow: + /// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd` + /// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + /// rollback allocated regions in metadata + /// + /// Notes: + /// - `RepartitionStart`: no-op, because allocation has not happened yet. + /// - `AllocateRegion` / `Dispatch` / `Collect` rollback-active. + /// - `DeallocateRegion`: is not rollback-active. + /// - `RepartitionEnd`: no-op. + fn should_rollback_allocated_regions(&self) -> bool { + self.state.as_any().is::() + || self.state.as_any().is::() + || self.state.as_any().is::() + } + + fn rollback_allocated_region_ids(&self) -> HashSet { + if self.state.as_any().is::() + || self.state.as_any().is::() + { + return self + .context + .persistent_ctx + .plans + .iter() + .flat_map(|plan| plan.allocated_region_ids.iter().copied()) + .collect(); + } + + self.context + .persistent_ctx + .failed_procedures + .iter() + .chain(self.context.persistent_ctx.unknown_procedures.iter()) + .flat_map(|procedure_meta| { + let plan_index = procedure_meta.plan_index; + self.context.persistent_ctx.plans[plan_index] + .allocated_region_ids + .iter() + .copied() + }) + .collect() + } + + fn filter_allocated_region_routes( + region_routes: &[RegionRoute], + allocated_region_ids: &HashSet, + ) -> Vec { + region_routes + .iter() + .filter(|route| !allocated_region_ids.contains(&route.region.id)) + .cloned() + .collect() + } + + async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { + if !self.should_rollback_allocated_regions() { + return Ok(()); + } + + let table_id = self.context.persistent_ctx.table_id; + let allocated_region_ids = self.rollback_allocated_region_ids(); + if allocated_region_ids.is_empty() { + return Ok(()); + } + + let table_lock = TableLock::Write(table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + let table_route_value = self.context.get_table_route_value().await?; + let current_region_routes = table_route_value.region_routes().unwrap(); + let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes( + table_id, + current_region_routes, + &allocated_region_ids, + ); + if !allocated_region_routes.is_empty() { + let table = TableName { + catalog_name: self.context.persistent_ctx.catalog_name.clone(), + schema_name: self.context.persistent_ctx.schema_name.clone(), + table_name: self.context.persistent_ctx.table_name.clone(), + }; + // Memory guards are not required here, + // because the table metadata still contains routes for the deallocating regions. + if let Err(err) = DeallocateRegion::deallocate_regions( + &self.context.node_manager, + &self.context.leader_region_registry, + table, + table_id, + &allocated_region_routes, + ) + .await + { + warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids); + } + } + + let new_region_routes = + Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids); + + if new_region_routes.len() != current_region_routes.len() { + self.context + .update_table_route(&table_route_value, new_region_routes, HashMap::new()) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to rollback allocated region routes for repartition table: {}", + table_id + ), + })?; + } + + if let Err(err) = self.context.invalidate_table_cache().await { + warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id); + } + + Ok(()) + } } #[async_trait::async_trait] @@ -497,9 +649,14 @@ impl Procedure for RepartitionProcedure { } } + async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> { + self.rollback_inner(ctx) + .await + .map_err(ProcedureError::external) + } + fn rollback_supported(&self) -> bool { - // TODO(weny): support rollback. - false + true } fn dump(&self) -> ProcedureResult { @@ -624,3 +781,642 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + use common_error::ext::BoxedError; + use common_error::mock::MockError; + use common_error::status_code::StatusCode; + use common_meta::ddl::test_util::datanode_handler::{ + DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler, + }; + use common_meta::error; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::test_util::MockDatanodeManager; + use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState}; + use store_api::storage::RegionId; + use table::table_name::TableName; + use tokio::sync::mpsc; + use uuid::Uuid; + + use super::*; + use crate::procedure::repartition::allocate_region::AllocateRegion; + use crate::procedure::repartition::collect::Collect; + use crate::procedure::repartition::deallocate_region::DeallocateRegion; + use crate::procedure::repartition::dispatch::Dispatch; + use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::repartition_end::RepartitionEnd; + use crate::procedure::repartition::test_util::{ + TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids, + new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr, + test_region_route, test_region_wal_options, + }; + + fn test_plan(table_id: TableId) -> RepartitionPlanEntry { + RepartitionPlanEntry { + group_id: uuid::Uuid::new_v4(), + source_regions: vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }], + target_regions: vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 50), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 50, 100), + }, + ], + allocated_region_ids: vec![RegionId::new(table_id, 3)], + pending_deallocate_region_ids: vec![], + transition_map: vec![vec![0, 1]], + } + } + + fn test_procedure(state: Box, context: Context) -> RepartitionProcedure { + RepartitionProcedure { state, context } + } + + fn test_context(env: &TestingEnv, table_id: TableId) -> Context { + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + + Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ) + } + + #[test] + fn test_filter_allocated_region_routes() { + let table_id = 1024; + let region_routes = vec![ + test_region_route(RegionId::new(table_id, 1), "a"), + test_region_route(RegionId::new(table_id, 2), "b"), + ]; + let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]); + + let new_region_routes = RepartitionProcedure::filter_allocated_region_routes( + ®ion_routes, + &allocated_region_ids, + ); + + assert_eq!(new_region_routes.len(), 1); + assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1)); + } + + #[test] + fn test_should_rollback_allocated_regions() { + let env = TestingEnv::new(); + let table_id = 1024; + + let procedure = test_procedure( + Box::new(RepartitionStart::new(vec![], vec![])), + test_context(&env, table_id), + ); + assert!(!procedure.should_rollback_allocated_regions()); + + let procedure = test_procedure( + Box::new(AllocateRegion::new(vec![])), + test_context(&env, table_id), + ); + assert!(procedure.should_rollback_allocated_regions()); + + let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id)); + assert!(procedure.should_rollback_allocated_regions()); + + let procedure = + test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id)); + assert!(procedure.should_rollback_allocated_regions()); + + let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id)); + assert!(!procedure.should_rollback_allocated_regions()); + + let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id)); + assert!(!procedure.should_rollback_allocated_regions()); + } + + #[tokio::test] + async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 50, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ]; + env.create_physical_table_metadata_with_wal_options( + table_id, + original_region_routes, + test_region_wal_options(&[1, 2]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![test_plan(table_id)]; + persistent_ctx.failed_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: Uuid::new_v4(), + procedure_id: ProcedureId::random(), + }]; + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Dispatch), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!(region_routes.len(), 2); + assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1)); + assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2)); + } + + #[tokio::test] + async fn test_repartition_rollback_removes_allocated_routes_from_allocate() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 50, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ]; + env.create_physical_table_metadata_with_wal_options( + table_id, + original_region_routes, + test_region_wal_options(&[1, 2]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![test_plan(table_id)]; + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(AllocateRegion::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!(region_routes.len(), 2); + assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1)); + assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2)); + } + + #[tokio::test] + async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + test_region_route(RegionId::new(table_id, 4), ""), + ]; + env.create_physical_table_metadata_with_wal_options( + table_id, + original_region_routes, + test_region_wal_options(&[1, 2, 3, 4]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + let failed_plan = test_plan(table_id); + let succeeded_plan = RepartitionPlanEntry { + group_id: Uuid::new_v4(), + source_regions: vec![RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 200), + }], + target_regions: vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 150), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 4), + partition_expr: range_expr("x", 150, 200), + }, + ], + allocated_region_ids: vec![RegionId::new(table_id, 4)], + pending_deallocate_region_ids: vec![], + transition_map: vec![vec![0]], + }; + persistent_ctx.plans = vec![failed_plan, succeeded_plan]; + persistent_ctx.failed_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: persistent_ctx.plans[0].group_id, + procedure_id: ProcedureId::random(), + }]; + + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Collect::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!(region_routes.len(), 3); + assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1)); + assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2)); + assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4)); + } + + #[tokio::test] + async fn test_repartition_rollback_is_idempotent() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 50, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ]; + env.create_physical_table_metadata_with_wal_options( + table_id, + original_region_routes, + test_region_wal_options(&[1, 2]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![test_plan(table_id)]; + persistent_ctx.failed_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: Uuid::new_v4(), + procedure_id: ProcedureId::random(), + }]; + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Dispatch), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + let once = current_parent_region_routes(&procedure.context).await; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + let twice = current_parent_region_routes(&procedure.context).await; + + assert_eq!(once, twice); + assert_eq!(once.len(), 2); + assert_eq!(once[0].region.id, RegionId::new(table_id, 1)); + assert_eq!(once[1].region.id, RegionId::new(table_id, 2)); + } + + #[tokio::test] + async fn test_repartition_procedure_flow_split_failed_and_full_rollback() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + + env.create_physical_table_metadata_for_repartition( + table_id, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + ], + test_region_wal_options(&[1, 2]), + ) + .await; + + let context = new_parent_context(&env, node_manager, table_id); + let mut procedure = RepartitionProcedure::new( + vec![range_expr("x", 0, 100)], + vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], + context, + ); + + let start_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(!start_status.need_persist()); + let start_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(start_status.need_persist()); + assert_parent_state::(&procedure); + + let allocate_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(allocate_status.need_persist()); + assert_parent_state::(&procedure); + assert_eq!(procedure.context.persistent_ctx.plans.len(), 1); + let plan = &procedure.context.persistent_ctx.plans[0]; + let expected_plan = test_plan(table_id); + assert_eq!(plan.source_regions, expected_plan.source_regions); + assert_eq!(plan.target_regions, expected_plan.target_regions); + assert_eq!( + plan.allocated_region_ids, + expected_plan.allocated_region_ids + ); + assert_eq!( + plan.pending_deallocate_region_ids, + expected_plan.pending_deallocate_region_ids + ); + assert_eq!(plan.transition_map, expected_plan.transition_map); + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(0)), + ..Default::default() + }, + ] + ); + + let dispatch_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(!dispatch_status.need_persist()); + let subprocedure_ids = extract_subprocedure_ids(dispatch_status); + assert_eq!(subprocedure_ids.len(), 1); + assert_parent_state::(&procedure); + + let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external( + MockError::new(StatusCode::Internal), + ))); + let collect_ctx = procedure_context_with_receivers(HashMap::from([( + subprocedure_ids[0], + procedure_state_receiver(failed_state), + )])); + + let err = procedure.execute(&collect_ctx).await.unwrap_err(); + assert!(!err.is_retry_later()); + assert_parent_state::(&procedure); + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!( + region_routes, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + ] + ); + } + + #[tokio::test] + async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() { + common_telemetry::init_default_ut_logging(); + let env = TestingEnv::new(); + let table_id = 1024; + let (tx, _rx) = mpsc::channel(8); + let should_retry = Arc::new(AtomicBool::new(true)); + let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| { + if should_retry.swap(false, Ordering::SeqCst) { + return Err(error::Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + clean_poisons: false, + }); + } + + Ok(api::region::RegionResponse::new(0)) + }); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + + env.create_physical_table_metadata_for_repartition( + table_id, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + ], + test_region_wal_options(&[1, 2]), + ) + .await; + + let context = new_parent_context(&env, node_manager, table_id); + let mut procedure = RepartitionProcedure::new( + vec![range_expr("x", 0, 100)], + vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], + context, + ); + + let start_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(!start_status.need_persist()); + let start_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(start_status.need_persist()); + assert_parent_state::(&procedure); + + let err = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap_err(); + assert!(err.is_retry_later()); + assert_parent_state::(&procedure); + assert!(!procedure.context.persistent_ctx.plans.is_empty()); + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + ] + ); + + let allocate_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(allocate_status.need_persist()); + assert_parent_state::(&procedure); + + assert_eq!(procedure.context.persistent_ctx.plans.len(), 1); + let plan = &procedure.context.persistent_ctx.plans[0]; + let expected_plan = test_plan(table_id); + assert_eq!(plan.source_regions, expected_plan.source_regions); + assert_eq!(plan.target_regions, expected_plan.target_regions); + assert_eq!( + plan.allocated_region_ids, + expected_plan.allocated_region_ids + ); + assert_eq!(plan.transition_map, expected_plan.transition_map); + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(0)), + ..Default::default() + }, + ] + ); + + let dispatch_status = procedure + .execute(&TestingEnv::procedure_context()) + .await + .unwrap(); + assert!(!dispatch_status.need_persist()); + let subprocedure_ids = extract_subprocedure_ids(dispatch_status); + assert_eq!(subprocedure_ids.len(), 1); + assert_parent_state::(&procedure); + } +} diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index b1bf93d986..12ffac9918 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -21,12 +21,11 @@ use common_meta::ddl::create_table::template::{ }; use common_meta::lock_key::TableLock; use common_meta::node_manager::NodeManagerRef; -use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; -use common_meta::rpc::router::{RegionRoute, operating_leader_regions}; +use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; -use common_telemetry::info; -use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use common_telemetry::{debug, info}; +use serde::{Deserialize, Deserializer, Serialize}; +use snafu::ResultExt; use store_api::storage::{RegionNumber, TableId}; use table::metadata::TableInfo; use table::table_reference::TableReference; @@ -40,14 +39,103 @@ use crate::procedure::repartition::plan::{ }; use crate::procedure::repartition::{Context, State}; +#[derive(Debug, Clone, Serialize)] +pub enum AllocateRegion { + Build(BuildPlan), + Execute(ExecutePlan), +} + +impl<'de> Deserialize<'de> for AllocateRegion { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + enum CurrentAllocateRegion { + Build(BuildPlan), + Execute(ExecutePlan), + } + + #[derive(Deserialize)] + struct LegacyAllocateRegion { + plan_entries: Vec, + } + + #[derive(Deserialize)] + #[serde(untagged)] + enum AllocateRegionRepr { + Current(CurrentAllocateRegion), + Legacy(LegacyAllocateRegion), + } + + match AllocateRegionRepr::deserialize(deserializer)? { + AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => { + Ok(Self::Build(build_plan)) + } + AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => { + Ok(Self::Execute(execute_plan)) + } + AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan { + plan_entries: legacy.plan_entries, + })), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AllocateRegion { +pub struct BuildPlan { plan_entries: Vec, } -#[async_trait::async_trait] -#[typetag::serde] -impl State for AllocateRegion { +impl BuildPlan { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let timer = Instant::now(); + let table_id = ctx.persistent_ctx.table_id; + let table_route_value = ctx.get_table_route_value().await?; + let mut next_region_number = + AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap()); + + // Converts allocation plan to repartition plan. + let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans( + table_id, + &mut next_region_number, + &self.plan_entries, + ); + let plan_count = repartition_plan_entries.len(); + let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries); + info!( + "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}", + table_id, plan_count, to_allocate + ); + + // If no region to allocate, directly dispatch the plan. + if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 { + ctx.persistent_ctx.plans = repartition_plan_entries; + ctx.update_allocate_region_elapsed(timer.elapsed()); + return Ok((Box::new(Dispatch), Status::executing(true))); + } + + ctx.persistent_ctx.plans = repartition_plan_entries; + debug!( + "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}", + table_id, + timer.elapsed() + ); + Ok(( + Box::new(AllocateRegion::Execute(ExecutePlan)), + Status::executing(true), + )) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExecutePlan; + +impl ExecutePlan { async fn next( &mut self, ctx: &mut Context, @@ -55,36 +143,13 @@ impl State for AllocateRegion { ) -> Result<(Box, Status)> { let timer = Instant::now(); let table_id = ctx.persistent_ctx.table_id; + let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans); + let region_number_and_partition_exprs = + AllocateRegion::prepare_region_allocation_data(&allocate_regions)?; + let table_info_value = ctx.get_table_info_value().await?; let table_route_value = ctx.get_table_route_value().await?; // Safety: it is physical table route value. let region_routes = table_route_value.region_routes().unwrap(); - let mut next_region_number = - Self::get_next_region_number(table_route_value.max_region_number().unwrap()); - - // Converts allocation plan to repartition plan. - let repartition_plan_entries = Self::convert_to_repartition_plans( - table_id, - &mut next_region_number, - &self.plan_entries, - ); - let plan_count = repartition_plan_entries.len(); - let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries); - info!( - "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}", - table_id, plan_count, to_allocate - ); - - // If no region to allocate, directly dispatch the plan. - if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 { - ctx.persistent_ctx.plans = repartition_plan_entries; - ctx.update_allocate_region_elapsed(timer.elapsed()); - return Ok((Box::new(Dispatch), Status::executing(true))); - } - - let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries); - let region_number_and_partition_exprs = - Self::prepare_region_allocation_data(&allocate_regions)?; - let table_info_value = ctx.get_table_info_value().await?; let new_allocated_region_routes = ctx .region_routes_allocator .allocate( @@ -122,12 +187,13 @@ impl State for AllocateRegion { table_id, new_region_count, new_regions_brief ); - let _operating_guards = Self::register_operating_regions( + // The table route metadata is not updated yet; register it in memory for region lease renewal. + let _operating_guards = Context::register_operating_regions( &ctx.memory_region_keeper, &new_allocated_region_routes, )?; // Allocates the regions on datanodes. - Self::allocate_regions( + AllocateRegion::allocate_regions( &ctx.node_manager, &table_info_value.table_info, &new_allocated_region_routes, @@ -135,21 +201,33 @@ impl State for AllocateRegion { ) .await?; - // TODO(weny): for metric engine, sync logical regions from the the central region. - // Updates the table routes. let table_lock = TableLock::Write(table_id).into(); let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; let new_region_routes = - Self::generate_region_routes(region_routes, &new_allocated_region_routes); + AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes); ctx.update_table_route(&table_route_value, new_region_routes, wal_options) .await?; ctx.invalidate_table_cache().await?; - ctx.persistent_ctx.plans = repartition_plan_entries; ctx.update_allocate_region_elapsed(timer.elapsed()); Ok((Box::new(Dispatch), Status::executing(true))) } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for AllocateRegion { + async fn next( + &mut self, + ctx: &mut Context, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + match self { + AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await, + AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await, + } + } fn as_any(&self) -> &dyn Any { self @@ -158,24 +236,7 @@ impl State for AllocateRegion { impl AllocateRegion { pub fn new(plan_entries: Vec) -> Self { - Self { plan_entries } - } - - fn register_operating_regions( - memory_region_keeper: &MemoryRegionKeeperRef, - region_routes: &[RegionRoute], - ) -> Result> { - let mut operating_guards = Vec::with_capacity(region_routes.len()); - for (region_id, datanode_id) in operating_leader_regions(region_routes) { - let guard = memory_region_keeper - .register(datanode_id, region_id) - .context(error::RegionOperatingRaceSnafu { - peer_id: datanode_id, - region_id, - })?; - operating_guards.push(guard); - } - Ok(operating_guards) + AllocateRegion::Build(BuildPlan { plan_entries }) } fn generate_region_routes( @@ -300,6 +361,7 @@ mod tests { use uuid::Uuid; use super::*; + use crate::procedure::repartition::State; use crate::procedure::repartition::test_util::range_expr; fn create_region_descriptor( @@ -488,4 +550,71 @@ mod tests { assert!(!result[0].1.is_empty()); assert!(!result[1].1.is_empty()); } + + #[test] + fn test_allocate_region_state_backward_compatibility() { + // Arrange + let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#; + + // Act + let state: Box = serde_json::from_str(serialized).unwrap(); + + // Assert + let allocate_region = state + .as_any() + .downcast_ref::() + .expect("expected AllocateRegion state"); + match allocate_region { + AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()), + AllocateRegion::Execute(_) => panic!("expected build plan"), + } + } + + #[test] + fn test_allocate_region_state_round_trip() { + // Arrange + let state: Box = Box::new(AllocateRegion::new(vec![])); + + // Act + let serialized = serde_json::to_string(&state).unwrap(); + let deserialized: Box = serde_json::from_str(&serialized).unwrap(); + + // Assert + assert_eq!( + serialized, + r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"# + ); + let allocate_region = deserialized + .as_any() + .downcast_ref::() + .expect("expected AllocateRegion state"); + match allocate_region { + AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()), + AllocateRegion::Execute(_) => panic!("expected build plan"), + } + } + + #[test] + fn test_allocate_region_execute_state_round_trip() { + // Arrange + let state: Box = Box::new(AllocateRegion::Execute(ExecutePlan)); + + // Act + let serialized = serde_json::to_string(&state).unwrap(); + let deserialized: Box = serde_json::from_str(&serialized).unwrap(); + + // Assert + assert_eq!( + serialized, + r#"{"repartition_state":"AllocateRegion","Execute":null}"# + ); + let allocate_region = deserialized + .as_any() + .downcast_ref::() + .expect("expected AllocateRegion state"); + match allocate_region { + AllocateRegion::Execute(_) => {} + AllocateRegion::Build(_) => panic!("expected execute plan"), + } + } } diff --git a/src/meta-srv/src/procedure/repartition/collect.rs b/src/meta-srv/src/procedure/repartition/collect.rs index d413158b94..1a6d0c6257 100644 --- a/src/meta-srv/src/procedure/repartition/collect.rs +++ b/src/meta-srv/src/procedure/repartition/collect.rs @@ -94,17 +94,28 @@ impl State for Collect { } } - let inflight = self.inflight_procedures.len(); let succeeded = self.succeeded_procedures.len(); let failed = self.failed_procedures.len(); let unknown = self.unknown_procedures.len(); info!( - "Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}", - table_id, inflight, succeeded, failed, unknown + "Collected repartition group results for table_id: {}, succeeded: {}, failed: {}, unknown: {}", + table_id, succeeded, failed, unknown ); if failed > 0 || unknown > 0 { - // TODO(weny): retry the failed or unknown procedures. + ctx.persistent_ctx + .failed_procedures + .extend(self.failed_procedures.iter()); + ctx.persistent_ctx + .unknown_procedures + .extend(self.unknown_procedures.iter()); + return crate::error::UnexpectedSnafu { + violated: format!( + "Repartition groups failed or became unknown, table_id: {}, failed: {}, unknown: {}", + table_id, failed, unknown + ), + } + .fail(); } if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() { @@ -118,3 +129,139 @@ impl State for Collect { self } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::mock::MockError; + use common_error::status_code::StatusCode; + use common_meta::test_util::MockDatanodeManager; + use common_procedure::{ + Context as ProcedureContext, ContextProvider, Error as ProcedureError, ProcedureId, + ProcedureState, + }; + use common_procedure_test::MockContextProvider; + use tokio::sync::watch; + + use super::*; + use crate::procedure::repartition::PersistentContext; + use crate::procedure::repartition::test_util::TestingEnv; + + struct FailedProcedureContextProvider { + receiver: watch::Receiver, + inner: MockContextProvider, + } + + #[async_trait::async_trait] + impl ContextProvider for FailedProcedureContextProvider { + async fn procedure_state( + &self, + procedure_id: ProcedureId, + ) -> common_procedure::Result> { + self.inner.procedure_state(procedure_id).await + } + + async fn procedure_state_receiver( + &self, + _procedure_id: ProcedureId, + ) -> common_procedure::Result>> { + Ok(Some(self.receiver.clone())) + } + + async fn try_put_poison( + &self, + key: &common_procedure::PoisonKey, + procedure_id: ProcedureId, + ) -> common_procedure::Result<()> { + self.inner.try_put_poison(key, procedure_id).await + } + + async fn acquire_lock( + &self, + key: &common_procedure::StringKey, + ) -> common_procedure::local::DynamicKeyLockGuard { + self.inner.acquire_lock(key).await + } + } + + #[tokio::test] + async fn test_collect_returns_error_when_unknown_exists() { + let env = TestingEnv::new(); + let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(()))); + let persistent_ctx = PersistentContext::new( + table::table_name::TableName::new("test_catalog", "test_schema", "test_table"), + 1024, + None, + ); + let mut ctx = crate::procedure::repartition::Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut state = Collect { + inflight_procedures: vec![], + succeeded_procedures: vec![], + failed_procedures: vec![], + unknown_procedures: vec![ProcedureMeta { + plan_index: 0, + group_id: uuid::Uuid::new_v4(), + procedure_id: common_procedure::ProcedureId::random(), + }], + }; + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_collect_returns_error_when_failed_exists() { + let env = TestingEnv::new(); + let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(()))); + let persistent_ctx = PersistentContext::new( + table::table_name::TableName::new("test_catalog", "test_schema", "test_table"), + 1024, + None, + ); + let mut ctx = crate::procedure::repartition::Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let procedure_id = common_procedure::ProcedureId::random(); + let (tx, rx) = watch::channel(ProcedureState::Running); + tx.send(ProcedureState::failed(Arc::new(ProcedureError::external( + MockError::new(StatusCode::Internal), + )))) + .unwrap(); + let procedure_ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(FailedProcedureContextProvider { + receiver: rx, + inner: MockContextProvider::default(), + }), + }; + let mut state = Collect { + inflight_procedures: vec![ProcedureMeta { + plan_index: 0, + group_id: uuid::Uuid::new_v4(), + procedure_id, + }], + succeeded_procedures: vec![], + failed_procedures: vec![], + unknown_procedures: vec![], + }; + + let err = state.next(&mut ctx, &procedure_ctx).await.unwrap_err(); + + assert_eq!(state.failed_procedures.len(), 1); + assert_eq!(state.unknown_procedures.len(), 0); + assert!(!err.is_retryable()); + } +} diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index 12233c27e7..3f5dc5bd8e 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -88,7 +88,8 @@ impl State for DeallocateRegion { &ctx.persistent_ctx.schema_name, &ctx.persistent_ctx.table_name, ); - // Deallocates the regions on datanodes. + // Memory guards are not required here, + // because the table metadata still contains routes for the deallocating regions. Self::deallocate_regions( &ctx.node_manager, &ctx.leader_region_registry, @@ -116,7 +117,7 @@ impl State for DeallocateRegion { } impl DeallocateRegion { - async fn deallocate_regions( + pub(crate) async fn deallocate_regions( node_manager: &NodeManagerRef, leader_region_registry: &LeaderRegionRegistryRef, table: TableName, @@ -141,7 +142,7 @@ impl DeallocateRegion { Ok(()) } - fn filter_deallocatable_region_routes( + pub(crate) fn filter_deallocatable_region_routes( table_id: TableId, region_routes: &[RegionRoute], pending_deallocate_region_ids: &HashSet, @@ -165,7 +166,7 @@ impl DeallocateRegion { .collect::>() } - fn generate_region_routes( + pub(crate) fn generate_region_routes( region_routes: &[RegionRoute], pending_deallocate_region_ids: &HashSet, ) -> Vec { @@ -181,12 +182,21 @@ impl DeallocateRegion { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; + use common_meta::ddl::test_util::datanode_handler::RetryErrorDatanodeHandler; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::test_util::MockDatanodeManager; use store_api::storage::{RegionId, TableId}; + use crate::error::Error; + use crate::procedure::repartition::State; use crate::procedure::repartition::deallocate_region::DeallocateRegion; + use crate::procedure::repartition::plan::RepartitionPlanEntry; + use crate::procedure::repartition::test_util::{ + TestingEnv, current_parent_region_routes, new_parent_context, + }; fn test_region_routes(table_id: TableId) -> Vec { vec![ @@ -238,4 +248,36 @@ mod tests { assert_eq!(new_region_routes.len(), 1); assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2)); } + + #[tokio::test] + async fn test_next_retryable_when_deallocate_regions_retry_later() { + let env = TestingEnv::new(); + let table_id = 1024; + let original_routes = test_region_routes(table_id); + + env.create_physical_table_metadata(table_id, original_routes.clone()) + .await; + + let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler)); + let mut ctx = new_parent_context(&env, node_manager, table_id); + ctx.persistent_ctx.plans = vec![RepartitionPlanEntry { + group_id: uuid::Uuid::new_v4(), + source_regions: vec![], + target_regions: vec![], + allocated_region_ids: vec![], + pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)], + transition_map: vec![], + }]; + + let mut state = DeallocateRegion; + + let err = state + .next(&mut ctx, &TestingEnv::procedure_context()) + .await + .unwrap_err(); + + assert!(matches!(err, Error::DeallocateRegions { .. })); + assert!(err.is_retryable()); + assert_eq!(current_parent_region_routes(&ctx).await, original_routes); + } } diff --git a/src/meta-srv/src/procedure/repartition/dispatch.rs b/src/meta-srv/src/procedure/repartition/dispatch.rs index 02dc73362d..3a9f9376f1 100644 --- a/src/meta-srv/src/procedure/repartition/dispatch.rs +++ b/src/meta-srv/src/procedure/repartition/dispatch.rs @@ -31,7 +31,7 @@ use crate::procedure::repartition::{self, Context, State}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Dispatch; -fn build_region_mapping( +pub(crate) fn build_region_mapping( source_regions: &[RegionDescriptor], target_regions: &[RegionDescriptor], transition_map: &[Vec], @@ -106,7 +106,11 @@ impl State for Dispatch { Ok(( Box::new(Collect::new(procedure_metas)), - Status::suspended(procedures, true), + // The state is not persisted after sub-procedures are spawned. + // If metasrv restarts before all sub-procedures complete, + // it restores from the `Dispatch` state and re-dispatches them. + // This is safe because the sub-procedures are idempotent. + Status::suspended(procedures, false), )) } diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index f0cb1c4dd0..e5a06f79a8 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -41,14 +41,18 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, StringKey, UserMetadata, }; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; use uuid::Uuid; use crate::error::{self, Result}; +use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; +use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; +use crate::procedure::repartition::group::remap_manifest::RemapManifest; use crate::procedure::repartition::group::repartition_start::RepartitionStart; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::repartition::utils::get_datanode_table_value; use crate::procedure::repartition::{self}; @@ -192,6 +196,62 @@ impl RepartitionGroupProcedure { Ok(Self { state, context }) } + + async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { + if !self.should_rollback_metadata() { + return Ok(()); + } + + let table_lock = + common_meta::lock_key::TableLock::Write(self.context.persistent_ctx.table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + UpdateMetadata::RollbackStaging + .rollback_staging_regions(&mut self.context) + .await?; + + if let Err(err) = self.context.invalidate_table_cache().await { + warn!( + err; + "Failed to broadcast the invalidate table cache message during repartition group rollback" + ); + } + + Ok(()) + } + + /// Returns whether group rollback should revert staging metadata. + /// + /// This uses an "after metadata apply, before exit staging" semantic. + /// Once execution reaches `UpdateMetadata::ApplyStaging` or any later staging state, + /// rollback must restore table-route metadata back to the pre-apply view. + /// + /// State flow: + /// `RepartitionStart -> SyncRegion -> UpdateMetadata::ApplyStaging -> EnterStagingRegion` + /// ` -> RemapManifest -> ApplyStagingManifest -> UpdateMetadata::ExitStaging -> RepartitionEnd` + /// ` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^` + /// ` rollback staging metadata` + /// + /// Notes: + /// - `RepartitionStart` / `SyncRegion`: no-op, metadata has not been staged yet. + /// - `UpdateMetadata::ApplyStaging` / `EnterStagingRegion` / `RemapManifest` / + /// `ApplyStagingManifest` / `UpdateMetadata::RollbackStaging`: rollback-active. + /// - `UpdateMetadata::ExitStaging` / `RepartitionEnd`: excluded, because metadata has + /// already moved into the post-commit exit path. + fn should_rollback_metadata(&self) -> bool { + self.state.as_any().is::() + || self.state.as_any().is::() + || self.state.as_any().is::() + || self + .state + .as_any() + .downcast_ref::() + .is_some_and(|state| { + matches!( + state, + UpdateMetadata::ApplyStaging | UpdateMetadata::RollbackStaging + ) + }) + } } #[async_trait::async_trait] @@ -200,6 +260,12 @@ impl Procedure for RepartitionGroupProcedure { Self::TYPE_NAME } + async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> { + self.rollback_inner(ctx) + .await + .map_err(ProcedureError::external) + } + #[tracing::instrument(skip_all, fields( state = %self.state.name(), table_id = self.context.persistent_ctx.table_id, @@ -238,7 +304,7 @@ impl Procedure for RepartitionGroupProcedure { } fn rollback_supported(&self) -> bool { - false + true } fn dump(&self) -> ProcedureResult { @@ -304,7 +370,7 @@ impl Context { pub struct GroupPrepareResult { /// The validated source region routes. pub source_routes: Vec, - /// The validated target region routes. + /// Validated target region routes used for metadata rollback (logical rollback). pub target_routes: Vec, /// The primary source region id (first source region), used for retrieving region options. pub central_region: RegionId, @@ -599,12 +665,149 @@ pub(crate) trait State: Sync + Send + Debug { mod tests { use std::assert_matches; use std::sync::Arc; + use std::time::Duration; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::test_util::MockKvBackendBuilder; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; + use common_procedure_test::MockContextProvider; + use partition::expr::PartitionExpr; + use store_api::storage::RegionId; + use super::{ + Context, PersistentContext, RepartitionGroupProcedure, RepartitionStart, State, + region_routes, + }; use crate::error::Error; - use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context}; + use crate::procedure::repartition::dispatch::build_region_mapping; + use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; + use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; + use crate::procedure::repartition::group::remap_manifest::RemapManifest; + use crate::procedure::repartition::group::repartition_start::RepartitionStart as GroupRepartitionStart; + use crate::procedure::repartition::group::sync_region::SyncRegion; + use crate::procedure::repartition::group::update_metadata::UpdateMetadata; + use crate::procedure::repartition::plan; + use crate::procedure::repartition::repartition_start::RepartitionStart as ParentRepartitionStart; + use crate::procedure::repartition::test_util::{ + TestingEnv, new_persistent_context, range_expr, + }; + + struct GroupRollbackFixture { + context: Context, + original_region_routes: Vec, + next_state: Option>, + } + + async fn new_group_rollback_fixture( + original_region_routes: Vec, + from_exprs: Vec, + to_exprs: Vec, + sync_region: bool, + ) -> GroupRollbackFixture { + let env = TestingEnv::new(); + let procedure_ctx = TestingEnv::procedure_context(); + let table_id = 1024; + let mut next_region_number = 10; + + env.create_physical_table_metadata(table_id, original_region_routes.clone()) + .await; + + let (_, physical_route) = env + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .unwrap(); + let allocation_plans = + ParentRepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap(); + assert_eq!(allocation_plans.len(), 1); + + let repartition_plan = plan::convert_allocation_plan_to_repartition_plan( + table_id, + &mut next_region_number, + &allocation_plans[0], + ); + let region_mapping = build_region_mapping( + &repartition_plan.source_regions, + &repartition_plan.target_regions, + &repartition_plan.transition_map, + ); + let persistent_context = PersistentContext::new( + repartition_plan.group_id, + table_id, + "test_catalog".to_string(), + "test_schema".to_string(), + repartition_plan.source_regions, + repartition_plan.target_regions, + region_mapping, + sync_region, + repartition_plan.allocated_region_ids, + repartition_plan.pending_deallocate_region_ids, + Duration::from_secs(120), + ); + let mut context = env.create_context(persistent_context); + let (next_state, _) = GroupRepartitionStart + .next(&mut context, &procedure_ctx) + .await + .unwrap(); + + GroupRollbackFixture { + context, + original_region_routes, + next_state: Some(next_state), + } + } + + async fn new_split_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture { + new_group_rollback_fixture( + vec![ + new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))), + new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))), + new_region_route(RegionId::new(1024, 10), None), + ], + vec![range_expr("x", 0, 100)], + vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], + sync_region, + ) + .await + } + + async fn new_merge_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture { + new_group_rollback_fixture( + vec![ + new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))), + new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))), + new_region_route(RegionId::new(1024, 3), Some(range_expr("x", 200, 300))), + ], + vec![range_expr("x", 0, 100), range_expr("x", 100, 200)], + vec![range_expr("x", 0, 200)], + sync_region, + ) + .await + } + + async fn stage_metadata(context: &mut Context) { + UpdateMetadata::ApplyStaging + .apply_staging_regions(context) + .await + .unwrap(); + } + + fn new_region_route(region_id: RegionId, partition_expr: Option) -> RegionRoute { + RegionRoute { + region: Region { + id: region_id, + partition_expr: partition_expr + .map(|expr| expr.as_json_str().unwrap()) + .unwrap_or_default(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + } + } #[tokio::test] async fn test_get_table_route_value_not_found_error() { @@ -653,4 +856,198 @@ mod tests { let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err(); assert!(err.is_retryable()); } + + #[tokio::test] + async fn test_group_rollback_supported() { + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(1024, vec![], vec![]); + let procedure = RepartitionGroupProcedure { + state: Box::new(RepartitionStart), + context: env.create_context(persistent_context), + }; + + assert!(procedure.rollback_supported()); + } + + #[tokio::test] + async fn test_group_rollback_is_noop_before_apply_staging() { + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(1024, vec![], vec![]); + let ctx = env.create_context(persistent_context.clone()); + let mut procedure = RepartitionGroupProcedure { + state: Box::new(RepartitionStart), + context: ctx, + }; + let provider = Arc::new(MockContextProvider::new(Default::default())); + let procedure_ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider, + }; + + procedure.rollback(&procedure_ctx).await.unwrap(); + + assert!(procedure.state.as_any().is::()); + assert_eq!(procedure.context.persistent_ctx, persistent_context); + } + + async fn assert_noop_rollback( + fixture: GroupRollbackFixture, + state: Box, + assert_state: impl FnOnce(&dyn State), + ) { + let original_region_routes = fixture.original_region_routes.clone(); + let procedure_ctx = TestingEnv::procedure_context(); + let mut procedure = RepartitionGroupProcedure { + state, + context: fixture.context, + }; + + procedure.rollback(&procedure_ctx).await.unwrap(); + + assert_state(&*procedure.state); + let table_route_value = procedure + .context + .get_table_route_value() + .await + .unwrap() + .into_inner(); + let region_routes = region_routes( + procedure.context.persistent_ctx.table_id, + &table_route_value, + ) + .unwrap(); + assert_eq!(region_routes.clone(), original_region_routes); + } + + async fn assert_metadata_rollback_restores_table_route( + mut fixture: GroupRollbackFixture, + state: Box, + ) { + let original_region_routes = fixture.original_region_routes.clone(); + let procedure_ctx = TestingEnv::procedure_context(); + stage_metadata(&mut fixture.context).await; + let mut procedure = RepartitionGroupProcedure { + state, + context: fixture.context, + }; + + procedure.rollback(&procedure_ctx).await.unwrap(); + + let table_route_value = procedure + .context + .get_table_route_value() + .await + .unwrap() + .into_inner(); + let region_routes = region_routes( + procedure.context.persistent_ctx.table_id, + &table_route_value, + ) + .unwrap(); + assert_eq!(region_routes.clone(), original_region_routes); + } + + #[tokio::test] + async fn test_group_rollback_is_noop_in_sync_region() { + let mut fixture = new_split_group_rollback_fixture(true).await; + assert!( + fixture + .next_state + .as_ref() + .unwrap() + .as_any() + .is::() + ); + let state = fixture.next_state.take().unwrap(); + + assert_noop_rollback(fixture, state, |state| { + assert!(state.as_any().is::()); + }) + .await; + } + + #[tokio::test] + async fn test_group_rollback_is_noop_in_exit_staging() { + let fixture = new_split_group_rollback_fixture(false).await; + + assert_noop_rollback(fixture, Box::new(UpdateMetadata::ExitStaging), |state| { + assert!(state.as_any().is::()); + assert!(matches!( + state.as_any().downcast_ref::(), + Some(UpdateMetadata::ExitStaging) + )); + }) + .await; + } + + #[tokio::test] + async fn test_group_rollback_restores_split_routes_from_apply_staging() { + let fixture = new_split_group_rollback_fixture(false).await; + assert_metadata_rollback_restores_table_route( + fixture, + Box::new(UpdateMetadata::ApplyStaging), + ) + .await; + } + + #[tokio::test] + async fn test_group_rollback_restores_split_routes_from_enter_staging_region() { + let fixture = new_split_group_rollback_fixture(false).await; + assert_metadata_rollback_restores_table_route(fixture, Box::new(EnterStagingRegion)).await; + } + + #[tokio::test] + async fn test_group_rollback_restores_split_routes_from_remap_manifest() { + let fixture = new_split_group_rollback_fixture(false).await; + assert_metadata_rollback_restores_table_route(fixture, Box::new(RemapManifest)).await; + } + + #[tokio::test] + async fn test_group_rollback_restores_split_routes_from_apply_staging_manifest() { + let fixture = new_split_group_rollback_fixture(false).await; + assert_metadata_rollback_restores_table_route(fixture, Box::new(ApplyStagingManifest)) + .await; + } + + #[tokio::test] + async fn test_group_rollback_restores_merge_routes_and_is_idempotent() { + let mut fixture = new_merge_group_rollback_fixture(false).await; + let original_region_routes = fixture.original_region_routes.clone(); + let procedure_ctx = TestingEnv::procedure_context(); + stage_metadata(&mut fixture.context).await; + let mut procedure = RepartitionGroupProcedure { + state: Box::new(UpdateMetadata::ApplyStaging), + context: fixture.context, + }; + + procedure.rollback(&procedure_ctx).await.unwrap(); + let table_route_value = procedure + .context + .get_table_route_value() + .await + .unwrap() + .into_inner(); + let once = region_routes( + procedure.context.persistent_ctx.table_id, + &table_route_value, + ) + .unwrap() + .clone(); + procedure.rollback(&procedure_ctx).await.unwrap(); + let table_route_value = procedure + .context + .get_table_route_value() + .await + .unwrap() + .into_inner(); + let twice = region_routes( + procedure.context.persistent_ctx.table_id, + &table_route_value, + ) + .unwrap() + .clone(); + + assert_eq!(once, original_region_routes); + assert_eq!(once, twice); + } } diff --git a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs index 2020e9e2f4..43e5ee31d9 100644 --- a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs @@ -332,7 +332,14 @@ impl ApplyStagingManifest { ); Ok(()) - } + }, + Err(error::Error::MailboxChannelClosed {..})=> error::RetryLaterSnafu { + reason: format!( + "Mailbox closed when sending apply staging manifests to datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + }.fail()?, Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( "Mailbox received timeout for apply staging manifests on datanode {:?}, elapsed: {:?}", diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index 59de569c13..911e881ac3 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -315,7 +315,14 @@ impl EnterStagingRegion { ); Ok(()) - } + }, + Err(error::Error::MailboxChannelClosed {..})=> error::RetryLaterSnafu { + reason: format!( + "Mailbox closed when sending enter staging regions to datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + }.fail()?, Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( "Mailbox received timeout for enter staging regions on datanode {:?}, elapsed: {:?}", diff --git a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs index 6e3460c2ce..1d6a75100e 100644 --- a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs +++ b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs @@ -184,6 +184,14 @@ impl RemapManifest { Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer) } + Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu { + reason: format!( + "Mailbox closed when sending remap manifests to datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}", diff --git a/src/meta-srv/src/procedure/repartition/group/sync_region.rs b/src/meta-srv/src/procedure/repartition/group/sync_region.rs index dcd58c21e9..7422ae8607 100644 --- a/src/meta-srv/src/procedure/repartition/group/sync_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/sync_region.rs @@ -273,6 +273,14 @@ impl SyncRegion { } Ok(()) } + Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu { + reason: format!( + "Mailbox closed when sending sync region to datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}", diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs index ecde5f0507..ff01161ff5 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/apply_staging_region.rs @@ -30,7 +30,7 @@ impl UpdateMetadata { /// Abort: /// - Target region not found. /// - Source region not found. - fn apply_staging_region_routes( + pub(crate) fn apply_staging_region_routes( group_id: GroupId, sources: &[RegionDescriptor], targets: &[RegionDescriptor], @@ -50,10 +50,12 @@ impl UpdateMetadata { region_id: target.region_id, }, )?; + // Set the new partition expression for the target region route. region_route.region.partition_expr = target .partition_expr .as_json_str() .context(error::SerializePartitionExprSnafu)?; + // Set leader staging state and write route policy for the target region route. region_route.set_leader_staging(); region_route.clear_ignore_all_writes(); } @@ -65,6 +67,7 @@ impl UpdateMetadata { region_id: source.region_id, }, )?; + // Set leader staging state for the source region route. region_route.set_leader_staging(); if pending_deallocate_region_ids.contains(&source.region_id) { // When a region is pending deallocation, it should ignore all writes. diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs index e9bef4cf8e..4e6bf67fc8 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs @@ -18,10 +18,12 @@ use common_error::ext::BoxedError; use common_meta::rpc::router::RegionRoute; use common_telemetry::{error, info}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use crate::error::{self, Result}; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{Context, GroupId, region_routes}; +use crate::procedure::repartition::plan::RegionDescriptor; impl UpdateMetadata { /// Rolls back the staging regions. @@ -31,8 +33,9 @@ impl UpdateMetadata { /// - Target region not found. fn rollback_staging_region_routes( group_id: GroupId, - source_routes: &[RegionRoute], - target_routes: &[RegionRoute], + sources: &[RegionDescriptor], + original_target_routes: &[RegionRoute], + pending_deallocate_region_ids: &[RegionId], current_region_routes: &[RegionRoute], ) -> Result> { let mut region_routes = current_region_routes.to_vec(); @@ -40,26 +43,35 @@ impl UpdateMetadata { .iter_mut() .map(|route| (route.region.id, route)) .collect::>(); - - for source in source_routes { - let region_route = region_routes_map.get_mut(&source.region.id).context( + for source in sources { + let region_route = region_routes_map.get_mut(&source.region_id).context( error::RepartitionSourceRegionMissingSnafu { group_id, - region_id: source.region.id, + region_id: source.region_id, }, )?; - region_route.region.partition_expr = source.region.partition_expr.clone(); + // Clean leader staging state for source regions. region_route.clear_leader_staging(); - region_route.clear_ignore_all_writes(); + if pending_deallocate_region_ids.contains(&source.region_id) { + // Clean ignore all writes state for source regions if it's pending to be deallocated, + // which means the source region is merged into the target region. + region_route.clear_ignore_all_writes(); + } } - for target in target_routes { + for target in original_target_routes { let region_route = region_routes_map.get_mut(&target.region.id).context( error::RepartitionTargetRegionMissingSnafu { group_id, region_id: target.region.id, }, )?; + + // Revert the partition expression and write route policy to the original value for the target region. + region_route.region.partition_expr = target.region.partition_expr.clone(); + region_route.write_route_policy = target.write_route_policy; + + // Clean leader staging state for target regions. region_route.clear_leader_staging(); } @@ -83,8 +95,9 @@ impl UpdateMetadata { let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); let new_region_routes = Self::rollback_staging_region_routes( group_id, - &prepare_result.source_routes, + &ctx.persistent_ctx.sources, &prepare_result.target_routes, + &ctx.persistent_ctx.pending_deallocate_region_ids, region_routes, )?; @@ -113,87 +126,176 @@ impl UpdateMetadata { #[cfg(test)] mod tests { + use std::collections::HashSet; + use common_meta::peer::Peer; use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use store_api::storage::RegionId; use uuid::Uuid; use crate::procedure::repartition::group::update_metadata::UpdateMetadata; + use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::repartition::test_util::range_expr; + fn new_region_route( + region_id: RegionId, + partition_expr: &str, + leader_state: Option, + ignore_all_writes: bool, + ) -> RegionRoute { + let mut route = RegionRoute { + region: Region { + id: region_id, + partition_expr: partition_expr.to_string(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state, + ..Default::default() + }; + + if ignore_all_writes { + route.set_ignore_all_writes(); + } + + route + } + + fn original_target_routes( + region_routes: &[RegionRoute], + targets: &[RegionDescriptor], + ) -> Vec { + let target_ids = targets + .iter() + .map(|target| target.region_id) + .collect::>(); + region_routes + .iter() + .filter(|route| target_ids.contains(&route.region.id)) + .cloned() + .collect() + } + #[test] - fn test_rollback_staging_region_routes() { + fn test_rollback_staging_region_routes_split_case() { let group_id = Uuid::new_v4(); let table_id = 1024; - let region_routes = vec![ - { - let mut route = RegionRoute { - region: Region { - id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - leader_state: Some(LeaderState::Staging), - ..Default::default() - }; - route.set_ignore_all_writes(); - route + let original_region_routes = vec![ + new_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + None, + false, + ), + new_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + None, + false, + ), + new_region_route(RegionId::new(table_id, 3), "", None, false), + ]; + let sources = vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }]; + let targets = vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 50), }, - RegionRoute { - region: Region { - id: RegionId::new(table_id, 2), - partition_expr: String::new(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - leader_state: Some(LeaderState::Staging), - ..Default::default() - }, - RegionRoute { - region: Region { - id: RegionId::new(table_id, 3), - partition_expr: String::new(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - leader_state: Some(LeaderState::Downgrading), - ..Default::default() + RegionDescriptor { + region_id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 50, 100), }, ]; - let source_routes = vec![RegionRoute { - region: Region { - id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - ..Default::default() - }]; - let target_routes = vec![RegionRoute { - region: Region { - id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - ..Default::default() - }]; - let new_region_routes = UpdateMetadata::rollback_staging_region_routes( + let applied_region_routes = UpdateMetadata::apply_staging_region_routes( group_id, - &source_routes, - &target_routes, - ®ion_routes, + &sources, + &targets, + &[], + &original_region_routes, ) .unwrap(); - assert!(!new_region_routes[0].is_leader_staging()); - assert!(!new_region_routes[0].is_ignore_all_writes()); - assert_eq!( - new_region_routes[0].region.partition_expr, - range_expr("x", 0, 20).as_json_str().unwrap(), - ); - assert!(!new_region_routes[1].is_leader_staging()); - assert!(!new_region_routes[1].is_ignore_all_writes()); - assert!(new_region_routes[2].is_leader_downgrading()); + let target_routes = original_target_routes(&original_region_routes, &targets); + let new_region_routes = UpdateMetadata::rollback_staging_region_routes( + group_id, + &sources, + &target_routes, + &[], + &applied_region_routes, + ) + .unwrap(); + + assert_eq!(new_region_routes, original_region_routes); + } + + #[test] + fn test_rollback_staging_region_routes_merge_case_is_idempotent() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let original_region_routes = vec![ + new_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + None, + false, + ), + new_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + None, + false, + ), + new_region_route( + RegionId::new(table_id, 3), + &range_expr("x", 200, 300).as_json_str().unwrap(), + None, + false, + ), + ]; + let sources = vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 200), + }, + ]; + let targets = vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 200), + }]; + let target_routes = original_target_routes(&original_region_routes, &targets); + let applied_region_routes = UpdateMetadata::apply_staging_region_routes( + group_id, + &sources, + &targets, + &[RegionId::new(table_id, 2)], + &original_region_routes, + ) + .unwrap(); + + let once = UpdateMetadata::rollback_staging_region_routes( + group_id, + &sources, + &target_routes, + &[RegionId::new(table_id, 2)], + &applied_region_routes, + ) + .unwrap(); + let twice = UpdateMetadata::rollback_staging_region_routes( + group_id, + &sources, + &target_routes, + &[RegionId::new(table_id, 2)], + &once, + ) + .unwrap(); + + assert_eq!(once, original_region_routes); + assert_eq!(once, twice); } } diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 1f657d58f2..5c6bcfdb06 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -102,7 +102,7 @@ impl State for RepartitionStart { } impl RepartitionStart { - fn build_plan( + pub(crate) fn build_plan( physical_route: &PhysicalTableRouteValue, from_exprs: &[PartitionExpr], to_exprs: &[PartitionExpr], diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs index 3cefd4a095..83856a49e6 100644 --- a/src/meta-srv/src/procedure/repartition/test_util.rs +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -16,22 +16,41 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; +use common_meta::ddl::DdlContext; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::test_utils::{new_test_table_info, new_test_table_info_with_name}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::node_manager::NodeManagerRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceBuilder; +use common_meta::test_util::new_ddl_context_with_kv_backend; +use common_procedure::{ + Context as ProcedureContext, ContextProvider, ProcedureId, ProcedureState, Status, +}; +use common_procedure_test::MockContextProvider; +use common_wal::options::{KafkaWalOptions, WalOptions}; use datatypes::value::Value; use partition::expr::{PartitionExpr, col}; -use store_api::storage::TableId; +use store_api::storage::{RegionId, RegionNumber, TableId}; +use table::table_name::TableName; +use tokio::sync::watch; use uuid::Uuid; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::metasrv::MetasrvInfo; use crate::procedure::repartition::group::{Context, PersistentContext, VolatileContext}; use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::{ + Context as ParentContext, PersistentContext as ParentPersistentContext, RepartitionProcedure, +}; use crate::procedure::test_util::MailboxContext; /// `TestingEnv` provides components during the tests. pub struct TestingEnv { + pub kv_backend: KvBackendRef, pub table_metadata_manager: TableMetadataManagerRef, pub mailbox_ctx: MailboxContext, pub server_addr: String, @@ -45,13 +64,14 @@ impl Default for TestingEnv { impl TestingEnv { pub fn new() -> Self { - let kv_backend = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let mailbox_sequence = SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox_ctx = MailboxContext::new(mailbox_sequence); Self { + kv_backend, table_metadata_manager, mailbox_ctx, server_addr: "localhost".to_string(), @@ -76,6 +96,65 @@ impl TestingEnv { volatile_ctx: VolatileContext::default(), } } + + pub fn procedure_context() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } + } + + pub async fn create_physical_table_metadata( + &self, + table_id: TableId, + region_routes: Vec, + ) { + self.create_physical_table_metadata_with_wal_options( + table_id, + region_routes, + HashMap::default(), + ) + .await; + } + + pub async fn create_physical_table_metadata_with_wal_options( + &self, + table_id: TableId, + region_routes: Vec, + region_wal_options: HashMap, + ) { + self.table_metadata_manager + .create_table_metadata( + new_test_table_info(table_id), + TableRouteValue::physical(region_routes), + region_wal_options, + ) + .await + .unwrap(); + } + + pub async fn create_physical_table_metadata_for_repartition( + &self, + table_id: TableId, + region_routes: Vec, + region_wal_options: HashMap, + ) { + let mut table_info = new_test_table_info_with_name(table_id, "test_table"); + table_info.meta.column_ids = vec![0, 1, 2]; + + self.table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + region_wal_options, + ) + .await + .unwrap(); + } + + pub fn ddl_context(&self, node_manager: NodeManagerRef) -> DdlContext { + new_ddl_context_with_kv_backend(node_manager, self.kv_backend.clone()) + } } pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { @@ -84,6 +163,18 @@ pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { .and(col(col_name).lt(Value::Int64(end))) } +pub fn test_region_wal_options(region_numbers: &[RegionNumber]) -> HashMap { + let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + })) + .unwrap(); + + region_numbers + .iter() + .map(|region_number| (*region_number, wal_options.clone())) + .collect() +} + pub fn new_persistent_context( table_id: TableId, sources: Vec, @@ -105,3 +196,110 @@ pub fn new_persistent_context( timeout: Duration::from_secs(120), } } + +pub fn test_region_route(region_id: RegionId, partition_expr: &str) -> RegionRoute { + RegionRoute { + region: Region { + id: region_id, + partition_expr: partition_expr.to_string(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + } +} + +pub async fn current_parent_region_routes(ctx: &ParentContext) -> Vec { + let table_route_value = ctx.get_table_route_value().await.unwrap().into_inner(); + table_route_value.region_routes().unwrap().clone() +} + +pub fn new_parent_context( + env: &TestingEnv, + node_manager: NodeManagerRef, + table_id: TableId, +) -> ParentContext { + let ddl_ctx = env.ddl_context(node_manager); + let persistent_ctx = ParentPersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + + ParentContext::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ) +} + +pub fn assert_parent_state(procedure: &RepartitionProcedure) { + assert!(procedure.state.as_any().is::()); +} + +pub fn extract_subprocedure_ids(status: Status) -> Vec { + let Status::Suspended { subprocedures, .. } = status else { + panic!("expected suspended status"); + }; + + subprocedures + .into_iter() + .map(|procedure| procedure.id) + .collect() +} + +pub fn procedure_state_receiver(state: ProcedureState) -> watch::Receiver { + let (tx, rx) = watch::channel(ProcedureState::Running); + tx.send(state).unwrap(); + rx +} + +pub fn procedure_context_with_receivers( + receivers: HashMap>, +) -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(ProcedureStateReceiverProvider { + receivers, + inner: MockContextProvider::default(), + }), + } +} + +struct ProcedureStateReceiverProvider { + receivers: HashMap>, + inner: MockContextProvider, +} + +#[async_trait::async_trait] +impl ContextProvider for ProcedureStateReceiverProvider { + async fn procedure_state( + &self, + procedure_id: ProcedureId, + ) -> common_procedure::Result> { + self.inner.procedure_state(procedure_id).await + } + + async fn procedure_state_receiver( + &self, + procedure_id: ProcedureId, + ) -> common_procedure::Result>> { + Ok(self.receivers.get(&procedure_id).cloned()) + } + + async fn try_put_poison( + &self, + key: &common_procedure::PoisonKey, + procedure_id: ProcedureId, + ) -> common_procedure::Result<()> { + self.inner.try_put_poison(key, procedure_id).await + } + + async fn acquire_lock( + &self, + key: &common_procedure::StringKey, + ) -> common_procedure::local::DynamicKeyLockGuard { + self.inner.acquire_lock(key).await + } +} diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index bea2195573..5ea8e00038 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -190,6 +190,23 @@ pub(crate) async fn flush_region( operation: "Flush regions", } .fail(), + Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy { + ErrorStrategy::Ignore => { + warn!( + "Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.", + region_ids, datanode + ); + Ok(()) + } + ErrorStrategy::Retry => error::RetryLaterSnafu { + reason: format!( + "Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}", + datanode, + now.elapsed() + ), + } + .fail()?, + }, Err(err) => Err(err), } }