diff --git a/Cargo.lock b/Cargo.lock index dff803ce08..a9493edf04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5726,7 +5726,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a14b140f20d2a8fa2046fbbc5e0947203a3e34c#4a14b140f20d2a8fa2046fbbc5e0947203a3e34c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d8e09e8487c344323ee364b67f7187b1e0a6102c#d8e09e8487c344323ee364b67f7187b1e0a6102c" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", diff --git a/Cargo.toml b/Cargo.toml index f314d2b147..d55fedd621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a14b140f20d2a8fa2046fbbc5e0947203a3e34c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d8e09e8487c344323ee364b67f7187b1e0a6102c" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 99ecee5166..c478cdb746 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -126,6 +126,7 @@ impl State for DropDatabaseExecutor { &self.physical_region_routes, true, false, + false, ) .await?; info!("Table: {}({}) is dropped", self.table_name, self.table_id); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 6fca1593b9..55c33330c4 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -162,6 +162,7 @@ impl DropTableProcedure { &self.data.physical_region_routes, false, false, + false, ) .await?; self.data.state = DropTableState::DeleteTombstone; diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index d639aa596f..666e2eef6e 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -219,6 +219,7 @@ impl DropTableExecutor { region_routes: &[RegionRoute], fast_path: bool, force: bool, + partial_drop: bool, ) -> Result<()> { // Drops leader regions on datanodes. let leaders = find_leaders(region_routes); @@ -243,6 +244,7 @@ impl DropTableExecutor { region_id: region_id.as_u64(), fast_path, force, + partial_drop, })), }; let datanode = datanode.clone(); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 4a48f130ff..0e4b9e7bc7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -1750,6 +1750,7 @@ mod tests { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await @@ -1851,6 +1852,7 @@ mod tests { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index 8566b77774..12233c27e7 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -133,6 +133,7 @@ impl DeallocateRegion { region_routes, false, true, + true, ) .await .context(error::DeallocateRegionsSnafu { table_id })?; diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index f84906960c..7a1efedac4 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -919,6 +919,7 @@ mod test { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await @@ -934,6 +935,7 @@ mod test { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: true, + partial_drop: false, }), ) .await diff --git a/src/metric-engine/src/engine/drop.rs b/src/metric-engine/src/engine/drop.rs index 6cd5f22a78..0b3d876fd8 100644 --- a/src/metric-engine/src/engine/drop.rs +++ b/src/metric-engine/src/engine/drop.rs @@ -74,7 +74,9 @@ impl MetricEngineInner { if is_physical_region_busy && force { info!("Dropping physical region {} with force", data_region_id); } - return self.drop_physical_region(data_region_id).await; + return self + .drop_physical_region(data_region_id, req.partial_drop) + .await; } if fast_path { @@ -105,7 +107,11 @@ impl MetricEngineInner { } } - async fn drop_physical_region(&self, region_id: RegionId) -> Result { + async fn drop_physical_region( + &self, + region_id: RegionId, + partial_drop: bool, + ) -> Result { let data_region_id = utils::to_data_region_id(region_id); let metadata_region_id = utils::to_metadata_region_id(region_id); @@ -118,6 +124,7 @@ impl MetricEngineInner { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop, }), ) .await @@ -128,6 +135,7 @@ impl MetricEngineInner { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop, }), ) .await diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 1231ec805c..b3da775117 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -74,6 +74,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await @@ -111,6 +112,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) { RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await @@ -258,6 +260,7 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool) RegionRequest::Drop(RegionDropRequest { fast_path: false, force: false, + partial_drop: false, }), ) .await diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 8f96f4013b..4210afefa1 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -39,8 +39,8 @@ use store_api::region_engine::{ use store_api::region_request::{ AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, - RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, - RegionRequest, RegionTruncateRequest, + RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, + RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; use store_api::storage::{FileId, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -687,10 +687,10 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::Create(v), }), - RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest { + RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), - request: DdlRequest::Drop, + request: DdlRequest::Drop(v), }), RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, @@ -845,7 +845,7 @@ impl WorkerRequest { #[derive(Debug)] pub(crate) enum DdlRequest { Create(RegionCreateRequest), - Drop, + Drop(RegionDropRequest), Open((RegionOpenRequest, Option)), Close(RegionCloseRequest), Alter(RegionAlterRequest), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index fbdca288b9..dacec92772 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1095,7 +1095,10 @@ impl RegionWorkerLoop { for ddl in ddl_requests.drain(..) { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, - DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await, + DdlRequest::Drop(req) => { + self.handle_drop_request(ddl.region_id, req.partial_drop) + .await + } DdlRequest::Open((req, wal_entry_receiver)) => { self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender) .await; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index c4d4b21e31..5d7149768c 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -41,6 +41,7 @@ where pub(crate) async fn handle_drop_request( &mut self, region_id: RegionId, + partial_drop: bool, ) -> Result { let region = self.regions.writable_region(region_id)?; @@ -113,10 +114,6 @@ where let gc_enabled = self.file_ref_manager.is_gc_enabled(); common_runtime::spawn_global(async move { - let gc_duration = listener - .on_later_drop_begin(region_id) - .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC)); - let removed = if gc_enabled { later_drop_task_with_global_gc( region_id, @@ -124,10 +121,14 @@ where path_type, object_store, dropping_regions, - gc_duration, + partial_drop, ) .await } else { + let gc_duration = listener + .on_later_drop_begin(region_id) + .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC)); + later_drop_task_without_global_gc( region_id, region_dir.clone(), @@ -177,7 +178,7 @@ async fn later_drop_task_without_global_gc( region_path, object_store, dropping_regions, - gc_duration, + Some(gc_duration), false, ) .await @@ -188,7 +189,7 @@ async fn remove_region_with_retry( region_path: String, object_store: ObjectStore, dropping_regions: std::sync::Arc, - gc_duration: Duration, + gc_duration: Option, mut force: bool, ) -> bool { for _ in 0..MAX_RETRY_TIMES { @@ -207,7 +208,9 @@ async fn remove_region_with_retry( } Ok(false) => (), } - sleep(gc_duration).await; + if let Some(duration) = gc_duration { + sleep(duration).await; + } // Force recycle after gc duration. force = true; } @@ -226,15 +229,19 @@ async fn later_drop_task_with_global_gc( path_type: PathType, object_store: ObjectStore, dropping_regions: RegionMapRef, - gc_duration: Duration, + partial_drop: bool, ) -> bool { - if path_type == PathType::Metadata { + // For metadata regions or regions marked for full deletion (such as when dropping a table) + // the region directory is forcefully removed immediately. + // + // TODO(discord9): Evaluate removing files instantly rather than waiting for the GC period. + if path_type == PathType::Metadata || !partial_drop { remove_region_with_retry( region_id, region_path, object_store, dropping_regions, - gc_duration, + None, true, ) .await diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 0cd59398dc..8186069920 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -272,6 +272,7 @@ fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> RegionDropRequest { fast_path: drop.fast_path, force: drop.force, + partial_drop: drop.partial_drop, }, )) } @@ -542,6 +543,10 @@ pub struct RegionDropRequest { /// Forces the drop of a physical region and all its associated logical regions. /// Only relevant for physical regions managed by the Metric Engine. pub force: bool, + + /// If true, indicates that only a portion of the region is being dropped, and files may still be referenced by other regions. + /// This is used to prevent deletion of files that are still in use by other regions. + pub partial_drop: bool, } /// Open region request.