diff --git a/Cargo.lock b/Cargo.lock index 3d28a03edb..7bbfd11a58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4280,6 +4280,7 @@ dependencies = [ "cache", "client", "common-base", + "common-catalog", "common-config", "common-error", "common-function", diff --git a/src/common/meta/src/key/table_repart.rs b/src/common/meta/src/key/table_repart.rs index 5aa1782d4e..0c067addde 100644 --- a/src/common/meta/src/key/table_repart.rs +++ b/src/common/meta/src/key/table_repart.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt::Display; +use futures::TryStreamExt; use serde::{Deserialize, Serialize}; use snafu::{OptionExt as _, ResultExt, ensure}; use store_api::storage::RegionId; @@ -28,7 +29,9 @@ use crate::key::{ }; use crate::kv_backend::KvBackendRef; use crate::kv_backend::txn::Txn; -use crate::rpc::store::BatchGetRequest; +use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream}; +use crate::rpc::KeyValue; +use crate::rpc::store::{BatchGetRequest, RangeRequest}; /// The key stores table repartition metadata. /// Specifically, it records the relation between source and destination regions after a repartition operation is completed. @@ -138,6 +141,13 @@ impl MetadataValue for TableRepartValue { pub type TableRepartValueDecodeResult = Result>>; +/// Decodes `KeyValue` to [TableRepartKey] and [TableRepartValue]. +pub fn table_repart_decoder(kv: KeyValue) -> Result<(TableRepartKey, TableRepartValue)> { + let key = TableRepartKey::from_bytes(&kv.key)?; + let value = TableRepartValue::try_from_raw_value(&kv.value)?; + Ok((key, value)) +} + pub struct TableRepartManager { kv_backend: KvBackendRef, } @@ -147,6 +157,25 @@ impl TableRepartManager { Self { kv_backend } } + /// Returns all table repartition entries. + pub async fn table_reparts(&self) -> Result> { + let prefix = TableRepartKey::range_prefix(); + let req = RangeRequest::new().with_prefix(prefix); + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + table_repart_decoder, + ) + .into_stream(); + + let res = stream.try_collect::>().await?; + Ok(res + .into_iter() + .map(|(key, value)| (key.table_id, value)) + .collect()) + } + /// Builds a create table repart transaction, /// it expected the `__table_repart/{table_id}` wasn't occupied. pub fn build_create_txn( diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 54dc172102..5487d05d47 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -18,6 +18,7 @@ async-trait.workspace = true bytes.workspace = true client.workspace = true common-base.workspace = true +common-catalog.workspace = true common-config.workspace = true common-error.workspace = true common-function.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index e202ce9f2c..3c62015179 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -244,7 +244,19 @@ impl DatanodeBuilder { table_id_schema_cache, schema_cache, )); - let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id))); + + let gc_enabled = self.opts.region_engine.iter().any(|engine| { + if let RegionEngineConfig::Mito(config) = engine { + config.gc.enable + } else { + false + } + }); + + let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled( + Some(node_id), + gc_enabled, + )); let region_server = self .new_region_server( schema_metadata_manager, @@ -331,6 +343,7 @@ impl DatanodeBuilder { &self.opts, region_server.clone(), meta_client, + self.kv_backend.clone(), cache_invalidator, self.plugins.clone(), stat, diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 21eda4c71a..4d2e9119f5 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -332,13 +332,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid arguments for GC: {}", msg))] - InvalidGcArgs { - msg: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to list SST entries from storage"))] ListStorageSsts { #[snafu(implicit)] @@ -455,11 +448,9 @@ impl ErrorExt for Error { AsyncTaskExecute { source, .. } => source.status_code(), - CreateDir { .. } - | RemoveDir { .. } - | ShutdownInstance { .. } - | DataFusion { .. } - | InvalidGcArgs { .. } => StatusCode::Internal, + CreateDir { .. } | RemoveDir { .. } | ShutdownInstance { .. } | DataFusion { .. } => { + StatusCode::Internal + } RegionNotFound { .. } => StatusCode::RegionNotFound, RegionNotReady { .. } => StatusCode::RegionNotReady, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ee538e1280..be662dfe94 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -31,6 +31,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; +use common_meta::kv_backend::KvBackendRef; use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, trace, warn}; use common_workload::DatanodeWorkloadType; @@ -79,6 +80,7 @@ impl HeartbeatTask { opts: &DatanodeOptions, region_server: RegionServer, meta_client: MetaClientRef, + kv_backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef, plugins: Plugins, resource_stat: ResourceStatRef, @@ -94,7 +96,7 @@ impl HeartbeatTask { Arc::new(ParseMailboxMessageHandler), Arc::new(SuspendHandler::new(region_server.suspend_state())), Arc::new( - RegionHeartbeatResponseHandler::new(region_server.clone()) + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend) .with_open_region_parallelism(opts.init_regions_parallelism), ), Arc::new(InvalidateCacheHandler::new(cache_invalidator)), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index daca5eb0a4..da09144d24 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -18,6 +18,7 @@ use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; use common_meta::instruction::{Instruction, InstructionReply}; +use common_meta::kv_backend::KvBackendRef; use common_telemetry::error; use snafu::OptionExt; use store_api::storage::GcReport; @@ -56,6 +57,7 @@ pub struct RegionHeartbeatResponseHandler { flush_tasks: TaskTracker<()>, open_region_parallelism: usize, gc_tasks: TaskTracker, + kv_backend: KvBackendRef, } #[async_trait::async_trait] @@ -70,27 +72,29 @@ pub trait InstructionHandler: Send + Sync { #[derive(Clone)] pub struct HandlerContext { - region_server: RegionServer, - downgrade_tasks: TaskTracker<()>, - flush_tasks: TaskTracker<()>, - gc_tasks: TaskTracker, + pub region_server: RegionServer, + pub downgrade_tasks: TaskTracker<()>, + pub flush_tasks: TaskTracker<()>, + pub gc_tasks: TaskTracker, + pub kv_backend: KvBackendRef, } impl HandlerContext { #[cfg(test)] - pub fn new_for_test(region_server: RegionServer) -> Self { + pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self { Self { region_server, downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), gc_tasks: TaskTracker::new(), + kv_backend, } } } impl RegionHeartbeatResponseHandler { /// Returns the [RegionHeartbeatResponseHandler]. - pub fn new(region_server: RegionServer) -> Self { + pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self { Self { region_server, downgrade_tasks: TaskTracker::new(), @@ -98,6 +102,7 @@ impl RegionHeartbeatResponseHandler { // Default to half of the number of CPUs. open_region_parallelism: (num_cpus::get() / 2).max(1), gc_tasks: TaskTracker::new(), + kv_backend, } } @@ -254,6 +259,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { downgrade_tasks: self.downgrade_tasks.clone(), flush_tasks: self.flush_tasks.clone(), gc_tasks: self.gc_tasks.clone(), + kv_backend: self.kv_backend.clone(), }; let _handle = common_runtime::spawn_global(async move { let reply = handler.handle(&context, instruction).await; @@ -285,6 +291,7 @@ mod tests { use common_meta::instruction::{ DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion, }; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -330,7 +337,9 @@ mod tests { fn test_is_acceptable() { common_telemetry::init_default_ut_logging(); let region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let heartbeat_env = HeartbeatResponseTestEnv::new(); let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); @@ -409,7 +418,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("close-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; @@ -457,7 +468,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("open-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; @@ -505,7 +518,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; @@ -537,7 +552,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("downgrade-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; diff --git a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs index 1ad5baa56a..7f45703355 100644 --- a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs +++ b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs @@ -117,6 +117,7 @@ mod tests { use std::sync::Arc; use common_meta::instruction::RemapManifest; + use common_meta::kv_backend::memory::MemoryKvBackend; use datatypes::value::Value; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; @@ -137,7 +138,8 @@ mod tests { let mut mock_region_server = mock_region_server(); let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let reply = ApplyStagingManifestsHandler .handle( @@ -168,7 +170,8 @@ mod tests { region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let reply = ApplyStagingManifestsHandler .handle( @@ -231,7 +234,8 @@ mod tests { region_server.register_engine(Arc::new(engine.clone())); prepare_region(®ion_server).await; - let handler_context = HandlerContext::new_for_test(region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(region_server, kv_backend); let region_id2 = RegionId::new(1024, 2); let reply = RemapManifestHandler .handle( diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 770d6a75cc..98f35c7de1 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -84,6 +84,7 @@ mod tests { use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; use common_meta::heartbeat::mailbox::MessageMeta; use common_meta::instruction::Instruction; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -113,7 +114,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("close-regions").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine.clone())); diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs index 779023a52f..684bb118e2 100644 --- a/src/datanode/src/heartbeat/handler/downgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs @@ -232,6 +232,7 @@ mod tests { use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; use common_meta::heartbeat::mailbox::MessageMeta; use common_meta::instruction::{DowngradeRegion, Instruction}; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -255,7 +256,8 @@ mod tests { let mut mock_region_server = mock_region_server(); let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let waits = vec![None, Some(Duration::from_millis(100u64))]; @@ -299,7 +301,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let waits = vec![None, Some(Duration::from_millis(100u64))]; for flush_timeout in waits { @@ -335,7 +338,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let flush_timeout = Duration::from_millis(100); let reply = DowngradeRegionsHandler @@ -369,7 +373,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let waits = vec![ Some(Duration::from_millis(100u64)), @@ -432,7 +437,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let waits = vec![ Some(Duration::from_millis(100u64)), @@ -483,7 +489,8 @@ mod tests { Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound))); }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let reply = DowngradeRegionsHandler .handle( &handler_context, @@ -514,7 +521,8 @@ mod tests { })); }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let reply = DowngradeRegionsHandler .handle( &handler_context, @@ -541,7 +549,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("downgrade-regions").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine.clone())); diff --git a/src/datanode/src/heartbeat/handler/enter_staging.rs b/src/datanode/src/heartbeat/handler/enter_staging.rs index 2b9e35ded6..e28e067f80 100644 --- a/src/datanode/src/heartbeat/handler/enter_staging.rs +++ b/src/datanode/src/heartbeat/handler/enter_staging.rs @@ -107,6 +107,7 @@ mod tests { use std::sync::Arc; use common_meta::instruction::EnterStagingRegion; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -127,7 +128,8 @@ mod tests { let mut mock_region_server = mock_region_server(); let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let replies = EnterStagingRegionsHandler .handle( @@ -156,7 +158,8 @@ mod tests { region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let replies = EnterStagingRegionsHandler .handle( &handler_context, @@ -194,7 +197,8 @@ mod tests { region_server.register_engine(Arc::new(engine.clone())); prepare_region(®ion_server).await; - let handler_context = HandlerContext::new_for_test(region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(region_server, kv_backend); let replies = EnterStagingRegionsHandler .handle( &handler_context, diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs index a86d672eca..dafc92ff0f 100644 --- a/src/datanode/src/heartbeat/handler/flush_region.rs +++ b/src/datanode/src/heartbeat/handler/flush_region.rs @@ -177,6 +177,7 @@ mod tests { use std::sync::{Arc, RwLock}; use common_meta::instruction::{FlushErrorStrategy, FlushRegions}; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::engine::MITO_ENGINE_NAME; use store_api::storage::RegionId; @@ -201,7 +202,8 @@ mod tests { }); mock_region_server.register_test_region(*region_id, mock_engine); } - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); // Async hint mode let flush_instruction = FlushRegions::async_batch(region_ids.clone()); @@ -238,7 +240,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let flush_instruction = FlushRegions::sync_single(region_id); let reply = FlushRegionsHandler @@ -273,7 +276,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_ids[0], mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); // Sync batch with fail-fast strategy let flush_instruction = @@ -304,7 +308,8 @@ mod tests { })) }); mock_region_server.register_test_region(region_ids[0], mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); // Sync batch with try-all strategy let flush_instruction = diff --git a/src/datanode/src/heartbeat/handler/gc_worker.rs b/src/datanode/src/heartbeat/handler/gc_worker.rs index 9329dcb0c6..0ba7ef7a97 100644 --- a/src/datanode/src/heartbeat/handler/gc_worker.rs +++ b/src/datanode/src/heartbeat/handler/gc_worker.rs @@ -12,13 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply}; -use common_telemetry::{debug, warn}; -use mito2::gc::LocalGcWorker; -use snafu::{OptionExt, ResultExt, ensure}; -use store_api::storage::{FileRefsManifest, RegionId}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; -use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu}; +use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply}; +use common_meta::key::table_info::TableInfoManager; +use common_meta::key::table_route::TableRouteManager; +use common_telemetry::{debug, warn}; +use mito2::access_layer::{AccessLayer, AccessLayerRef}; +use mito2::engine::MitoEngine; +use mito2::gc::LocalGcWorker; +use mito2::region::MitoRegionRef; +use snafu::{OptionExt, ResultExt}; +use store_api::path_utils::table_dir; +use store_api::region_request::PathType; +use store_api::storage::{FileRefsManifest, GcReport, RegionId}; +use table::requests::STORAGE_KEY; + +use crate::error::{GcMitoEngineSnafu, GetMetadataSnafu, Result, UnexpectedSnafu}; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; pub struct GcRegionsHandler; @@ -35,43 +46,80 @@ impl InstructionHandler for GcRegionsHandler { let region_ids = gc_regions.regions.clone(); debug!("Received gc regions instruction: {:?}", region_ids); - let (region_id, gc_worker) = match self - .create_gc_worker( - ctx, - region_ids, - &gc_regions.file_refs_manifest, - gc_regions.full_file_listing, - ) - .await - { - Ok(worker) => worker, - Err(e) => { - return Some(InstructionReply::GcRegions(GcRegionsReply { - result: Err(format!("Failed to create GC worker: {}", e)), - })); - } - }; + if region_ids.is_empty() { + return Some(InstructionReply::GcRegions(GcRegionsReply { + result: Ok(GcReport::default()), + })); + } + // Always use the smallest region id on datanode as the target region id for task tracker + let mut sorted_region_ids = gc_regions.regions.clone(); + sorted_region_ids.sort_by_key(|r| r.region_number()); + let target_region_id = sorted_region_ids[0]; + + // Group regions by table_id + let mut table_to_regions: HashMap> = HashMap::new(); + for rid in region_ids { + table_to_regions + .entry(rid.table_id()) + .or_default() + .push(rid); + } + + let file_refs_manifest = gc_regions.file_refs_manifest.clone(); + let full_file_listing = gc_regions.full_file_listing; + + let ctx_clone = ctx.clone(); let register_result = ctx .gc_tasks .try_register( - region_id, + target_region_id, Box::pin(async move { - debug!("Starting gc worker for region {}", region_id); - let report = gc_worker - .run() - .await - .context(GcMitoEngineSnafu { region_id })?; - debug!("Gc worker for region {} finished", region_id); - Ok(report) + let mut reports = Vec::with_capacity(table_to_regions.len()); + for (table_id, regions) in table_to_regions { + debug!( + "Starting gc worker for table {}, regions: {:?}", + table_id, regions + ); + let gc_worker = GcRegionsHandler::create_gc_worker( + &ctx_clone, + table_id, + regions, + &file_refs_manifest, + full_file_listing, + ) + .await?; + + let report = gc_worker.run().await.context(GcMitoEngineSnafu { + region_id: target_region_id, + })?; + debug!( + "Gc worker for table {} finished, report: {:?}", + table_id, report + ); + reports.push(report); + } + + // Merge reports + let mut merged_report = GcReport::default(); + for report in reports { + merged_report + .deleted_files + .extend(report.deleted_files.into_iter()); + merged_report + .deleted_indexes + .extend(report.deleted_indexes.into_iter()); + } + Ok(merged_report) }), ) .await; + if register_result.is_busy() { - warn!("Another gc task is running for the region: {region_id}"); + warn!("Another gc task is running for the region: {target_region_id}"); return Some(InstructionReply::GcRegions(GcRegionsReply { result: Err(format!( - "Another gc task is running for the region: {region_id}" + "Another gc task is running for the region: {target_region_id}" )), })); } @@ -89,17 +137,15 @@ impl InstructionHandler for GcRegionsHandler { } impl GcRegionsHandler { - /// Create a GC worker for the given region IDs. - /// Return the first region ID(after sort by given region id) and the GC worker. + /// Create a GC worker for the given table and region IDs. async fn create_gc_worker( - &self, ctx: &HandlerContext, - mut region_ids: Vec, + table_id: u32, + region_ids: Vec, file_ref_manifest: &FileRefsManifest, full_file_listing: bool, - ) -> Result<(RegionId, LocalGcWorker)> { - // always use the smallest region id on datanode as the target region id - region_ids.sort_by_key(|r| r.region_number()); + ) -> Result { + debug_assert!(!region_ids.is_empty(), "region_ids should not be empty"); let mito_engine = ctx .region_server @@ -108,50 +154,13 @@ impl GcRegionsHandler { violated: "MitoEngine not found".to_string(), })?; - let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu { - msg: "No region ids provided".to_string(), - })?; - - // also need to ensure all regions are on this datanode - ensure!( - region_ids - .iter() - .all(|rid| mito_engine.find_region(*rid).is_some()), - InvalidGcArgsSnafu { - msg: format!( - "Some regions are not on current datanode:{:?}", - region_ids - .iter() - .filter(|rid| mito_engine.find_region(**rid).is_none()) - .collect::>() - ), - } - ); - - // Find the access layer from one of the regions that exists on this datanode - let access_layer = mito_engine - .find_region(region_id) - .with_context(|| InvalidGcArgsSnafu { - msg: format!( - "None of the regions is on current datanode:{:?}", - region_ids - ), - })? - .access_layer(); - - // if region happen to be dropped before this but after gc scheduler send gc instr, - // need to deal with it properly(it is ok for region to be dropped after GC worker started) - // region not found here can only be drop table/database case, since region migration is prevented by lock in gc procedure - // TODO(discord9): add integration test for this drop case - let mito_regions = region_ids - .iter() - .filter_map(|rid| mito_engine.find_region(*rid).map(|r| (*rid, r))) - .collect(); + let (access_layer, mito_regions) = + Self::get_access_layer(ctx, &mito_engine, table_id, ®ion_ids).await?; let cache_manager = mito_engine.cache_manager(); let gc_worker = LocalGcWorker::try_new( - access_layer.clone(), + access_layer, Some(cache_manager), mito_regions, mito_engine.mito_config().gc.clone(), @@ -160,8 +169,197 @@ impl GcRegionsHandler { full_file_listing, ) .await - .context(GcMitoEngineSnafu { region_id })?; + .context(GcMitoEngineSnafu { + region_id: region_ids[0], + })?; - Ok((region_id, gc_worker)) + Ok(gc_worker) + } + + /// Get the access layer for the given table and region IDs. + /// It also returns the mito regions if they are found in the engine. + /// + /// This method validates: + /// 1. Any found region must be a Leader (not Follower) + /// 2. Any missing region must not be routed to another datanode + /// + /// The AccessLayer is always constructed from table metadata for consistency. + async fn get_access_layer( + ctx: &HandlerContext, + mito_engine: &MitoEngine, + table_id: u32, + region_ids: &[RegionId], + ) -> Result<(AccessLayerRef, BTreeMap>)> { + // 1. Collect mito regions and validate Leader status + let mut mito_regions = BTreeMap::new(); + + for rid in region_ids { + let region = mito_engine.find_region(*rid); + + if let Some(ref r) = region { + // Validation: Check if region is a leader + if r.is_follower() { + return Err(UnexpectedSnafu { + violated: format!( + "Region {} is a follower, cannot perform GC on follower regions", + rid + ), + } + .build()); + } + } + mito_regions.insert(*rid, region); + } + + // 2. Validate that missing regions are not routed to other datanodes + let missing_regions: Vec<_> = mito_regions + .iter() + .filter(|(_, r)| r.is_none()) + .map(|(rid, _)| *rid) + .collect(); + + if !missing_regions.is_empty() { + Self::validate_regions_not_routed_elsewhere(ctx, table_id, &missing_regions).await?; + } + + // 3. Construct AccessLayer directly from table metadata + let access_layer = Self::construct_access_layer(ctx, mito_engine, table_id).await?; + + Ok((access_layer, mito_regions)) + } + + /// Manually construct an access layer from table metadata. + async fn construct_access_layer( + ctx: &HandlerContext, + mito_engine: &MitoEngine, + table_id: u32, + ) -> Result { + let table_info_manager = TableInfoManager::new(ctx.kv_backend.clone()); + let table_info_value = table_info_manager + .get(table_id) + .await + .context(GetMetadataSnafu)? + .with_context(|| UnexpectedSnafu { + violated: format!("Table metadata not found for table {}", table_id), + })?; + + let table_dir = table_dir(&table_info_value.region_storage_path(), table_id); + let storage_name = table_info_value + .table_info + .meta + .options + .extra_options + .get(STORAGE_KEY); + let engine = &table_info_value.table_info.meta.engine; + let path_type = match engine.as_str() { + common_catalog::consts::MITO2_ENGINE => PathType::Bare, + common_catalog::consts::MITO_ENGINE => PathType::Bare, + common_catalog::consts::METRIC_ENGINE => PathType::Data, + _ => PathType::Bare, + }; + + let object_store = if let Some(name) = storage_name { + mito_engine + .object_store_manager() + .find(name) + .cloned() + .with_context(|| UnexpectedSnafu { + violated: format!("Object store {} not found", name), + })? + } else { + mito_engine + .object_store_manager() + .default_object_store() + .clone() + }; + + Ok(Arc::new(AccessLayer::new( + table_dir, + path_type, + object_store, + mito_engine.puffin_manager_factory().clone(), + mito_engine.intermediate_manager().clone(), + ))) + } + + /// Validate that the given regions are not routed to other datanodes. + /// + /// If any region is still active on another datanode (has a leader_peer in route table), + /// this function returns an error to prevent accidental deletion of files + /// that are still in use. + async fn validate_regions_not_routed_elsewhere( + ctx: &HandlerContext, + table_id: u32, + missing_region_ids: &[RegionId], + ) -> Result<()> { + if missing_region_ids.is_empty() { + return Ok(()); + } + + let table_route_manager = TableRouteManager::new(ctx.kv_backend.clone()); + + // Get table route + let table_route = match table_route_manager + .table_route_storage() + .get(table_id) + .await + .context(GetMetadataSnafu)? + { + Some(route) => route, + None => { + // Table route not found, all regions are likely deleted + debug!( + "Table route not found for table {}, regions {:?} are considered deleted", + table_id, missing_region_ids + ); + return Ok(()); + } + }; + + // Get region routes for physical table + let region_routes = match table_route.region_routes() { + Ok(routes) => routes, + Err(_) => { + // Logical table, skip validation + debug!( + "Table {} is a logical table, skipping region route validation", + table_id + ); + return Ok(()); + } + }; + + let region_routes_map: HashMap = region_routes + .iter() + .map(|route| (route.region.id, route)) + .collect(); + + // Check each missing region + for region_id in missing_region_ids { + if let Some(route) = region_routes_map.get(region_id) { + if let Some(leader_peer) = &route.leader_peer { + // Region still has a leader on some datanode. + return Err(UnexpectedSnafu { + violated: format!( + "Region {} is not on this datanode but is routed to datanode {}. \ + GC request may have been sent to wrong datanode.", + region_id, leader_peer.id + ), + } + .build()); + } + + return Err(UnexpectedSnafu { + violated: format!( + "Region {} has no leader in route table; refusing GC without explicit tombstone/deleted state.", + region_id + ), + } + .build()); + } + // Region not in route table: treat as deleted and allow GC. + } + + Ok(()) } } diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 8b39d70d77..5cb3dcee21 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -80,6 +80,7 @@ mod tests { use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; use common_meta::heartbeat::mailbox::MessageMeta; use common_meta::instruction::{Instruction, OpenRegion}; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -119,7 +120,9 @@ mod tests { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let heartbeat_handler = + RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend); let mut engine_env = TestEnv::with_prefix("open-regions").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine.clone())); diff --git a/src/datanode/src/heartbeat/handler/remap_manifest.rs b/src/datanode/src/heartbeat/handler/remap_manifest.rs index f3fcf72710..6fc44cf01e 100644 --- a/src/datanode/src/heartbeat/handler/remap_manifest.rs +++ b/src/datanode/src/heartbeat/handler/remap_manifest.rs @@ -97,6 +97,7 @@ mod tests { use std::sync::Arc; use common_meta::instruction::RemapManifest; + use common_meta::kv_backend::memory::MemoryKvBackend; use datatypes::value::Value; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; @@ -117,7 +118,8 @@ mod tests { let mut mock_region_server = mock_region_server(); let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let reply = RemapManifestHandler .handle( @@ -147,7 +149,8 @@ mod tests { region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let reply = RemapManifestHandler .handle( &handler_context, @@ -207,7 +210,8 @@ mod tests { region_server.register_engine(Arc::new(engine.clone())); prepare_region(®ion_server).await; - let handler_context = HandlerContext::new_for_test(region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(region_server, kv_backend); let region_id2 = RegionId::new(1024, 2); let reply = RemapManifestHandler .handle( diff --git a/src/datanode/src/heartbeat/handler/sync_region.rs b/src/datanode/src/heartbeat/handler/sync_region.rs index 4e719fc811..7f28491183 100644 --- a/src/datanode/src/heartbeat/handler/sync_region.rs +++ b/src/datanode/src/heartbeat/handler/sync_region.rs @@ -97,6 +97,9 @@ impl SyncRegionHandler { #[cfg(test)] mod tests { + use std::sync::Arc; + + use common_meta::kv_backend::memory::MemoryKvBackend; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{RegionRole, SyncRegionFromRequest}; use store_api::storage::RegionId; @@ -111,7 +114,8 @@ mod tests { let (mock_engine, _) = MockRegionEngine::new(METRIC_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let handler = SyncRegionHandler; let region_id = RegionId::new(1024, 1); @@ -141,7 +145,8 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let handler = SyncRegionHandler; let sync_region = common_meta::instruction::SyncRegion { @@ -171,7 +176,8 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let handler = SyncRegionHandler; let sync_region = common_meta::instruction::SyncRegion { diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index d89d0d08b2..b2036a6ef4 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -220,9 +220,11 @@ impl InstructionHandler for UpgradeRegionsHandler { #[cfg(test)] mod tests { + use std::sync::Arc; use std::time::Duration; use common_meta::instruction::UpgradeRegion; + use common_meta::kv_backend::memory::MemoryKvBackend; use mito2::engine::MITO_ENGINE_NAME; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -237,8 +239,8 @@ mod tests { let mut mock_region_server = mock_region_server(); let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); mock_region_server.register_engine(mock_engine); - - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let region_id = RegionId::new(1024, 1); let region_id2 = RegionId::new(1024, 2); @@ -286,7 +288,8 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine.clone()); mock_region_server.register_test_region(region_id2, mock_engine); - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let replay_timeout = Duration::from_millis(100u64); let reply = UpgradeRegionsHandler::new_test() .handle( @@ -330,8 +333,8 @@ mod tests { region_engine.handle_request_delay = Some(Duration::from_secs(100)); }); mock_region_server.register_test_region(region_id, mock_engine); - - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let replay_timeout = Duration::from_millis(100u64); let reply = UpgradeRegionsHandler::new_test() .handle( @@ -365,7 +368,8 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine); let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)]; - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); for replay_timeout in waits { let reply = UpgradeRegionsHandler::new_test() .handle( @@ -420,8 +424,8 @@ mod tests { region_engine.handle_request_delay = Some(Duration::from_millis(100)); }); mock_region_server.register_test_region(region_id, mock_engine); - - let handler_context = HandlerContext::new_for_test(mock_region_server); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend); let reply = UpgradeRegionsHandler::new_test() .handle( &handler_context, diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index e384465d1b..3b8336c9ac 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -92,7 +92,9 @@ impl FrontendBuilder { options: &FrontendOptions, meta_client: meta_client::MetaClientRef, ) -> Self { - let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new()); + use common_meta::kv_backend::memory::MemoryKvBackend; + + let kv_backend = Arc::new(MemoryKvBackend::new()); let layered_cache_registry = Arc::new( common_meta::cache::LayeredCacheRegistryBuilder::default() diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs index b1bed015f3..4c4269edb7 100644 --- a/src/meta-srv/src/gc.rs +++ b/src/meta-srv/src/gc.rs @@ -19,6 +19,7 @@ use store_api::storage::RegionId; mod candidate; mod ctx; +mod dropped; mod handler; #[cfg(test)] mod mock; @@ -32,6 +33,8 @@ pub use options::GcSchedulerOptions; pub use procedure::BatchGcProcedure; pub(crate) use scheduler::{GcScheduler, GcTickerRef}; +/// Mapping from region ID to its associated peers (leader and followers). pub type Region2Peers = HashMap)>; +/// Mapping from leader peer to the set of region IDs it is responsible for. pub(crate) type Peer2Regions = HashMap>; diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs index c1bd3a372c..6f00ff50a4 100644 --- a/src/meta-srv/src/gc/ctx.rs +++ b/src/meta-srv/src/gc/ctx.rs @@ -17,6 +17,7 @@ use std::time::Duration; use common_meta::datanode::RegionStat; use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_repart::TableRepartValue; use common_meta::key::table_route::PhysicalTableRouteValue; use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher}; use common_telemetry::debug; @@ -26,6 +27,7 @@ use table::metadata::TableId; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result, TableMetadataManagerSnafu}; +use crate::gc::Region2Peers; use crate::gc::procedure::BatchGcProcedure; use crate::service::mailbox::MailboxRef; @@ -33,16 +35,24 @@ use crate::service::mailbox::MailboxRef; pub(crate) trait SchedulerCtx: Send + Sync { async fn get_table_to_region_stats(&self) -> Result>>; + async fn get_table_reparts(&self) -> Result>; + async fn get_table_route( &self, table_id: TableId, ) -> Result<(TableId, PhysicalTableRouteValue)>; + async fn batch_get_table_route( + &self, + table_ids: &[TableId], + ) -> Result>; + async fn gc_regions( &self, region_ids: &[RegionId], full_file_listing: bool, timeout: Duration, + region_routes_override: Region2Peers, ) -> Result; } @@ -99,6 +109,14 @@ impl SchedulerCtx for DefaultGcSchedulerCtx { Ok(table_to_region_stats) } + async fn get_table_reparts(&self) -> Result> { + self.table_metadata_manager + .table_repart_manager() + .table_reparts() + .await + .context(TableMetadataManagerSnafu) + } + async fn get_table_route( &self, table_id: TableId, @@ -110,14 +128,31 @@ impl SchedulerCtx for DefaultGcSchedulerCtx { .context(TableMetadataManagerSnafu) } + async fn batch_get_table_route( + &self, + table_ids: &[TableId], + ) -> Result> { + self.table_metadata_manager + .table_route_manager() + .batch_get_physical_table_routes(table_ids) + .await + .context(TableMetadataManagerSnafu) + } + async fn gc_regions( &self, region_ids: &[RegionId], full_file_listing: bool, timeout: Duration, + region_routes_override: Region2Peers, ) -> Result { - self.gc_regions_inner(region_ids, full_file_listing, timeout) - .await + self.gc_regions_inner( + region_ids, + full_file_listing, + timeout, + region_routes_override, + ) + .await } } @@ -127,6 +162,7 @@ impl DefaultGcSchedulerCtx { region_ids: &[RegionId], full_file_listing: bool, timeout: Duration, + region_routes_override: Region2Peers, ) -> Result { debug!( "Sending GC instruction for {} regions (full_file_listing: {})", @@ -141,6 +177,7 @@ impl DefaultGcSchedulerCtx { region_ids.to_vec(), full_file_listing, timeout, + region_routes_override, ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); diff --git a/src/meta-srv/src/gc/dropped.rs b/src/meta-srv/src/gc/dropped.rs new file mode 100644 index 0000000000..8d5af4eaca --- /dev/null +++ b/src/meta-srv/src/gc/dropped.rs @@ -0,0 +1,288 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Dropped region GC handling. +//! +//! This module handles garbage collection for "dropped regions". A dropped region is: +//! 1. Recorded in `table_repart` metadata (the only place that still remembers it) +//! 2. No longer exists in `table_route` metadata (removed from region routing) +//! 3. No longer present in any datanode's heartbeat (physically closed) +//! +//! This differs from "active regions" which exist in both metadata and heartbeats. +//! The `table_repart` entry serves as a tombstone that tracks which regions were +//! merged/split and need their files cleaned up. + +use std::collections::{HashMap, HashSet}; +use std::time::Instant; + +use common_meta::key::table_repart::TableRepartValue; +use common_meta::peer::Peer; +use common_telemetry::{debug, warn}; +use store_api::storage::RegionId; +use table::metadata::TableId; + +use crate::error::Result; +use crate::gc::Region2Peers; +use crate::gc::ctx::SchedulerCtx; +use crate::gc::options::GcSchedulerOptions; +use crate::gc::tracker::RegionGcTracker; + +/// Information about a dropped region ready for GC. +#[derive(Debug, Clone)] +pub(crate) struct DroppedRegionInfo { + pub region_id: RegionId, + pub table_id: TableId, + /// The destination regions this region was split into (if any). + #[allow(unused)] + pub dst_regions: HashSet, +} + +/// Result of collecting and assigning dropped regions to peers. +#[derive(Debug, Default)] +pub(crate) struct DroppedRegionAssignment { + /// Dropped regions grouped by the peer responsible for GC. + pub regions_by_peer: HashMap>, + /// Regions that require full file listing (always true for dropped regions). + pub force_full_listing: HashMap>, + /// Override region routes for dropped regions (they have no active route). + pub region_routes_override: HashMap, +} + +/// Collector for dropped regions that need GC. +pub(crate) struct DroppedRegionCollector<'a> { + ctx: &'a dyn SchedulerCtx, + config: &'a GcSchedulerOptions, + tracker: &'a tokio::sync::Mutex, +} + +impl<'a> DroppedRegionCollector<'a> { + pub fn new( + ctx: &'a dyn SchedulerCtx, + config: &'a GcSchedulerOptions, + tracker: &'a tokio::sync::Mutex, + ) -> Self { + Self { + ctx, + config, + tracker, + } + } + + /// Collect and assign dropped regions for GC. + /// + /// This method: + /// 1. Identifies regions in repartition metadata that are no longer active + /// 2. Filters out regions still in cooldown period + /// 3. Assigns each dropped region to an available peer for cleanup + pub async fn collect_and_assign( + &self, + table_reparts: &[(TableId, TableRepartValue)], + ) -> Result { + // get active region ids for all tables involved in repartitioning + let active_region_ids: HashSet = { + let table_ids = table_reparts + .iter() + .map(|(table_id, _)| *table_id) + .collect::>(); + let mut active_region_ids = HashSet::new(); + + let table_routes = self.ctx.batch_get_table_route(&table_ids).await?; + for table_route in table_routes.values() { + for region in &table_route.region_routes { + active_region_ids.insert(region.region.id); + } + } + + active_region_ids + }; + + let dropped_regions = self.identify_dropped_regions(table_reparts, &active_region_ids); + + if dropped_regions.is_empty() { + return Ok(DroppedRegionAssignment::default()); + } + + let dropped_regions = self.filter_by_cooldown(dropped_regions).await; + + if dropped_regions.is_empty() { + return Ok(DroppedRegionAssignment::default()); + } + + self.assign_to_peers(dropped_regions).await + } + + /// Identify dropped regions: regions in `table_repart` but not in table routes. + /// The `assign_to_peers` step later double check they're also absent from `table_route`. + fn identify_dropped_regions( + &self, + table_reparts: &[(TableId, TableRepartValue)], + active_region_ids: &HashSet, + ) -> HashMap>> { + let mut dropped_regions: HashMap>> = + HashMap::new(); + + for (table_id, repart) in table_reparts { + if repart.src_to_dst.is_empty() { + continue; + } + + let entry = dropped_regions.entry(*table_id).or_default(); + for (src_region, dst_regions) in repart.clone().src_to_dst { + if !active_region_ids.contains(&src_region) { + entry.insert(src_region, dst_regions.into_iter().collect()); + } + } + } + + dropped_regions.retain(|_, regions| !regions.is_empty()); + dropped_regions + } + + /// Filter out dropped regions that are still in their cooldown period. + async fn filter_by_cooldown( + &self, + dropped_regions: HashMap>>, + ) -> HashMap>> { + let now = Instant::now(); + let tracker = self.tracker.lock().await; + let mut filtered = HashMap::new(); + + for (table_id, regions) in dropped_regions { + let mut kept = HashMap::new(); + for (region_id, dst_regions) in regions { + if let Some(gc_info) = tracker.get(®ion_id) { + let elapsed = now.saturating_duration_since(gc_info.last_gc_time); + if elapsed < self.config.gc_cooldown_period { + debug!("Skipping dropped region {} due to cooldown", region_id); + continue; + } + } + kept.insert(region_id, dst_regions); + } + + if !kept.is_empty() { + filtered.insert(table_id, kept); + } + } + + filtered + } + + /// Assign dropped regions to available peers for GC execution. + /// + /// For dropped regions, we need to: + /// 1. Find an available peer from the table's current route + /// 2. Use consistent hashing (region_id % peer_count) for load distribution + /// 3. Create route overrides since dropped regions have no active route + async fn assign_to_peers( + &self, + dropped_regions: HashMap>>, + ) -> Result { + let mut assignment = DroppedRegionAssignment::default(); + + for (table_id, regions) in dropped_regions { + let (phy_table_id, table_route) = match self.ctx.get_table_route(table_id).await { + Ok(route) => route, + Err(e) => { + warn!( + "Failed to get table route for table {}: {}, skipping dropped regions", + table_id, e + ); + continue; + } + }; + + if phy_table_id != table_id { + continue; + } + + let active_region_ids: HashSet = table_route + .region_routes + .iter() + .map(|r| r.region.id) + .collect(); + + let mut leader_peers: Vec = table_route + .region_routes + .iter() + .filter_map(|r| r.leader_peer.clone()) + .collect(); + leader_peers.sort_by_key(|peer| peer.id); + leader_peers.dedup_by_key(|peer| peer.id); + + if leader_peers.is_empty() { + warn!( + "No leader peers found for table {}, skipping dropped regions", + table_id + ); + continue; + } + + for (region_id, dst_regions) in regions { + if active_region_ids.contains(®ion_id) { + debug!( + "Skipping dropped region {} since it still exists in table route", + region_id + ); + continue; + } + + let selected_idx = (region_id.as_u64() as usize) % leader_peers.len(); + let peer = leader_peers[selected_idx].clone(); + + let info = DroppedRegionInfo { + region_id, + table_id, + dst_regions, + }; + + assignment + .regions_by_peer + .entry(peer.clone()) + .or_default() + .push(info); + + assignment + .force_full_listing + .entry(peer.clone()) + .or_default() + .insert(region_id); + + assignment + .region_routes_override + .entry(peer.clone()) + .or_default() + .insert(region_id, (peer, Vec::new())); + } + } + + Ok(assignment) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dropped_region_info() { + let info = DroppedRegionInfo { + region_id: RegionId::new(1, 1), + table_id: 1, + dst_regions: HashSet::new(), + }; + assert_eq!(info.region_id, RegionId::new(1, 1)); + assert_eq!(info.table_id, 1); + } +} diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs index ddf6d3d977..71d09d6796 100644 --- a/src/meta-srv/src/gc/handler.rs +++ b/src/meta-srv/src/gc/handler.rs @@ -15,61 +15,71 @@ use std::collections::{HashMap, HashSet}; use std::time::Instant; +use common_catalog::consts::MITO_ENGINE; +use common_meta::datanode::{RegionManifestInfo, RegionStat}; use common_meta::peer::Peer; use common_telemetry::{debug, error, info, warn}; use futures::StreamExt; use itertools::Itertools; +use ordered_float::OrderedFloat; +use store_api::region_engine::RegionRole; use store_api::storage::{GcReport, RegionId}; use table::metadata::TableId; use crate::error::Result; +use crate::gc::Region2Peers; use crate::gc::candidate::GcCandidate; +use crate::gc::dropped::DroppedRegionCollector; use crate::gc::scheduler::{GcJobReport, GcScheduler}; use crate::gc::tracker::RegionGcInfo; impl GcScheduler { - /// Iterate through all region stats, find region that might need gc, and send gc instruction to - /// the corresponding datanode with improved parallel processing and retry logic. pub(crate) async fn trigger_gc(&self) -> Result { let start_time = Instant::now(); info!("Starting GC cycle"); - // Step 1: Get all region statistics + // limit gc region scope to regions whose datanode have reported stats(by heartbeat) let table_to_region_stats = self.ctx.get_table_to_region_stats().await?; info!( "Fetched region stats for {} tables", table_to_region_stats.len() ); - // Step 2: Select GC candidates based on our scoring algorithm let per_table_candidates = self.select_gc_candidates(&table_to_region_stats).await?; - if per_table_candidates.is_empty() { + let table_reparts = self.ctx.get_table_reparts().await?; + let dropped_collector = + DroppedRegionCollector::new(self.ctx.as_ref(), &self.config, &self.region_gc_tracker); + let dropped_assignment = dropped_collector.collect_and_assign(&table_reparts).await?; + + if per_table_candidates.is_empty() && dropped_assignment.regions_by_peer.is_empty() { info!("No GC candidates found, skipping GC cycle"); return Ok(Default::default()); } - // Step 3: Aggregate candidates by datanode - let datanode_to_candidates = self + let mut datanode_to_candidates = self .aggregate_candidates_by_datanode(per_table_candidates) .await?; - // TODO(discord9): add deleted regions from repartition mapping + self.merge_dropped_regions(&mut datanode_to_candidates, &dropped_assignment); if datanode_to_candidates.is_empty() { info!("No valid datanode candidates found, skipping GC cycle"); return Ok(Default::default()); } - // Step 4: Process datanodes concurrently with limited parallelism let report = self - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes( + datanode_to_candidates, + dropped_assignment.force_full_listing, + dropped_assignment.region_routes_override, + ) .await; let duration = start_time.elapsed(); info!( "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}", - report.per_datanode_reports.len(), // Reuse field for datanode count + report.per_datanode_reports.len(), report.failed_datanodes.len(), duration ); @@ -78,7 +88,19 @@ impl GcScheduler { Ok(report) } - /// Aggregate GC candidates by their corresponding datanode peer. + fn merge_dropped_regions( + &self, + datanode_to_candidates: &mut HashMap>, + assignment: &crate::gc::dropped::DroppedRegionAssignment, + ) { + for (peer, dropped_infos) in &assignment.regions_by_peer { + let entry = datanode_to_candidates.entry(peer.clone()).or_default(); + for info in dropped_infos { + entry.push((info.table_id, dropped_candidate(info.region_id))); + } + } + } + pub(crate) async fn aggregate_candidates_by_datanode( &self, per_table_candidates: HashMap>, @@ -134,6 +156,8 @@ impl GcScheduler { pub(crate) async fn parallel_process_datanodes( &self, datanode_to_candidates: HashMap>, + force_full_listing_by_peer: HashMap>, + region_routes_override_by_peer: HashMap, ) -> GcJobReport { let mut report = GcJobReport::default(); @@ -146,10 +170,25 @@ impl GcScheduler { .map(|(peer, candidates)| { let scheduler = self; let peer_clone = peer.clone(); + let force_full_listing = force_full_listing_by_peer + .get(&peer) + .cloned() + .unwrap_or_default(); + let region_routes_override = region_routes_override_by_peer + .get(&peer) + .cloned() + .unwrap_or_default(); async move { ( peer, - scheduler.process_datanode_gc(peer_clone, candidates).await, + scheduler + .process_datanode_gc( + peer_clone, + candidates, + force_full_listing, + region_routes_override, + ) + .await, ) } }) @@ -181,6 +220,8 @@ impl GcScheduler { &self, peer: Peer, candidates: Vec<(TableId, GcCandidate)>, + force_full_listing: HashSet, + region_routes_override: Region2Peers, ) -> Result { info!( "Starting GC for datanode {} with {} candidate regions", @@ -198,8 +239,9 @@ impl GcScheduler { let (gc_report, fully_listed_regions) = { // Partition regions into full listing and fast listing in a single pass - let batch_full_listing_decisions = - self.batch_should_use_full_listing(&all_region_ids).await; + let batch_full_listing_decisions = self + .batch_should_use_full_listing(&all_region_ids, &force_full_listing) + .await; let need_full_list_regions = batch_full_listing_decisions .iter() @@ -224,7 +266,12 @@ impl GcScheduler { if !fast_list_regions.is_empty() { match self .ctx - .gc_regions(&fast_list_regions, false, self.config.mailbox_timeout) + .gc_regions( + &fast_list_regions, + false, + self.config.mailbox_timeout, + region_routes_override.clone(), + ) .await { Ok(report) => combined_report.merge(report), @@ -245,7 +292,12 @@ impl GcScheduler { if !need_full_list_regions.is_empty() { match self .ctx - .gc_regions(&need_full_list_regions, true, self.config.mailbox_timeout) + .gc_regions( + &need_full_list_regions, + true, + self.config.mailbox_timeout, + region_routes_override, + ) .await { Ok(report) => combined_report.merge(report), @@ -288,11 +340,26 @@ impl GcScheduler { async fn batch_should_use_full_listing( &self, region_ids: &[RegionId], + force_full_listing: &HashSet, ) -> HashMap { let mut result = HashMap::new(); let mut gc_tracker = self.region_gc_tracker.lock().await; let now = Instant::now(); for ®ion_id in region_ids { + if force_full_listing.contains(®ion_id) { + gc_tracker + .entry(region_id) + .and_modify(|info| { + info.last_full_listing_time = Some(now); + info.last_gc_time = now; + }) + .or_insert_with(|| RegionGcInfo { + last_gc_time: now, + last_full_listing_time: Some(now), + }); + result.insert(region_id, true); + continue; + } let use_full_listing = { if let Some(gc_info) = gc_tracker.get(®ion_id) { if let Some(last_full_listing) = gc_info.last_full_listing_time { @@ -320,3 +387,36 @@ impl GcScheduler { result } } + +fn dropped_candidate(region_id: RegionId) -> GcCandidate { + GcCandidate { + region_id, + score: OrderedFloat(0.0), + region_stat: dropped_region_stat(region_id), + } +} + +fn dropped_region_stat(region_id: RegionId) -> RegionStat { + RegionStat { + id: region_id, + rcus: 0, + wcus: 0, + approximate_bytes: 0, + engine: MITO_ENGINE.to_string(), + role: RegionRole::Leader, + num_rows: 0, + memtable_size: 0, + manifest_size: 0, + sst_size: 0, + sst_num: 0, + index_size: 0, + region_manifest: RegionManifestInfo::Mito { + manifest_version: 0, + flushed_entry_id: 0, + file_removed_cnt: 0, + }, + written_bytes: 0, + data_topic_latest_entry_id: 0, + metadata_topic_latest_entry_id: 0, + } +} diff --git a/src/meta-srv/src/gc/mock.rs b/src/meta-srv/src/gc/mock.rs index c5f840f391..67e886fba3 100644 --- a/src/meta-srv/src/gc/mock.rs +++ b/src/meta-srv/src/gc/mock.rs @@ -25,7 +25,9 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use common_catalog::consts::MITO_ENGINE; use common_meta::datanode::{RegionManifestInfo, RegionStat}; +use common_meta::key::table_repart::TableRepartValue; use common_meta::key::table_route::PhysicalTableRouteValue; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; @@ -37,6 +39,7 @@ use table::metadata::TableId; use tokio::sync::mpsc::Sender; use crate::error::Result; +use crate::gc::Region2Peers; use crate::gc::candidate::GcCandidate; use crate::gc::ctx::SchedulerCtx; use crate::gc::options::GcSchedulerOptions; @@ -61,6 +64,7 @@ pub fn new_empty_report_with(region_ids: impl IntoIterator) -> #[derive(Debug, Default)] pub struct MockSchedulerCtx { pub table_to_region_stats: Arc>>>>, + pub table_reparts: Arc>>, pub table_routes: Arc>>, pub file_refs: Arc>>, pub gc_reports: Arc>>, @@ -104,6 +108,12 @@ impl MockSchedulerCtx { self } + #[allow(dead_code)] + pub fn with_table_reparts(self, table_reparts: HashMap) -> Self { + *self.table_reparts.lock().unwrap() = table_reparts; + self + } + /// Set an error to be returned by `get_table_to_region_stats` #[allow(dead_code)] pub fn with_get_table_to_region_stats_error(self, error: crate::error::Error) -> Self { @@ -147,6 +157,16 @@ impl SchedulerCtx for MockSchedulerCtx { .unwrap_or_default()) } + async fn get_table_reparts(&self) -> Result> { + Ok(self + .table_reparts + .lock() + .unwrap() + .iter() + .map(|(table_id, value)| (*table_id, value.clone())) + .collect()) + } + async fn get_table_route( &self, table_id: TableId, @@ -165,11 +185,30 @@ impl SchedulerCtx for MockSchedulerCtx { .unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default()))) } + async fn batch_get_table_route( + &self, + table_ids: &[TableId], + ) -> Result> { + let mut result = HashMap::new(); + for &table_id in table_ids { + let route = self + .table_routes + .lock() + .unwrap() + .get(&table_id) + .cloned() + .unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())); + result.insert(table_id, route.1); + } + Ok(result) + } + async fn gc_regions( &self, region_ids: &[RegionId], _full_file_listing: bool, _timeout: Duration, + _region_routes_override: Region2Peers, ) -> Result { *self.gc_regions_calls.lock().unwrap() += 1; @@ -368,7 +407,7 @@ fn mock_region_stat( }, rcus: 0, wcus: 0, - engine: "mito".to_string(), + engine: MITO_ENGINE.to_string(), num_rows: 0, memtable_size: 0, manifest_size: 0, diff --git a/src/meta-srv/src/gc/mock/basic.rs b/src/meta-srv/src/gc/mock/basic.rs index 40cb932287..f0455568bf 100644 --- a/src/meta-srv/src/gc/mock/basic.rs +++ b/src/meta-srv/src/gc/mock/basic.rs @@ -31,7 +31,7 @@ async fn test_parallel_process_datanodes_empty() { let env = TestEnv::new(); let report = env .scheduler - .parallel_process_datanodes(HashMap::new()) + .parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new()) .await; assert_eq!(report.per_datanode_reports.len(), 0); @@ -85,7 +85,7 @@ async fn test_parallel_process_datanodes_with_candidates() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; assert_eq!(report.per_datanode_reports.len(), 1); diff --git a/src/meta-srv/src/gc/mock/concurrent.rs b/src/meta-srv/src/gc/mock/concurrent.rs index 0d5bf4af3f..40cbb15168 100644 --- a/src/meta-srv/src/gc/mock/concurrent.rs +++ b/src/meta-srv/src/gc/mock/concurrent.rs @@ -96,7 +96,7 @@ async fn test_concurrent_table_processing_limits() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; // Should process all datanodes @@ -168,7 +168,7 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; // Should have one datanode with mixed results @@ -260,6 +260,8 @@ async fn test_region_gc_concurrency_limit() { .process_datanode_gc( peer, candidates.into_iter().map(|c| (table_id, c)).collect(), + HashSet::new(), + HashMap::new(), ) .await .unwrap(); @@ -371,7 +373,7 @@ async fn test_region_gc_concurrency_with_partial_failures() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; let report = report.per_datanode_reports.get(&peer.id).unwrap(); @@ -501,7 +503,7 @@ async fn test_region_gc_concurrency_with_retryable_errors() { candidates.into_iter().map(|c| (table_id, c)).collect(), )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; let report = report.per_datanode_reports.get(&peer.id).unwrap(); diff --git a/src/meta-srv/src/gc/mock/full_list.rs b/src/meta-srv/src/gc/mock/full_list.rs index 649334938a..6b188c0869 100644 --- a/src/meta-srv/src/gc/mock/full_list.rs +++ b/src/meta-srv/src/gc/mock/full_list.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -76,7 +76,12 @@ async fn test_full_file_listing_first_time_gc() { // First GC - should use full listing since region has never been GC'd let reports = scheduler - .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .process_datanode_gc( + peer.clone(), + vec![(table_id, mock_candidate(region_id))], + HashSet::new(), + HashMap::new(), + ) .await .unwrap(); @@ -143,7 +148,12 @@ async fn test_full_file_listing_interval_enforcement() { // First GC - should use full listing let reports1 = scheduler - .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .process_datanode_gc( + peer.clone(), + vec![(table_id, mock_candidate(region_id))], + HashSet::new(), + HashMap::new(), + ) .await .unwrap(); assert_eq!(reports1.deleted_files.len(), 1); @@ -164,7 +174,12 @@ async fn test_full_file_listing_interval_enforcement() { // Second GC - should use full listing again since interval has passed let _reports2 = scheduler - .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .process_datanode_gc( + peer.clone(), + vec![(table_id, mock_candidate(region_id))], + HashSet::new(), + HashMap::new(), + ) .await .unwrap(); @@ -233,7 +248,12 @@ async fn test_full_file_listing_no_interval_passed() { // First GC - should use full listing let reports1 = scheduler - .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .process_datanode_gc( + peer.clone(), + vec![(table_id, mock_candidate(region_id))], + HashSet::new(), + HashMap::new(), + ) .await .unwrap(); assert_eq!(reports1.deleted_files.len(), 1); @@ -251,7 +271,12 @@ async fn test_full_file_listing_no_interval_passed() { // Second GC immediately - should NOT use full listing since interval hasn't passed let reports2 = scheduler - .process_datanode_gc(peer.clone(), vec![(table_id, mock_candidate(region_id))]) + .process_datanode_gc( + peer.clone(), + vec![(table_id, mock_candidate(region_id))], + HashSet::new(), + HashMap::new(), + ) .await .unwrap(); assert_eq!(reports2.deleted_files.len(), 1); diff --git a/src/meta-srv/src/gc/mock/misc.rs b/src/meta-srv/src/gc/mock/misc.rs index eb5a9de2c2..5aa7256edd 100644 --- a/src/meta-srv/src/gc/mock/misc.rs +++ b/src/meta-srv/src/gc/mock/misc.rs @@ -70,7 +70,7 @@ async fn test_empty_file_refs_manifest() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; assert_eq!(report.per_datanode_reports.len(), 1); @@ -147,7 +147,7 @@ async fn test_multiple_regions_per_table() { )]); let report = scheduler - .parallel_process_datanodes(datanode_to_candidates) + .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; assert_eq!(report.per_datanode_reports.len(), 1); diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 0a6c82bfbd..0165d47859 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -167,6 +167,9 @@ pub struct BatchGcData { regions: Vec, full_file_listing: bool, region_routes: Region2Peers, + /// Routes assigned by the scheduler for regions missing from table routes. + #[serde(default)] + region_routes_override: Region2Peers, /// Related regions (e.g., for shared files after repartition). /// The source regions (where those files originally came from) are used as the key, and the destination regions (where files are currently stored) are used as the value. related_regions: HashMap>, @@ -199,6 +202,7 @@ impl BatchGcProcedure { regions: Vec, full_file_listing: bool, timeout: Duration, + region_routes_override: Region2Peers, ) -> Self { Self { mailbox, @@ -210,6 +214,7 @@ impl BatchGcProcedure { full_file_listing, timeout, region_routes: HashMap::new(), + region_routes_override, related_regions: HashMap::new(), file_refs: FileRefsManifest::default(), gc_report: None, @@ -339,10 +344,16 @@ impl BatchGcProcedure { let regions_to_discover = regions_set.into_iter().collect_vec(); - let (region_to_peer, _) = self + let (mut region_to_peer, _) = self .discover_route_for_regions(®ions_to_discover) .await?; + for (region_id, route) in &self.data.region_routes_override { + region_to_peer + .entry(*region_id) + .or_insert_with(|| route.clone()); + } + self.data.region_routes = region_to_peer; Ok(()) @@ -356,11 +367,19 @@ impl BatchGcProcedure { let related_regions = &self.data.related_regions; let region_routes = &self.data.region_routes; let timeout = self.data.timeout; + let dropped_regions = self + .data + .region_routes_override + .keys() + .collect::>(); // Group regions by datanode to minimize RPC calls let mut datanode2query_regions: HashMap> = HashMap::new(); for region_id in query_regions { + if dropped_regions.contains(region_id) { + continue; + } if let Some((leader, followers)) = region_routes.get(region_id) { datanode2query_regions .entry(leader.clone()) @@ -403,12 +422,21 @@ impl BatchGcProcedure { let mut all_manifest_versions = HashMap::new(); let mut all_cross_region_refs = HashMap::new(); - for (peer, regions) in datanode2query_regions { + let mut peers = HashSet::new(); + peers.extend(datanode2query_regions.keys().cloned()); + peers.extend(datanode2related_regions.keys().cloned()); + + for peer in peers { + let regions = datanode2query_regions.remove(&peer).unwrap_or_default(); let related_regions_for_peer = datanode2related_regions.remove(&peer).unwrap_or_default(); + if regions.is_empty() && related_regions_for_peer.is_empty() { + continue; + } + let instruction = GetFileRefs { - query_regions: regions.clone(), + query_regions: regions, related_regions: related_regions_for_peer, }; diff --git a/src/meta-srv/src/gc/tracker.rs b/src/meta-srv/src/gc/tracker.rs index 836030c89f..066cafdd47 100644 --- a/src/meta-srv/src/gc/tracker.rs +++ b/src/meta-srv/src/gc/tracker.rs @@ -57,6 +57,13 @@ impl GcScheduler { } } + let table_reparts = self.ctx.get_table_reparts().await?; + for (_, repart) in table_reparts { + for src_region in repart.src_to_dst.keys() { + current_regions.insert(*src_region); + } + } + // Remove stale entries from tracker let mut tracker = self.region_gc_tracker.lock().await; let initial_count = tracker.len(); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 309f76b621..65bc4f0082 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -147,6 +147,8 @@ use crate::region::opener::PartitionExprFetcherRef; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_ref::FileReferenceManagerRef; +use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::wal::entry_distributor::{ DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers, }; @@ -303,6 +305,22 @@ impl MitoEngine { self.inner.workers.gc_limiter() } + pub fn object_store_manager(&self) -> &ObjectStoreManagerRef { + self.inner.workers.object_store_manager() + } + + pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory { + self.inner.workers.puffin_manager_factory() + } + + pub fn intermediate_manager(&self) -> &IntermediateManager { + self.inner.workers.intermediate_manager() + } + + pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef { + self.inner.workers.schema_metadata_manager() + } + /// Get all tmp ref files for given region ids, excluding files that's already in manifest. pub async fn get_snapshot_of_file_refs( &self, @@ -467,6 +485,11 @@ impl MitoEngine { self.inner.workers.get_region(region_id) } + /// Returns all regions. + pub fn regions(&self) -> Vec { + self.inner.workers.all_regions().collect() + } + fn encode_manifest_info_to_extensions( region_id: &RegionId, manifest_info: RegionManifestInfo, diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index a190484248..7e57d1782c 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -31,7 +31,7 @@ use common_time::Timestamp; use itertools::Itertools; use object_store::{Entry, Lister}; use serde::{Deserialize, Serialize}; -use snafu::ResultExt as _; +use snafu::{ResultExt as _, ensure}; use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId}; use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; use tokio_stream::StreamExt; @@ -41,7 +41,8 @@ use crate::cache::CacheManagerRef; use crate::cache::file_cache::FileType; use crate::config::MitoConfig; use crate::error::{ - DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu, + DurationOutOfRangeSnafu, InvalidRequestSnafu, JoinSnafu, OpenDalSnafu, Result, + TooManyGcJobsSnafu, UnexpectedSnafu, }; use crate::manifest::action::{RegionManifest, RemovedFile}; use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES}; @@ -174,7 +175,7 @@ impl Default for GcConfig { pub struct LocalGcWorker { pub(crate) access_layer: AccessLayerRef, pub(crate) cache_manager: Option, - pub(crate) regions: BTreeMap, + pub(crate) regions: BTreeMap>, /// Lingering time before deleting files. pub(crate) opt: GcConfig, /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries. @@ -222,12 +223,28 @@ impl LocalGcWorker { pub async fn try_new( access_layer: AccessLayerRef, cache_manager: Option, - regions_to_gc: BTreeMap, + regions_to_gc: BTreeMap>, opt: GcConfig, file_ref_manifest: FileRefsManifest, limiter: &GcLimiterRef, full_file_listing: bool, ) -> Result { + if let Some(first_region_id) = regions_to_gc.keys().next() { + let table_id = first_region_id.table_id(); + for region_id in regions_to_gc.keys() { + ensure!( + region_id.table_id() == table_id, + InvalidRequestSnafu { + region_id: *region_id, + reason: format!( + "Region {} does not belong to table {}", + region_id, table_id + ), + } + ); + } + } + let permit = limiter.permit()?; Ok(Self { @@ -269,7 +286,9 @@ impl LocalGcWorker { let tmp_ref_files = self.read_tmp_ref_files().await?; for (region_id, region) in &self.regions { let per_region_time = std::time::Instant::now(); - if region.manifest_ctx.current_state() == RegionRoleState::Follower { + if region.as_ref().map(|r| r.manifest_ctx.current_state()) + == Some(RegionRoleState::Follower) + { return UnexpectedSnafu { reason: format!( "Region {} is in Follower state, should not run GC on follower regions", @@ -282,7 +301,9 @@ impl LocalGcWorker { .get(region_id) .cloned() .unwrap_or_else(HashSet::new); - let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?; + let files = self + .do_region_gc(*region_id, region.clone(), &tmp_ref_files) + .await?; let index_files = files .iter() .filter_map(|f| f.index_version().map(|v| (f.file_id(), v))) @@ -323,44 +344,73 @@ impl LocalGcWorker { /// to avoid deleting files that are still needed. pub async fn do_region_gc( &self, - region: MitoRegionRef, + region_id: RegionId, + region: Option, tmp_ref_files: &HashSet, ) -> Result> { - let region_id = region.region_id(); + debug!( + "Doing gc for region {}, {}", + region_id, + if region.is_some() { + "region found" + } else { + "region not found, might be dropped" + } + ); - debug!("Doing gc for region {}", region_id); + ensure!( + region.is_some() || self.full_file_listing, + InvalidRequestSnafu { + region_id, + reason: "region not found and full_file_listing is false; refusing GC without full listing".to_string(), + } + ); - let manifest = region.manifest_ctx.manifest().await; - // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use. - let file_ref_manifest_version = self - .file_ref_manifest - .manifest_version - .get(®ion.region_id()) - .cloned(); - if file_ref_manifest_version != Some(manifest.manifest_version) { - // should be rare enough(few seconds after leader update manifest version), just skip gc for this region - warn!( - "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region", - file_ref_manifest_version, - region.region_id(), - manifest.manifest_version - ); - return Ok(vec![]); - } + let manifest = if let Some(region) = ®ion { + let manifest = region.manifest_ctx.manifest().await; + // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use. + let file_ref_manifest_version = self + .file_ref_manifest + .manifest_version + .get(®ion.region_id()) + .cloned(); + if file_ref_manifest_version != Some(manifest.manifest_version) { + // should be rare enough(few seconds after leader update manifest version), just skip gc for this region + warn!( + "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region", + file_ref_manifest_version, + region.region_id(), + manifest.manifest_version + ); + return Ok(vec![]); + } + Some(manifest) + } else { + None + }; - // do the time consuming listing only when full_file_listing is true - // and do it first to make sure we have the latest manifest etc. - let all_entries = if self.full_file_listing { - self.list_from_object_store(region.region_id(), manifest.clone()) + let all_entries = if let Some(manifest) = &manifest + && self.full_file_listing + { + // do the time consuming listing only when full_file_listing is true(and region is open) + // and do it first to make sure we have the latest manifest etc. + self.list_from_object_store(region_id, manifest.files.len()) + .await? + } else if manifest.is_none() && self.full_file_listing { + // if region is already dropped, we have no manifest to refer to, + // so only do gc if `full_file_listing` is true, otherwise just skip it + // TODO(discord9): is doing one serial listing enough here? + self.list_from_object_store(region_id, Self::CONCURRENCY_LIST_PER_FILES) .await? } else { vec![] }; - let region_id = manifest.metadata.region_id; - let current_files = &manifest.files; - - let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?; + let recently_removed_files = if let Some(manifest) = &manifest { + self.get_removed_files_expel_times(manifest).await? + } else { + Default::default() + }; if recently_removed_files.is_empty() { // no files to remove, skip @@ -372,10 +422,16 @@ impl LocalGcWorker { .map(|s| s.len()) .sum::(); - let in_manifest = current_files - .iter() - .map(|(file_id, meta)| (*file_id, meta.index_version())) - .collect::>(); + let current_files = manifest.as_ref().map(|m| &m.files); + + let in_manifest = if let Some(current_files) = current_files { + current_files + .iter() + .map(|(file_id, meta)| (*file_id, meta.index_version())) + .collect::>() + } else { + Default::default() + }; let in_tmp_ref = tmp_ref_files .iter() @@ -395,8 +451,13 @@ impl LocalGcWorker { let unused_file_cnt = deletable_files.len(); debug!( - "gc: for region {region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {} ", - current_files.len(), + "gc: for region{}{region_id}: In manifest files: {}, Tmp ref file cnt: {}, recently removed files: {}, Unused files to delete: {}", + if region.is_none() { + "(region dropped)" + } else { + "" + }, + current_files.map(|c| c.len()).unwrap_or(0), tmp_ref_files.len(), removed_file_cnt, deletable_files.len() @@ -414,8 +475,10 @@ impl LocalGcWorker { "Successfully deleted {} unused files for region {}", unused_file_cnt, region_id ); - self.update_manifest_removed_files(®ion, deletable_files.clone()) - .await?; + if let Some(region) = ®ion { + self.update_manifest_removed_files(region, deletable_files.clone()) + .await?; + } Ok(deletable_files) } @@ -532,11 +595,10 @@ impl LocalGcWorker { async fn list_from_object_store( &self, region_id: RegionId, - manifest: Arc, + file_cnt_hint: usize, ) -> Result> { let start = tokio::time::Instant::now(); - let current_files = &manifest.files; - let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES) + let concurrency = (file_cnt_hint / Self::CONCURRENCY_LIST_PER_FILES) .max(1) .min(self.opt.max_concurrent_lister_per_gc_job); @@ -770,6 +832,8 @@ impl LocalGcWorker { // files that may linger, which means they are not in use but may still be kept for a while let threshold = may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis())); + // TODO(discord9): if region is already closed, maybe handle threshold differently? + // is consider all files to be recently removed acceptable? let mut recently_removed_files = recently_removed_files; let may_linger_files = match threshold { Some(threshold) => recently_removed_files.split_off(&threshold), diff --git a/src/mito2/src/gc/worker_test.rs b/src/mito2/src/gc/worker_test.rs index 5e064b73b3..1baa391744 100644 --- a/src/mito2/src/gc/worker_test.rs +++ b/src/mito2/src/gc/worker_test.rs @@ -33,11 +33,19 @@ use crate::test_util::{ async fn create_gc_worker( mito_engine: &MitoEngine, - regions: BTreeMap, + regions: BTreeMap>, file_ref_manifest: &FileRefsManifest, full_file_listing: bool, ) -> LocalGcWorker { - let access_layer = regions.first_key_value().unwrap().1.access_layer.clone(); + let access_layer = regions + .first_key_value() + .as_ref() + .unwrap() + .1 + .as_ref() + .unwrap() + .access_layer + .clone(); let cache_manager = mito_engine.cache_manager(); LocalGcWorker::try_new( @@ -130,7 +138,7 @@ async fn test_gc_worker_basic_truncate() { ); let version = manifest.manifest_version; - let regions = BTreeMap::from([(region_id, region.clone())]); + let regions = BTreeMap::from([(region_id, Some(region.clone()))]); let file_ref_manifest = FileRefsManifest { file_refs: Default::default(), manifest_version: [(region_id, version)].into(), @@ -225,7 +233,7 @@ async fn test_gc_worker_truncate_with_ref() { ); let version = manifest.manifest_version; - let regions = BTreeMap::from([(region_id, region.clone())]); + let regions = BTreeMap::from([(region_id, Some(region.clone()))]); let file_ref_manifest = FileRefsManifest { file_refs: [( region_id, @@ -311,7 +319,7 @@ async fn test_gc_worker_basic_compact() { let version = manifest.manifest_version; - let regions = BTreeMap::from([(region_id, region.clone())]); + let regions = BTreeMap::from([(region_id, Some(region.clone()))]); let file_ref_manifest = FileRefsManifest { file_refs: Default::default(), manifest_version: [(region_id, version)].into(), @@ -388,7 +396,7 @@ async fn test_gc_worker_compact_with_ref() { let version = manifest.manifest_version; - let regions = BTreeMap::from([(region_id, region.clone())]); + let regions = BTreeMap::from([(region_id, Some(region.clone()))]); let file_ref_manifest = FileRefsManifest { file_refs: HashMap::from([( region_id, diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 384f071d61..6490d2d1dc 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -42,6 +42,9 @@ pub struct FileReferenceManager { node_id: Option, /// TODO(discord9): use no hash hasher since table id is sequential. files_per_region: DashMap, + /// Whether global GC is enabled. + /// This is only meaningful when using object store (not local filesystem). + gc_enabled: bool, } pub type FileReferenceManagerRef = Arc; @@ -51,9 +54,28 @@ impl FileReferenceManager { Self { node_id, files_per_region: Default::default(), + gc_enabled: false, } } + /// Creates a new FileReferenceManager with GC configuration. + pub fn with_gc_enabled(node_id: Option, gc_enabled: bool) -> Self { + Self { + node_id, + files_per_region: Default::default(), + gc_enabled, + } + } + + /// Returns whether global GC is enabled. + /// + /// This is useful for determining the file deletion strategy: + /// - If GC is enabled (and using object store), files are tracked but not immediately deleted + /// - If GC is disabled (or using local filesystem), files are deleted immediately + pub fn is_gc_enabled(&self) -> bool { + self.gc_enabled + } + fn ref_file_set(&self, region_id: RegionId) -> Option> { let file_refs = if let Some(file_refs) = self.files_per_region.get(®ion_id) { file_refs.clone() diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 41678cdf58..fbdca288b9 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -150,6 +150,14 @@ pub(crate) struct WorkerGroup { file_ref_manager: FileReferenceManagerRef, /// Gc limiter to limit concurrent gc jobs. gc_limiter: GcLimiterRef, + /// Object store manager. + object_store_manager: ObjectStoreManagerRef, + /// Puffin manager factory. + puffin_manager_factory: PuffinManagerFactory, + /// Intermediate manager. + intermediate_manager: IntermediateManager, + /// Schema metadata manager. + schema_metadata_manager: SchemaMetadataManagerRef, } impl WorkerGroup { @@ -260,6 +268,10 @@ impl WorkerGroup { cache_manager, file_ref_manager, gc_limiter, + object_store_manager, + puffin_manager_factory, + intermediate_manager, + schema_metadata_manager, }) } @@ -338,6 +350,22 @@ impl WorkerGroup { .iter() .flat_map(|worker| worker.regions.list_regions()) } + + pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef { + &self.object_store_manager + } + + pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory { + &self.puffin_manager_factory + } + + pub(crate) fn intermediate_manager(&self) -> &IntermediateManager { + &self.intermediate_manager + } + + pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef { + &self.schema_metadata_manager + } } // Tests methods. @@ -447,6 +475,10 @@ impl WorkerGroup { cache_manager, file_ref_manager, gc_limiter, + object_store_manager, + puffin_manager_factory, + intermediate_manager, + schema_metadata_manager, }) } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 8afe15bfde..c4d4b21e31 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -23,7 +23,7 @@ use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; use store_api::logstore::LogStore; -use store_api::region_request::AffectedRows; +use store_api::region_request::{AffectedRows, PathType}; use store_api::storage::RegionId; use tokio::time::sleep; @@ -57,6 +57,7 @@ where // Writes dropping marker // We rarely drop a region so we still operate in the worker loop. let region_dir = region.access_layer.build_region_dir(region_id); + let path_type = region.access_layer.path_type(); let table_dir = region.access_layer.table_dir().to_string(); let marker_path = join_path(®ion_dir, DROPPING_MARKER_FILE); region @@ -104,34 +105,46 @@ where self.region_count.dec(); - // Detaches a background task to delete the region dir let object_store = region.access_layer.object_store().clone(); let dropping_regions = self.dropping_regions.clone(); let listener = self.listener.clone(); let intm_manager = self.intermediate_manager.clone(); let cache_manager = self.cache_manager.clone(); + let gc_enabled = self.file_ref_manager.is_gc_enabled(); + common_runtime::spawn_global(async move { let gc_duration = listener .on_later_drop_begin(region_id) .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC)); - let removed = later_drop_task( - region_id, - region_dir.clone(), - object_store, - dropping_regions, - gc_duration, - ) - .await; + + let removed = if gc_enabled { + later_drop_task_with_global_gc( + region_id, + region_dir.clone(), + path_type, + object_store, + dropping_regions, + gc_duration, + ) + .await + } else { + later_drop_task_without_global_gc( + region_id, + region_dir.clone(), + object_store, + dropping_regions, + gc_duration, + ) + .await + }; + if let Err(err) = intm_manager.prune_region_dir(®ion_id).await { warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id); } - // Clean manifest cache for the region if let Some(write_cache) = cache_manager.write_cache() && let Some(manifest_cache) = write_cache.manifest_cache() { - // We pass the table dir so we can remove the table dir in manifest cache - // when the last region in the same host is dropped. manifest_cache.clean_manifests(&table_dir).await; } @@ -152,14 +165,32 @@ where /// This task will retry on failure and keep running until finished. Any resource /// captured by it will not be released before then. Be sure to only pass weak reference /// if something is depended on ref-count mechanism. -async fn later_drop_task( +async fn later_drop_task_without_global_gc( region_id: RegionId, region_path: String, object_store: ObjectStore, dropping_regions: RegionMapRef, gc_duration: Duration, ) -> bool { - let mut force = false; + remove_region_with_retry( + region_id, + region_path, + object_store, + dropping_regions, + gc_duration, + false, + ) + .await +} + +async fn remove_region_with_retry( + region_id: RegionId, + region_path: String, + object_store: ObjectStore, + dropping_regions: std::sync::Arc, + gc_duration: Duration, + mut force: bool, +) -> bool { for _ in 0..MAX_RETRY_TIMES { let result = remove_region_dir_once(®ion_path, &object_store, force).await; match result { @@ -189,6 +220,31 @@ async fn later_drop_task( false } +async fn later_drop_task_with_global_gc( + region_id: RegionId, + region_path: String, + path_type: PathType, + object_store: ObjectStore, + dropping_regions: RegionMapRef, + gc_duration: Duration, +) -> bool { + if path_type == PathType::Metadata { + remove_region_with_retry( + region_id, + region_path, + object_store, + dropping_regions, + gc_duration, + true, + ) + .await + } else { + // left for global gc + dropping_regions.remove_region(region_id); + true + } +} + // TODO(ruihang): place the marker in a separate dir /// Removes region dir if there is no parquet files, returns whether the directory is removed. /// If `force = true`, always removes the dir. diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs index 6d72eb36db..585205ed37 100644 --- a/tests-integration/src/tests/gc.rs +++ b/tests-integration/src/tests/gc.rs @@ -203,6 +203,7 @@ async fn test_gc_basic(store_type: &StorageType) { regions.clone(), false, // full_file_listing Duration::from_secs(10), // timeout + Default::default(), ); // Submit the procedure to the procedure manager