From 48b495dfc357fa09bfb81640fcdf6e7a925e660c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 2 Jul 2026 15:42:46 +0800 Subject: [PATCH] feat(soft-drop): deregister failure detectors and handle replayed open-regions during purge - `src/common/meta/src/ddl/drop_table.rs`: deregister failure detectors before transitioning to DeleteTombstone state - `src/common/meta/src/ddl/undrop_table.rs`: refactor `open_regions` into `open_regions_inner` with an `ignore_region_not_found` flag; expose `open_regions_ignore_region_not_found` for purge replayer - `src/common/meta/src/ddl/purge_dropped_table.rs`: use `open_regions_ignore_region_not_found` in replayed purge procedures - `src/common/meta/src/ddl/tests/drop_table.rs`: add tests for undrop idempotency and purge replay tolerance of dropped regions Signed-off-by: Lei, HUANG --- src/common/meta/src/ddl/drop_table.rs | 5 + .../meta/src/ddl/purge_dropped_table.rs | 4 +- src/common/meta/src/ddl/tests/drop_table.rs | 164 +++++++++++++++++- src/common/meta/src/ddl/undrop_table.rs | 53 +++++- 4 files changed, 216 insertions(+), 10 deletions(-) diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index ed3d551f54..f5b5bc7a22 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -200,6 +200,11 @@ impl DropTableProcedure { false, ) .await?; + self.context + .deregister_failure_detectors(convert_region_routes_to_detecting_regions( + &self.data.physical_region_routes, + )) + .await; self.data.state = DropTableState::DeleteTombstone; Ok(Status::executing(true)) diff --git a/src/common/meta/src/ddl/purge_dropped_table.rs b/src/common/meta/src/ddl/purge_dropped_table.rs index 90adaf6538..b95e5db976 100644 --- a/src/common/meta/src/ddl/purge_dropped_table.rs +++ b/src/common/meta/src/ddl/purge_dropped_table.rs @@ -29,7 +29,7 @@ use table::table_name::TableName; use crate::ddl::DdlContext; use crate::ddl::drop_table::executor::DropTableExecutor; -use crate::ddl::undrop_table::open_regions; +use crate::ddl::undrop_table::open_regions_ignore_region_not_found; use crate::ddl::utils::{ convert_region_routes_to_detecting_regions, is_metric_engine_logical_table, map_to_procedure_error, @@ -90,7 +90,7 @@ impl PurgeDroppedTableProcedure { async fn on_open_regions(&mut self) -> Result { if let Some(region_routes) = self.data.physical_region_routes() { - open_regions( + open_regions_ignore_region_not_found( &self.context, self.data.table_id(), self.data.table_name(), diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 4f10999d74..9a825a616c 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -14,17 +14,19 @@ use std::assert_matches; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, Mutex as StdMutex}; +use api::region::RegionResponse; use api::v1::region::{RegionRequest, region_request}; use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt, StackError}; use common_error::status_code::StatusCode; use common_procedure::Procedure; use common_procedure_test::{ execute_procedure_until, execute_procedure_until_done, new_test_procedure_context, }; +use snafu::ResultExt; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use table::metadata::TableId; @@ -42,7 +44,7 @@ use crate::ddl::test_util::{ }; use crate::ddl::undrop_table::UndropTableProcedure; use crate::ddl::{DetectingRegion, RegionFailureDetectorController, TableMetadata}; -use crate::error::Error; +use crate::error::{self, Error}; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::kv_backend::memory::MemoryKvBackend; @@ -333,7 +335,9 @@ async fn test_soft_drop_closes_regions_and_keeps_tombstone() { #[tokio::test] async fn test_hard_drop_keeps_delete_tombstone_flow() { let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(node_manager); + let detector_controller = Arc::new(RecordingRegionFailureDetectorController::default()); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.region_failure_detector_controller = detector_controller.clone(); let table_id = 1024; let table_name = "foo"; let task = test_create_table_task(table_name, table_id); @@ -372,6 +376,10 @@ async fn test_hard_drop_keeps_delete_tombstone_flow() { .await .unwrap(); assert!(dropped_table.is_none()); + assert_eq!( + detector_controller.deregistered().await, + vec![(1, RegionId::new(table_id, 1))] + ); } #[tokio::test] @@ -807,6 +815,54 @@ async fn test_undrop_table_fails_when_live_name_is_created_after_prepare() { assert_eq!(live_table.table_id(), live_table_id); } +#[tokio::test] +async fn test_undrop_table_replayed_restore_metadata_is_idempotent() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + + let mut procedure = + UndropTableProcedure::new(new_undrop_table_task(table_id), ddl_context.clone()); + let ctx = new_test_procedure_context(); + procedure.execute(&ctx).await.unwrap(); + let restore_metadata_data = procedure.dump().unwrap(); + procedure.execute(&ctx).await.unwrap(); + + let mut replayed = + UndropTableProcedure::from_json(&restore_metadata_data, ddl_context.clone()).unwrap(); + execute_procedure_until_done(&mut replayed).await; + + let live_table = ddl_context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap(); + assert_eq!(live_table.table_id(), table_id); +} + #[tokio::test] async fn test_purge_dropped_table_drops_regions_and_deletes_tombstone() { let (tx, mut rx) = mpsc::channel(8); @@ -950,6 +1006,106 @@ async fn test_purge_dropped_table_by_id_selects_tombstone_when_live_table_exists assert!(rx.try_recv().is_err()); } +#[tokio::test] +async fn test_purge_dropped_table_replayed_open_regions_ignores_dropped_regions() { + let (tx, mut rx) = mpsc::channel(8); + let dropped_regions = Arc::new(StdMutex::new(HashSet::new())); + let datanode_handler = DatanodeWatcher::new(tx).with_handler({ + let dropped_regions = dropped_regions.clone(); + move |_peer, request| { + let Some(body) = request.body.as_ref() else { + return Ok(RegionResponse::new(0)); + }; + match body { + region_request::Body::Open(req) + if dropped_regions.lock().unwrap().contains(&req.region_id) => + { + Err::(BoxedError::new(MockRegionNotFoundError)) + .context(error::ExternalSnafu) + } + region_request::Body::Drop(req) => { + dropped_regions.lock().unwrap().insert(req.region_id); + Ok(RegionResponse::new(0)) + } + _ => Ok(RegionResponse::new(0)), + } + } + }); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); + let mut ddl_context = new_ddl_context(node_manager); + ddl_context.soft_drop_enabled = true; + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + write_route_policy: None, + }]), + HashMap::new(), + ) + .await + .unwrap(); + let mut drop_procedure = DropTableProcedure::new( + new_drop_table_task(table_name, table_id, false), + ddl_context.clone(), + ); + execute_procedure_until_done(&mut drop_procedure).await; + while rx.try_recv().is_ok() {} + + let mut procedure = PurgeDroppedTableProcedure::new( + new_purge_dropped_table_task(table_id), + ddl_context.clone(), + ); + let ctx = new_test_procedure_context(); + procedure.execute(&ctx).await.unwrap(); + let open_regions_data = procedure.dump().unwrap(); + procedure.execute(&ctx).await.unwrap(); + procedure.execute(&ctx).await.unwrap(); + + let mut replayed = + PurgeDroppedTableProcedure::from_json(&open_regions_data, ddl_context.clone()).unwrap(); + execute_procedure_until_done(&mut replayed).await; + + assert!( + ddl_context + .table_metadata_manager + .get_dropped_table(&drop_procedure.data.task.table_name()) + .await + .unwrap() + .is_none() + ); +} + +#[derive(Debug, snafu::Snafu)] +#[snafu(display("mock region not found"))] +struct MockRegionNotFoundError; + +impl StackError for MockRegionNotFoundError { + fn debug_fmt(&self, _: usize, _: &mut Vec) {} + + fn next(&self) -> Option<&dyn StackError> { + None + } +} + +impl ErrorExt for MockRegionNotFoundError { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn status_code(&self) -> StatusCode { + StatusCode::RegionNotFound + } +} + #[tokio::test] async fn test_on_rollback() { let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); diff --git a/src/common/meta/src/ddl/undrop_table.rs b/src/common/meta/src/ddl/undrop_table.rs index 30360519dd..1e9c2b8650 100644 --- a/src/common/meta/src/ddl/undrop_table.rs +++ b/src/common/meta/src/ddl/undrop_table.rs @@ -18,6 +18,8 @@ use api::v1::region::{ OpenRequest as PbOpenRegionRequest, RegionRequest, RegionRequestHeader, region_request, }; use async_trait::async_trait; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, @@ -206,6 +208,47 @@ pub(crate) async fn open_regions( table_info: &table::metadata::TableInfo, region_routes: &[RegionRoute], region_wal_options: &HashMap, +) -> Result<()> { + open_regions_inner( + context, + table_id, + table_name, + table_info, + region_routes, + region_wal_options, + false, + ) + .await +} + +pub(crate) async fn open_regions_ignore_region_not_found( + context: &DdlContext, + table_id: TableId, + table_name: &TableName, + table_info: &table::metadata::TableInfo, + region_routes: &[RegionRoute], + region_wal_options: &HashMap, +) -> Result<()> { + open_regions_inner( + context, + table_id, + table_name, + table_info, + region_routes, + region_wal_options, + true, + ) + .await +} + +async fn open_regions_inner( + context: &DdlContext, + table_id: TableId, + table_name: &TableName, + table_info: &table::metadata::TableInfo, + region_routes: &[RegionRoute], + region_wal_options: &HashMap, + ignore_region_not_found: bool, ) -> Result<()> { let template = build_template_from_raw_table_info(table_info)?; let builder = CreateRequestBuilder::new(template, None); @@ -244,10 +287,12 @@ pub(crate) async fn open_regions( let datanode = datanode.clone(); let requester = requester.clone(); tasks.push(async move { - requester - .handle(request) - .await - .map_err(add_peer_context_if_needed(datanode)) + if let Err(err) = requester.handle(request).await + && !(ignore_region_not_found && err.status_code() == StatusCode::RegionNotFound) + { + return Err(add_peer_context_if_needed(datanode)(err)); + } + Ok(()) }); } }