From 2f242927a832ec348b3b6e5ce8e41399bf5cbeb3 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 7 Jan 2026 14:13:48 +0800 Subject: [PATCH] feat(repartition): implement region deallocation for repartition procedure (#7522) * feat: implement deallocate regions for repartition procedure Signed-off-by: WenyXu * feat(metric-engine): add force flag to drop physical regions with associated logical regions Signed-off-by: WenyXu * feat: update table metadata after deallocating regions Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: update proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../meta/src/ddl/drop_database/executor.rs | 8 +- src/common/meta/src/ddl/drop_table.rs | 10 +- .../meta/src/ddl/drop_table/executor.rs | 14 +- src/datanode/src/region_server.rs | 10 +- src/meta-srv/src/error.rs | 16 +- .../region_migration/open_candidate_region.rs | 4 +- src/meta-srv/src/procedure/repartition.rs | 116 ++++++++++- .../repartition/deallocate_region.rs | 194 +++++++++++++++++- .../src/procedure/repartition/group.rs | 22 +- .../repartition/repartition_start.rs | 14 +- .../src/procedure/repartition/utils.rs | 49 +++++ src/metric-engine/src/engine.rs | 70 ++++++- src/metric-engine/src/engine/drop.rs | 18 +- src/mito2/src/engine/drop_test.rs | 15 +- src/store-api/src/region_request.rs | 7 + 17 files changed, 518 insertions(+), 53 deletions(-) create mode 100644 src/meta-srv/src/procedure/repartition/utils.rs diff --git a/Cargo.lock b/Cargo.lock index fc0e240a40..fadd08359e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5467,7 +5467,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa0f5716556b3276317701fffa702002e7fce275#fa0f5716556b3276317701fffa702002e7fce275" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 9ed26f3328..a62ded4c53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa0f5716556b3276317701fffa702002e7fce275" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 21ab264a92..99ecee5166 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -120,7 +120,13 @@ impl State for DropDatabaseExecutor { .await?; executor.invalidate_table_cache(ddl_ctx).await?; executor - .on_drop_regions(ddl_ctx, &self.physical_region_routes, true) + .on_drop_regions( + &ddl_ctx.node_manager, + &ddl_ctx.leader_region_registry, + &self.physical_region_routes, + true, + false, + ) .await?; info!("Table: {}({}) is dropped", self.table_name, self.table_id); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index bc51ba1918..6fca1593b9 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod executor; +pub mod executor; mod metadata; use std::collections::HashMap; @@ -156,7 +156,13 @@ impl DropTableProcedure { pub async fn on_datanode_drop_regions(&mut self) -> Result { self.executor - .on_drop_regions(&self.context, &self.data.physical_region_routes, false) + .on_drop_regions( + &self.context.node_manager, + &self.context.leader_region_registry, + &self.data.physical_region_routes, + false, + false, + ) .await?; self.data.state = DropTableState::DeleteTombstone; Ok(Status::executing(true)) diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 63ec05ab1c..d639aa596f 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -36,6 +36,8 @@ use crate::error::{self, Result}; use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::node_manager::NodeManagerRef; +use crate::region_registry::LeaderRegionRegistryRef; use crate::rpc::router::{ RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders, operating_leader_regions, @@ -212,16 +214,18 @@ impl DropTableExecutor { /// Drops region on datanode. pub async fn on_drop_regions( &self, - ctx: &DdlContext, + node_manager: &NodeManagerRef, + leader_region_registry: &LeaderRegionRegistryRef, region_routes: &[RegionRoute], fast_path: bool, + force: bool, ) -> Result<()> { // Drops leader regions on datanodes. let leaders = find_leaders(region_routes); let mut drop_region_tasks = Vec::with_capacity(leaders.len()); let table_id = self.table_id; for datanode in leaders { - let requester = ctx.node_manager.datanode(&datanode).await; + let requester = node_manager.datanode(&datanode).await; let regions = find_leader_regions(region_routes, &datanode); let region_ids = regions .iter() @@ -238,6 +242,7 @@ impl DropTableExecutor { body: Some(region_request::Body::Drop(PbDropRegionRequest { region_id: region_id.as_u64(), fast_path, + force, })), }; let datanode = datanode.clone(); @@ -262,7 +267,7 @@ impl DropTableExecutor { let followers = find_followers(region_routes); let mut close_region_tasks = Vec::with_capacity(followers.len()); for datanode in followers { - let requester = ctx.node_manager.datanode(&datanode).await; + let requester = node_manager.datanode(&datanode).await; let regions = find_follower_regions(region_routes, &datanode); let region_ids = regions .iter() @@ -307,8 +312,7 @@ impl DropTableExecutor { // Deletes the leader region from registry. let region_ids = operating_leader_regions(region_routes); - ctx.leader_region_registry - .batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id)); + leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id)); Ok(()) } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index e35af7536b..26ea247e5c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1641,7 +1641,10 @@ mod tests { let response = mock_region_server .handle_request( region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .unwrap(); @@ -1739,7 +1742,10 @@ mod tests { mock_region_server .handle_request( region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .unwrap_err(); diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 8829de44f2..838dbba64a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -110,11 +110,11 @@ pub enum Error { }, #[snafu(display( - "Another procedure is opening the region: {} on peer: {}", + "Another procedure is operating the region: {} on peer: {}", region_id, peer_id ))] - RegionOpeningRace { + RegionOperatingRace { #[snafu(implicit)] location: Location, peer_id: DatanodeId, @@ -1059,6 +1059,15 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to deallocate regions for table: {}", table_id))] + DeallocateRegions { + #[snafu(implicit)] + location: Location, + table_id: TableId, + #[snafu(source)] + source: common_meta::error::Error, + }, } impl Error { @@ -1154,7 +1163,7 @@ impl ErrorExt for Error { | Error::InvalidUtf8Value { .. } | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } - | Error::RegionOpeningRace { .. } + | Error::RegionOperatingRace { .. } | Error::RegionRouteNotFound { .. } | Error::MigrationAbort { .. } | Error::MigrationRunning { .. } @@ -1206,6 +1215,7 @@ impl ErrorExt for Error { Error::Other { source, .. } => source.status_code(), Error::RepartitionCreateSubtasks { source, .. } => source.status_code(), Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(), + Error::DeallocateRegions { source, .. } => source.status_code(), Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted, #[cfg(feature = "pg_kvbackend")] diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index e3d8c027fc..b110a6007b 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -129,7 +129,7 @@ impl OpenCandidateRegion { let guard = ctx .opening_region_keeper .register(candidate.id, *region_id) - .context(error::RegionOpeningRaceSnafu { + .context(error::RegionOperatingRaceSnafu { peer_id: candidate.id, region_id: *region_id, })?; @@ -302,7 +302,7 @@ mod tests { .await .unwrap_err(); - assert_matches!(err, Error::RegionOpeningRace { .. }); + assert_matches!(err, Error::RegionOperatingRace { .. }); assert!(!err.is_retryable()); } diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index 1d6a2b2932..3d58f473b7 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -20,18 +20,29 @@ pub mod group; pub mod plan; pub mod repartition_end; pub mod repartition_start; +pub mod utils; use std::any::Any; use std::fmt::Debug; +use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::key::TableMetadataManagerRef; +use common_meta::instruction::CacheIdent; +use common_meta::key::datanode_table::RegionInfo; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::node_manager::NodeManagerRef; +use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; +use common_meta::region_registry::LeaderRegionRegistryRef; +use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; -use crate::error::Result; +use crate::error::{self, Result}; use crate::procedure::repartition::plan::RepartitionPlanEntry; +use crate::procedure::repartition::utils::get_datanode_table_value; use crate::service::mailbox::MailboxRef; #[cfg(test)] @@ -46,14 +57,115 @@ pub struct PersistentContext { pub plans: Vec, } +pub struct VolatileContext { + pub allocating_regions: Vec, +} + pub struct Context { pub persistent_ctx: PersistentContext, + pub volatile_ctx: VolatileContext, pub table_metadata_manager: TableMetadataManagerRef, + pub memory_region_keeper: MemoryRegionKeeperRef, + pub node_manager: NodeManagerRef, + pub leader_region_registry: LeaderRegionRegistryRef, pub mailbox: MailboxRef, pub server_addr: String, pub cache_invalidator: CacheInvalidatorRef, } +impl Context { + /// Retrieves the table route value for the given table id. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + /// + /// Abort: + /// - Table route not found. + pub async fn get_table_route_value( + &self, + ) -> Result> { + let table_id = self.persistent_ctx.table_id; + let table_route_value = self + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_with_raw_bytes(table_id) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get table route for table: {}", table_id), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + Ok(table_route_value) + } + + /// Updates the table route. + /// + /// Retry: + /// - Failed to retrieve the metadata of datanode table. + /// + /// Abort: + /// - Table route not found. + /// - Failed to update the table route. + pub async fn update_table_route( + &self, + current_table_route_value: &DeserializedValueWithBytes, + new_region_routes: Vec, + ) -> Result<()> { + let table_id = self.persistent_ctx.table_id; + if new_region_routes.is_empty() { + return error::UnexpectedSnafu { + violated: format!("new_region_routes is empty for table: {}", table_id), + } + .fail(); + } + let datanode_id = new_region_routes + .first() + .unwrap() + .leader_peer + .as_ref() + .context(error::NoLeaderSnafu)? + .id; + let datanode_table_value = + get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?; + + let RegionInfo { + region_options, + region_wal_options, + .. + } = &datanode_table_value.region_info; + self.table_metadata_manager + .update_table_route( + table_id, + datanode_table_value.region_info.clone(), + current_table_route_value, + new_region_routes, + region_options, + region_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu) + } + + /// Broadcasts the invalidate table cache message. + pub async fn invalidate_table_cache(&self) -> Result<()> { + let table_id = self.persistent_ctx.table_id; + let subject = format!( + "Invalidate table cache for repartition table, table: {}", + table_id, + ); + let ctx = common_meta::cache_invalidator::Context { + subject: Some(subject), + }; + let _ = self + .cache_invalidator + .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) + .await; + Ok(()) + } +} + #[async_trait::async_trait] #[typetag::serde(tag = "repartition_state")] pub(crate) trait State: Sync + Send + Debug { diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index e297439aee..c369e9d5d1 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -13,11 +13,23 @@ // limitations under the License. use std::any::Any; +use std::collections::{HashMap, HashSet}; +use common_meta::ddl::drop_table::executor::DropTableExecutor; +use common_meta::lock_key::TableLock; +use common_meta::node_manager::NodeManagerRef; +use common_meta::region_registry::LeaderRegionRegistryRef; +use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; +use table::table_name::TableName; +use table::table_reference::TableReference; -use crate::error::Result; +use crate::error::{self, Result}; +use crate::procedure::repartition::group::region_routes; use crate::procedure::repartition::repartition_end::RepartitionEnd; use crate::procedure::repartition::{Context, State}; @@ -30,7 +42,7 @@ impl State for DeallocateRegion { async fn next( &mut self, ctx: &mut Context, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { let region_to_deallocate = ctx .persistent_ctx @@ -42,11 +54,185 @@ impl State for DeallocateRegion { return Ok((Box::new(RepartitionEnd), Status::done())); } - // TODO(weny): deallocate regions. - todo!() + let table_id = ctx.persistent_ctx.table_id; + let pending_deallocate_region_ids = ctx + .persistent_ctx + .plans + .iter() + .flat_map(|p| p.pending_deallocate_region_ids.iter()) + .cloned() + .collect::>(); + info!( + "Deallocating regions: {:?} for table: {} during repartition procedure", + pending_deallocate_region_ids, table_id + ); + + let table_lock = TableLock::Write(table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + let table_route_value = ctx.get_table_route_value().await?; + let deallocating_regions = { + let region_routes = region_routes(table_id, &table_route_value)?; + Self::filter_deallocatable_region_routes( + table_id, + region_routes, + &pending_deallocate_region_ids, + ) + }; + + let table_ref = TableReference::full( + &ctx.persistent_ctx.catalog_name, + &ctx.persistent_ctx.schema_name, + &ctx.persistent_ctx.table_name, + ); + // Deallocates the regions on datanodes. + Self::deallocate_regions( + &ctx.node_manager, + &ctx.leader_region_registry, + table_ref.into(), + table_id, + &deallocating_regions, + ) + .await?; + + // Safety: the table route must be physical, so we can safely unwrap the region routes. + let region_routes = table_route_value.region_routes().unwrap(); + let new_region_routes = + Self::generate_region_routes(region_routes, &pending_deallocate_region_ids); + ctx.update_table_route(&table_route_value, new_region_routes) + .await?; + ctx.invalidate_table_cache().await?; + + Ok((Box::new(RepartitionEnd), Status::executing(false))) } fn as_any(&self) -> &dyn Any { self } } + +impl DeallocateRegion { + #[allow(dead_code)] + async fn deallocate_regions( + node_manager: &NodeManagerRef, + leader_region_registry: &LeaderRegionRegistryRef, + table: TableName, + table_id: TableId, + region_routes: &[RegionRoute], + ) -> Result<()> { + let executor = DropTableExecutor::new(table, table_id, false); + // Note: Consider adding an option to forcefully drop the physical region, + // which would involve dropping all logical regions associated with that physical region. + executor + .on_drop_regions( + node_manager, + leader_region_registry, + region_routes, + false, + true, + ) + .await + .context(error::DeallocateRegionsSnafu { table_id })?; + + Ok(()) + } + + #[allow(dead_code)] + fn filter_deallocatable_region_routes( + table_id: TableId, + region_routes: &[RegionRoute], + pending_deallocate_region_ids: &HashSet, + ) -> Vec { + let region_routes_map = region_routes + .iter() + .map(|r| (r.region.id, r.clone())) + .collect::>(); + pending_deallocate_region_ids + .iter() + .filter_map(|region_id| match region_routes_map.get(region_id) { + Some(region_route) => Some(region_route.clone()), + None => { + warn!( + "Region {} not found during deallocate regions for table {:?}", + region_id, table_id + ); + None + } + }) + .collect::>() + } + + #[allow(dead_code)] + fn generate_region_routes( + region_routes: &[RegionRoute], + pending_deallocate_region_ids: &HashSet, + ) -> Vec { + // Safety: the table route must be physical, so we can safely unwrap the region routes. + region_routes + .iter() + .filter(|r| !pending_deallocate_region_ids.contains(&r.region.id)) + .cloned() + .collect() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::{RegionId, TableId}; + + use crate::procedure::repartition::deallocate_region::DeallocateRegion; + + fn test_region_routes(table_id: TableId) -> Vec { + vec![ + RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + ..Default::default() + }, + leader_peer: Some(Peer::empty(2)), + ..Default::default() + }, + ] + } + + #[test] + fn test_filter_deallocatable_region_routes() { + let table_id = 1024; + let region_routes = test_region_routes(table_id); + let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]); + let deallocatable_region_routes = DeallocateRegion::filter_deallocatable_region_routes( + table_id, + ®ion_routes, + &pending_deallocate_region_ids, + ); + assert_eq!(deallocatable_region_routes.len(), 1); + assert_eq!( + deallocatable_region_routes[0].region.id, + RegionId::new(table_id, 1) + ); + } + + #[test] + fn test_generate_region_routes() { + let table_id = 1024; + let region_routes = test_region_routes(table_id); + let pending_deallocate_region_ids = HashSet::from([RegionId::new(table_id, 1)]); + let new_region_routes = DeallocateRegion::generate_region_routes( + ®ion_routes, + &pending_deallocate_region_ids, + ); + assert_eq!(new_region_routes.len(), 1); + assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2)); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index c4b72d02ea..7a5ae4a6f6 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -28,7 +28,7 @@ use std::time::Duration; use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::instruction::CacheIdent; -use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo}; +use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; @@ -48,6 +48,7 @@ use uuid::Uuid; use crate::error::{self, Result}; use crate::procedure::repartition::group::repartition_start::RepartitionStart; use crate::procedure::repartition::plan::RegionDescriptor; +use crate::procedure::repartition::utils::get_datanode_table_value; use crate::procedure::repartition::{self}; use crate::service::mailbox::MailboxRef; @@ -261,24 +262,7 @@ impl Context { table_id: TableId, datanode_id: u64, ) -> Result { - let datanode_table_value = self - .table_metadata_manager - .datanode_table_manager() - .get(&DatanodeTableKey { - datanode_id, - table_id, - }) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get DatanodeTable: {table_id}"), - })? - .context(error::DatanodeTableNotFoundSnafu { - table_id, - datanode_id, - })?; - Ok(datanode_table_value) + get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await } /// Broadcasts the invalidate table cache message. diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index f9bed479c5..3792222dc9 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -19,7 +19,7 @@ use common_procedure::{Context as ProcedureContext, Status}; use partition::expr::PartitionExpr; use partition::subtask::{self, RepartitionSubtask}; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use uuid::Uuid; use crate::error::{self, Result}; @@ -51,12 +51,22 @@ impl State for RepartitionStart { ctx: &mut Context, _: &ProcedureContext, ) -> Result<(Box, Status)> { - let (_, table_route) = ctx + let (physical_table_id, table_route) = ctx .table_metadata_manager .table_route_manager() .get_physical_table_route(ctx.persistent_ctx.table_id) .await .context(error::TableMetadataManagerSnafu)?; + let table_id = ctx.persistent_ctx.table_id; + ensure!( + physical_table_id == table_id, + error::UnexpectedSnafu { + violated: format!( + "Repartition only works on the physical table, but got logical table: {}, physical table id: {}", + table_id, physical_table_id + ), + } + ); let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?; diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs new file mode 100644 index 0000000000..161c0feedd --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -0,0 +1,49 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::BoxedError; +use common_meta::key::TableMetadataManagerRef; +use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; + +use crate::error::{self, Result}; + +/// Returns the `datanode_table_value` +/// +/// Retry: +/// - Failed to retrieve the metadata of datanode table. +pub async fn get_datanode_table_value( + table_metadata_manager: &TableMetadataManagerRef, + table_id: TableId, + datanode_id: u64, +) -> Result { + let datanode_table_value = table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey { + datanode_id, + table_id, + }) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get DatanodeTable: {table_id}"), + })? + .context(error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id, + })?; + Ok(datanode_table_value) +} diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 5798271ef8..cf50c01830 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -564,6 +564,7 @@ struct MetricEngineInner { #[cfg(test)] mod test { + use std::assert_matches::assert_matches; use std::collections::HashMap; use common_telemetry::info; @@ -573,12 +574,13 @@ mod test { use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use store_api::mito_engine_options::WAL_OPTIONS_KEY; use store_api::region_request::{ - PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, + PathType, RegionCloseRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, + RegionRequest, }; use super::*; use crate::maybe_skip_kafka_log_store_integration_test; - use crate::test_util::TestEnv; + use crate::test_util::{TestEnv, create_logical_region_request}; #[tokio::test] async fn close_open_regions() { @@ -882,4 +884,68 @@ mod test { } } } + + #[tokio::test] + async fn test_drop_region() { + let env = TestEnv::new().await; + let engine = env.metric(); + let physical_region_id1 = RegionId::new(1024, 0); + let logical_region_id1 = RegionId::new(1025, 0); + env.create_physical_region(physical_region_id1, "/test_dir1", vec![]) + .await; + let region_create_request1 = + create_logical_region_request(&["job"], physical_region_id1, "logical1"); + engine + .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![( + logical_region_id1, + region_create_request1, + )])) + .await + .unwrap(); + let err = engine + .handle_request( + physical_region_id1, + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), + ) + .await + .unwrap_err(); + assert_matches!( + err.as_any().downcast_ref::().unwrap(), + &Error::PhysicalRegionBusy { .. } + ); + + engine + .handle_request( + physical_region_id1, + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: true, + }), + ) + .await + .unwrap(); + assert!( + engine + .inner + .state + .read() + .unwrap() + .physical_region_states() + .get(&physical_region_id1) + .is_none() + ); + assert!( + engine + .inner + .state + .read() + .unwrap() + .logical_regions() + .get(&logical_region_id1) + .is_none() + ); + } } diff --git a/src/metric-engine/src/engine/drop.rs b/src/metric-engine/src/engine/drop.rs index 2bdb61c70c..11e2fb7a5f 100644 --- a/src/metric-engine/src/engine/drop.rs +++ b/src/metric-engine/src/engine/drop.rs @@ -14,6 +14,7 @@ //! Drop a metric region +use common_telemetry::info; use snafu::ResultExt; use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest}; @@ -34,6 +35,7 @@ impl MetricEngineInner { ) -> Result { let data_region_id = utils::to_data_region_id(region_id); let fast_path = req.fast_path; + let force = req.force; // enclose the guard in a block to prevent the guard from polluting the async context let (is_physical_region, is_physical_region_busy) = { @@ -53,14 +55,16 @@ impl MetricEngineInner { if is_physical_region { // check if there is no logical region relates to this physical region - if is_physical_region_busy && !fast_path { + if is_physical_region_busy && !force { // reject if there is any present logical region return Err(PhysicalRegionBusySnafu { region_id: data_region_id, } .build()); } - + if is_physical_region_busy && force { + info!("Dropping physical region {} with force", data_region_id); + } return self.drop_physical_region(data_region_id).await; } @@ -102,14 +106,20 @@ impl MetricEngineInner { self.mito .handle_request( data_region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .with_context(|_| CloseMitoRegionSnafu { region_id })?; self.mito .handle_request( metadata_region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .with_context(|_| CloseMitoRegionSnafu { region_id })?; diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index b6231aa5a2..1231ec805c 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -71,7 +71,10 @@ async fn test_engine_drop_region_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .unwrap_err(); @@ -105,7 +108,10 @@ async fn test_engine_drop_region_with_format(flat_format: bool) { engine .handle_request( region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .unwrap(); @@ -249,7 +255,10 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool) engine .handle_request( custom_region_id, - RegionRequest::Drop(RegionDropRequest { fast_path: false }), + RegionRequest::Drop(RegionDropRequest { + fast_path: false, + force: false, + }), ) .await .unwrap(); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index c7e069a3c1..0cd59398dc 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -271,6 +271,7 @@ fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> region_id, RegionDropRequest { fast_path: drop.fast_path, + force: drop.force, }, )) } @@ -534,7 +535,13 @@ impl RegionCreateRequest { #[derive(Debug, Clone)] pub struct RegionDropRequest { + /// Enables fast-path drop optimizations for logical regions. + /// Only applicable to the Metric Engine; ignored by others. pub fast_path: bool, + + /// Forces the drop of a physical region and all its associated logical regions. + /// Only relevant for physical regions managed by the Metric Engine. + pub force: bool, } /// Open region request.