From 29bbff3c904adf5d6708cb633ca8e17ca771a8e4 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 18 Nov 2025 10:45:09 +0800 Subject: [PATCH] feat: gc worker only local regions&test (#7203) * feat: gc worker only on local region Signed-off-by: discord9 * more check Signed-off-by: discord9 * chore: stuff Signed-off-by: discord9 * fix: ignore async index file for now Signed-off-by: discord9 * fix: file removal rate calc Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * clippy Signed-off-by: discord9 --------- Signed-off-by: discord9 --- docs/rfcs/2025-07-23-global-gc-worker.md | 31 ++ src/common/meta/src/datanode.rs | 4 + src/common/meta/src/ddl/tests/alter_table.rs | 2 +- src/common/meta/src/instruction.rs | 2 +- src/common/meta/src/region_registry.rs | 1 + .../src/heartbeat/handler/file_ref.rs | 1 - .../src/heartbeat/handler/gc_worker.rs | 76 ++-- src/datanode/src/region_server.rs | 2 +- .../handler/collect_leader_region_handler.rs | 1 + src/meta-srv/src/handler/failure_handler.rs | 1 + .../src/handler/persist_stats_handler.rs | 1 + .../src/handler/region_lease_handler.rs | 1 + src/meta-srv/src/selector/weight_compute.rs | 3 + src/metric-engine/src/engine/sync.rs | 4 +- src/mito2/src/compaction.rs | 3 +- src/mito2/src/compaction/compactor.rs | 19 +- src/mito2/src/engine.rs | 19 +- src/mito2/src/engine/alter_test.rs | 2 +- src/mito2/src/engine/sync_test.rs | 6 +- src/mito2/src/gc.rs | 198 ++++----- src/mito2/src/gc/worker_test.rs | 401 ++++++++++++++++++ src/mito2/src/manifest/action.rs | 63 +-- src/mito2/src/manifest/checkpointer.rs | 2 + src/mito2/src/manifest/manager.rs | 65 +-- src/mito2/src/metrics.rs | 2 +- src/mito2/src/region.rs | 19 +- src/mito2/src/region/opener.rs | 32 +- src/mito2/src/sst/file_purger.rs | 7 +- src/mito2/src/test_util.rs | 38 +- src/mito2/src/test_util/scheduler_util.rs | 3 +- src/store-api/src/region_engine.rs | 8 +- tests-integration/tests/http.rs | 4 +- 32 files changed, 749 insertions(+), 272 deletions(-) create mode 100644 src/mito2/src/gc/worker_test.rs diff --git a/docs/rfcs/2025-07-23-global-gc-worker.md b/docs/rfcs/2025-07-23-global-gc-worker.md index 69d1e3ac34..331ed01f38 100644 --- a/docs/rfcs/2025-07-23-global-gc-worker.md +++ b/docs/rfcs/2025-07-23-global-gc-worker.md @@ -106,6 +106,37 @@ This mechanism may be too complex to implement at once. We can consider a two-ph Also the read replica shouldn't be later in manifest version for more than the lingering time of obsolete files, otherwise it might ref to files that are already deleted by the GC worker. - need to upload tmp manifest to object storage, which may introduce additional complexity and potential performance overhead. But since long-running queries are typically not frequent, the performance impact is expected to be minimal. 
+One potential race condition with region migration is illustrated below:
+
+```mermaid
+sequenceDiagram
+ participant gc_worker as GC Worker (same dn as region 1)
+ participant region1 as Region 1 (Leader → Follower)
+ participant region2 as Region 2 (Follower → Leader)
+ participant region_dir as Region Directory
+
+ gc_worker->>region1: Start GC, get region manifest
+ activate region1
+ region1-->>gc_worker: Region 1 manifest
+ deactivate region1
+ gc_worker->>region_dir: Scan region directory
+
+ Note over region1,region2: Region Migration Occurs
+ region1-->>region2: Downgrade to Follower
+ region2-->>region1: Becomes Leader
+
+ region2->>region_dir: Add new file
+
+ gc_worker->>region_dir: Continue scanning
+ gc_worker-->>region_dir: Discovers new file
+ Note over gc_worker: New file not in Region 1's manifest
+ gc_worker->>gc_worker: Mark file as orphan (incorrectly)
+```
+This could cause the GC worker to incorrectly mark the new file as an orphan and delete it, if the configured lingering time for orphan files (files not mentioned anywhere, neither in use nor unused) is not long enough.
+
+A good enough solution could be to use a lock that prevents the GC worker from running on a region while region migration is happening on that region, and vice versa.
+
+The race condition between the GC worker and repartition also needs to be considered carefully. For now, acquiring the lock for both region migration and repartition during the GC worker process could be a simple solution.
 ## Conclusion and Rationale
diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index ffa85b4a7e..8b521d8e43 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs
@@ -132,6 +132,8 @@ pub enum RegionManifestInfo { Mito { manifest_version: u64, flushed_entry_id: u64, + /// Number of files removed in the manifest's `removed_files` field. + file_removed_cnt: u64, }, Metric { data_manifest_version: u64,
@@ -271,9 +273,11 @@ impl From for RegionManifestInfo { store_api::region_engine::RegionManifestInfo::Mito { manifest_version, flushed_entry_id, + file_removed_cnt, } => RegionManifestInfo::Mito { manifest_version, flushed_entry_id, + file_removed_cnt, }, store_api::region_engine::RegionManifestInfo::Metric { data_manifest_version,
diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index e16a85b403..a9ba4a0aa8 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs
@@ -182,7 +182,7 @@ fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result, /// The file references manifest containing temporary file references.
pub file_refs_manifest: FileRefsManifest, diff --git a/src/common/meta/src/region_registry.rs b/src/common/meta/src/region_registry.rs index 1f672d563d..f1741b281b 100644 --- a/src/common/meta/src/region_registry.rs +++ b/src/common/meta/src/region_registry.rs @@ -67,6 +67,7 @@ impl LeaderRegionManifestInfo { RegionManifestInfo::Mito { manifest_version, flushed_entry_id, + file_removed_cnt: _, } => LeaderRegionManifestInfo::Mito { manifest_version, flushed_entry_id, diff --git a/src/datanode/src/heartbeat/handler/file_ref.rs b/src/datanode/src/heartbeat/handler/file_ref.rs index ccad7922b5..9309864bba 100644 --- a/src/datanode/src/heartbeat/handler/file_ref.rs +++ b/src/datanode/src/heartbeat/handler/file_ref.rs @@ -39,7 +39,6 @@ impl InstructionHandler for GetFileRefsHandler { error: Some("MitoEngine not found".to_string()), })); }; - match mito_engine .get_snapshot_of_unmanifested_refs(get_file_refs.region_ids) .await diff --git a/src/datanode/src/heartbeat/handler/gc_worker.rs b/src/datanode/src/heartbeat/handler/gc_worker.rs index 75b0005e93..e936033681 100644 --- a/src/datanode/src/heartbeat/handler/gc_worker.rs +++ b/src/datanode/src/heartbeat/handler/gc_worker.rs @@ -15,7 +15,7 @@ use common_meta::instruction::{GcRegions, GcRegionsReply, InstructionReply}; use common_telemetry::{debug, warn}; use mito2::gc::LocalGcWorker; -use snafu::{OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::{FileRefsManifest, RegionId}; use crate::error::{GcMitoEngineSnafu, InvalidGcArgsSnafu, Result, UnexpectedSnafu}; @@ -35,20 +35,6 @@ impl InstructionHandler for GcRegionsHandler { let region_ids = gc_regions.regions.clone(); debug!("Received gc regions instruction: {:?}", region_ids); - let is_same_table = region_ids.windows(2).all(|w| { - let t1 = w[0].table_id(); - let t2 = w[1].table_id(); - t1 == t2 - }); - if !is_same_table { - return Some(InstructionReply::GcRegions(GcRegionsReply { - result: Err(format!( - "Regions to GC should belong to the same table, found: {:?}", - region_ids - )), - })); - } - let (region_id, gc_worker) = match self .create_gc_worker( ctx, @@ -103,6 +89,8 @@ impl InstructionHandler for GcRegionsHandler { } impl GcRegionsHandler { + /// Create a GC worker for the given region IDs. + /// Return the first region ID(after sort by given region id) and the GC worker. 
async fn create_gc_worker( &self, ctx: &HandlerContext, @@ -112,22 +100,51 @@ impl GcRegionsHandler { ) -> Result<(RegionId, LocalGcWorker)> { // always use the smallest region id on datanode as the target region id region_ids.sort_by_key(|r| r.region_number()); + + ensure!( + region_ids.windows(2).all(|w| { + let t1 = w[0].table_id(); + let t2 = w[1].table_id(); + t1 == t2 + }), + InvalidGcArgsSnafu { + msg: format!( + "Regions to GC should belong to the same table, found: {:?}", + region_ids + ), + } + ); + let mito_engine = ctx .region_server .mito_engine() .with_context(|| UnexpectedSnafu { violated: "MitoEngine not found".to_string(), })?; - let region_id = *region_ids.first().with_context(|| UnexpectedSnafu { - violated: "No region ids provided".to_string(), + + let region_id = *region_ids.first().with_context(|| InvalidGcArgsSnafu { + msg: "No region ids provided".to_string(), })?; - let mito_config = mito_engine.mito_config(); + // also need to ensure all regions are on this datanode + ensure!( + region_ids + .iter() + .all(|rid| mito_engine.find_region(*rid).is_some()), + InvalidGcArgsSnafu { + msg: format!( + "Some regions are not on current datanode:{:?}", + region_ids + .iter() + .filter(|rid| mito_engine.find_region(**rid).is_none()) + .collect::>() + ), + } + ); // Find the access layer from one of the regions that exists on this datanode - let access_layer = region_ids - .iter() - .find_map(|rid| mito_engine.find_region(*rid)) + let access_layer = mito_engine + .find_region(region_id) .with_context(|| InvalidGcArgsSnafu { msg: format!( "None of the regions is on current datanode:{:?}", @@ -136,14 +153,25 @@ impl GcRegionsHandler { })? .access_layer(); + let mito_regions = region_ids + .iter() + .map(|rid| { + mito_engine + .find_region(*rid) + .map(|r| (*rid, r)) + .with_context(|| InvalidGcArgsSnafu { + msg: format!("Region {} not found on datanode", rid), + }) + }) + .collect::>()?; + let cache_manager = mito_engine.cache_manager(); let gc_worker = LocalGcWorker::try_new( access_layer.clone(), Some(cache_manager), - region_ids.into_iter().collect(), - Default::default(), - mito_config.clone().into(), + mito_regions, + mito_engine.mito_config().gc.clone(), file_ref_manifest.clone(), &mito_engine.gc_limiter(), full_file_listing, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ff80c8b10a..c823bebafe 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -518,7 +518,7 @@ impl RegionServer { let manifest_info = match manifest_info { ManifestInfo::MitoManifestInfo(info) => { - RegionManifestInfo::mito(info.data_manifest_version, 0) + RegionManifestInfo::mito(info.data_manifest_version, 0, 0) } ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric( info.data_manifest_version, diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index fc81143b82..ddb4cd0ea3 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -73,6 +73,7 @@ mod tests { region_manifest: RegionManifestInfo::Mito { manifest_version, flushed_entry_id: 0, + file_removed_cnt: 0, }, rcus: 0, wcus: 0, diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 7039678654..eb79a1c30d 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -102,6 +102,7 @@ mod tests { 
region_manifest: RegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index abc2fa3c3e..75281f982a 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -294,6 +294,7 @@ mod tests { region_manifest: RegionManifestInfo::Mito { manifest_version: 1, flushed_entry_id: 100, + file_removed_cnt: 0, }, written_bytes, data_topic_latest_entry_id: 200, diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 1dd49cd44e..680af35a06 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -173,6 +173,7 @@ mod test { region_manifest: RegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 4e651e4ecc..6508f78efe 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -195,6 +195,7 @@ mod tests { region_manifest: RegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, @@ -224,6 +225,7 @@ mod tests { region_manifest: RegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, @@ -253,6 +255,7 @@ mod tests { region_manifest: RegionManifestInfo::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, }, data_topic_latest_entry_id: 0, metadata_topic_latest_entry_id: 0, diff --git a/src/metric-engine/src/engine/sync.rs b/src/metric-engine/src/engine/sync.rs index 741938f8d7..4a2741c12b 100644 --- a/src/metric-engine/src/engine/sync.rs +++ b/src/metric-engine/src/engine/sync.rs @@ -45,7 +45,7 @@ impl MetricEngineInner { .metadata_flushed_entry_id() .unwrap_or_default(); let metadata_region_manifest = - RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id); + RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0); let metadata_synced = self .mito .sync_region(metadata_region_id, metadata_region_manifest) @@ -57,7 +57,7 @@ impl MetricEngineInner { let data_manifest_version = manifest_info.data_manifest_version(); let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); let data_region_manifest = - RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id); + RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0); let data_synced = self .mito diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index be4c12aa1b..e6492722f3 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -1111,9 +1111,8 @@ mod tests { checkpoint_distance: 10, remove_file_options: Default::default(), }, - Default::default(), - Default::default(), FormatType::PrimaryKey, + &Default::default(), ) .await .unwrap(); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 71698471c3..8db9edc2b8 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs 
@@ -172,21 +172,16 @@ pub async fn open_compaction_region( compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, remove_file_options: RemoveFileOptions { - keep_count: mito_config.experimental_manifest_keep_removed_file_count, - keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl, + enable_gc: mito_config.gc.enable, }, }; - RegionManifestManager::open( - region_manifest_options, - Default::default(), - Default::default(), - ) - .await? - .context(EmptyRegionDirSnafu { - region_id: req.region_id, - region_dir: ®ion_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type), - })? + RegionManifestManager::open(region_manifest_options, &Default::default()) + .await? + .with_context(|| EmptyRegionDirSnafu { + region_id: req.region_id, + region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type), + })? }; let manifest = manifest_manager.manifest(); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 587552d02f..dda93d47f4 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -31,7 +31,7 @@ mod catchup_test; #[cfg(test)] mod close_test; #[cfg(test)] -mod compaction_test; +pub(crate) mod compaction_test; #[cfg(test)] mod create_test; #[cfg(test)] @@ -297,15 +297,12 @@ impl MitoEngine { let file_ref_mgr = self.file_ref_manager(); let region_ids = region_ids.into_iter().collect::>(); - - // Convert region IDs to MitoRegionRef objects, error if any region doesn't exist + // Convert region IDs to MitoRegionRef objects, ignore regions that do not exist on current datanode + // as regions on other datanodes are not managed by this engine. let regions: Vec = region_ids .into_iter() - .map(|region_id| { - self.find_region(region_id) - .with_context(|| RegionNotFoundSnafu { region_id }) - }) - .collect::>()?; + .filter_map(|region_id| self.find_region(region_id)) + .collect(); file_ref_mgr .get_snapshot_of_unmanifested_refs(regions) @@ -369,7 +366,11 @@ impl MitoEngine { } /// Returns a scanner to scan for `request`. 
- async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result { + pub(crate) async fn scanner( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { self.scan_region(region_id, request)?.scanner().await } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 7717bbceb7..d5948da3c1 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -199,7 +199,7 @@ async fn test_alter_region_with_format(flat_format: bool) { assert_eq!(manifests.len(), 1); let (return_region_id, manifest) = manifests.remove(0); assert_eq!(return_region_id, region_id); - assert_eq!(manifest, RegionManifestInfo::mito(2, 1)); + assert_eq!(manifest, RegionManifestInfo::mito(2, 1, 0)); let column_metadatas = parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap(); assert_column_metadatas( diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 5d6d5802f2..6b98d4ba0f 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -151,7 +151,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { scan_check(&follower_engine, region_id, expected, 0, 0).await; // Returns error since the max manifest is 1 - let manifest_info = RegionManifestInfo::mito(2, 0); + let manifest_info = RegionManifestInfo::mito(2, 0, 0); let err = follower_engine .sync_region(region_id, manifest_info) .await @@ -159,7 +159,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) { let err = err.as_any().downcast_ref::().unwrap(); assert_matches!(err, Error::InstallManifestTo { .. }); - let manifest_info = RegionManifestInfo::mito(1, 0); + let manifest_info = RegionManifestInfo::mito(1, 0, 0); follower_engine .sync_region(region_id, manifest_info) .await @@ -264,7 +264,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) { scan_check(&follower_engine, region_id, expected, 0, 0).await; // Sync the region from the leader engine to the follower engine - let manifest_info = RegionManifestInfo::mito(2, 0); + let manifest_info = RegionManifestInfo::mito(2, 0, 0); follower_engine .sync_region(region_id, manifest_info) .await diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 822fd6820d..074ec094f0 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -21,7 +21,7 @@ //! `unknown files`: files that are not recorded in the manifest, usually due to saved checkpoint which remove actions before the checkpoint. //! 
-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
@@ -30,7 +30,7 @@ use common_telemetry::{debug, error, info, warn};
 use common_time::Timestamp;
 use object_store::{Entry, Lister};
 use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt as _, ensure};
+use snafu::{OptionExt, ResultExt as _};
 use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
 use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
 use tokio_stream::StreamExt;
@@ -39,15 +39,16 @@ use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{
- DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
- Result, TooManyGcJobsSnafu, UnexpectedSnafu,
+ DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
 };
-use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
-use crate::manifest::storage::manifest_compress_type;
-use crate::metrics::GC_DEL_FILE_CNT;
-use crate::region::opener::new_manifest_dir;
+use crate::manifest::action::RegionManifest;
+use crate::metrics::GC_DELETE_FILE_CNT;
+use crate::region::{MitoRegionRef, RegionRoleState};
 use crate::sst::file::delete_files;
-use crate::sst::location::{self, region_dir_from_table_dir};
+use crate::sst::location::{self};
+
+#[cfg(test)]
+mod worker_test;
 
 /// Limit the amount of concurrent GC jobs on the datanode
 pub struct GcLimiter {
@@ -95,16 +96,18 @@ impl GcLimiter {
 }
 
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(default)]
 pub struct GcConfig {
 /// Whether GC is enabled.
 pub enable: bool,
 /// Lingering time before deleting files.
 /// Should be long enough to allow long running queries to finish.
+ /// If set to None, then unused files will be deleted immediately.
 ///
 /// TODO(discord9): long running queries should actively write tmp manifest files
 /// to prevent deletion of files they are using.
 #[serde(with = "humantime_serde")]
- pub lingering_time: Duration,
+ pub lingering_time: Option<Duration>,
 /// Lingering time before deleting unknown files(files with undetermine expel time).
 /// expel time is the time when the file is considered as removed, as in removed from the manifest.
/// This should only occur rarely, as manifest keep tracks in `removed_files` field
@@ -124,10 +127,10 @@ impl Default for GcConfig {
 fn default() -> Self {
 Self {
 enable: false,
- // expect long running queries to be finished within a reasonable time
- lingering_time: Duration::from_secs(60 * 5),
- // 6 hours, for unknown expel time, which is when this file get removed from manifest, it should rarely happen, can keep it longer
- unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
+ // expect long running queries to finish (or at least be able to report that they are using a deleted file) within a reasonable time
+ lingering_time: Some(Duration::from_secs(60)),
+ // 1 hour, for files with an unknown expel time (the time when the file was removed from the manifest); this should rarely happen, so such files can be kept longer
+ unknown_file_lingering_time: Duration::from_secs(60 * 60),
 max_concurrent_lister_per_gc_job: 32,
 max_concurrent_gc_job: 4,
 }
@@ -137,10 +140,9 @@ impl Default for GcConfig {
 pub struct LocalGcWorker {
 pub(crate) access_layer: AccessLayerRef,
 pub(crate) cache_manager: Option,
- pub(crate) manifest_mgrs: HashMap,
+ pub(crate) regions: BTreeMap,
 /// Lingering time before deleting files.
 pub(crate) opt: GcConfig,
- pub(crate) manifest_open_config: ManifestOpenConfig,
 /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
 ///
 /// Also contains manifest versions of regions when the tmp ref files are generated.
@@ -186,58 +188,37 @@ impl LocalGcWorker {
 pub async fn try_new(
 access_layer: AccessLayerRef,
 cache_manager: Option,
- regions_to_gc: BTreeSet,
+ regions_to_gc: BTreeMap,
 opt: GcConfig,
- manifest_open_config: ManifestOpenConfig,
 file_ref_manifest: FileRefsManifest,
 limiter: &GcLimiterRef,
 full_file_listing: bool,
 ) -> Result {
- let table_id = regions_to_gc
- .first()
- .context(UnexpectedSnafu {
- reason: "Expect at least one region, found none",
- })?
- .table_id();
 let permit = limiter.permit()?;
- let mut zelf = Self {
+
+ Ok(Self {
 access_layer,
 cache_manager,
- manifest_mgrs: HashMap::new(),
+ regions: regions_to_gc,
 opt,
- manifest_open_config,
 file_ref_manifest,
 _permit: permit,
 full_file_listing,
- };
-
- // dedup just in case
- for region_id in regions_to_gc {
- ensure!(
- region_id.table_id() == table_id,
- UnexpectedSnafu {
- reason: format!(
- "All regions should belong to the same table, found region {} and table {}",
- region_id, table_id
- ),
- }
- );
- let mgr = zelf.open_mgr_for(region_id).await?;
- zelf.manifest_mgrs.insert(region_id, mgr);
- }
-
- Ok(zelf)
+ })
 }
 
 /// Get tmp ref files for all current regions
 ///
- /// Outdated regions are added to `outdated_regions` set
+ /// Outdated regions are added to the `outdated_regions` set, which means their manifest version in
+ /// `self.file_ref_manifest` is older than the current manifest version on the datanode,
+ /// so metasrv needs to retry GC for them later with updated tmp ref files.
pub async fn read_tmp_ref_files( &self, outdated_regions: &mut HashSet, ) -> Result>> { - for (region_id, region_mgr) in &self.manifest_mgrs { - let current_version = region_mgr.manifest().manifest_version; + // verify manifest version before reading tmp ref files + for (region_id, mito_region) in &self.regions { + let current_version = mito_region.manifest_ctx.manifest_version().await; if ¤t_version > self .file_ref_manifest @@ -253,7 +234,6 @@ impl LocalGcWorker { outdated_regions.insert(*region_id); } } - // TODO(discord9): verify manifest version before reading tmp ref files let mut tmp_ref_files = HashMap::new(); for (region_id, file_refs) in &self.file_ref_manifest.file_refs { @@ -282,15 +262,22 @@ impl LocalGcWorker { let mut outdated_regions = HashSet::new(); let mut deleted_files = HashMap::new(); let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?; - for region_id in self.manifest_mgrs.keys() { - debug!("Doing gc for region {}", region_id); + for (region_id, region) in &self.regions { + if region.manifest_ctx.current_state() == RegionRoleState::Follower { + return UnexpectedSnafu { + reason: format!( + "Region {} is in Follower state, should not run GC on follower regions", + region_id + ), + } + .fail(); + } let tmp_ref_files = tmp_ref_files .get(region_id) .cloned() .unwrap_or_else(HashSet::new); - let files = self.do_region_gc(*region_id, &tmp_ref_files).await?; + let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?; deleted_files.insert(*region_id, files); - debug!("Gc for region {} finished", region_id); } info!( "LocalGcWorker finished after {} secs.", @@ -319,19 +306,17 @@ impl LocalGcWorker { /// to avoid deleting files that are still needed. pub async fn do_region_gc( &self, - region_id: RegionId, + region: MitoRegionRef, tmp_ref_files: &HashSet, ) -> Result> { + let region_id = region.region_id(); + debug!("Doing gc for region {}", region_id); - let manifest = self - .manifest_mgrs - .get(®ion_id) - .context(RegionNotFoundSnafu { region_id })? - .manifest(); + let manifest = region.manifest_ctx.manifest().await; let region_id = manifest.metadata.region_id; let current_files = &manifest.files; - let recently_removed_files = self.get_removed_files_expel_times(region_id).await?; + let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?; if recently_removed_files.is_empty() { // no files to remove, skip @@ -365,13 +350,11 @@ impl LocalGcWorker { unused_len, region_id ); + // TODO(discord9): for now, ignore async index file as it's design is not stable, need to be improved once + // index file design is stable let file_pairs: Vec<(FileId, FileId)> = unused_files .iter() - .filter_map(|file_id| { - current_files - .get(file_id) - .map(|meta| (meta.file_id().file_id(), meta.index_file_id().file_id())) - }) + .map(|file_id| (*file_id, *file_id)) .collect(); info!( @@ -386,6 +369,9 @@ impl LocalGcWorker { "Successfully deleted {} unused files for region {}", unused_len, region_id ); + // TODO(discord9): update region manifest about deleted files + self.update_manifest_removed_files(®ion, unused_files.clone()) + .await?; Ok(unused_files) } @@ -401,40 +387,31 @@ impl LocalGcWorker { .await?; // FIXME(discord9): if files are already deleted before calling delete_files, the metric will be inaccurate, no clean way to fix it now - GC_DEL_FILE_CNT.add(file_ids.len() as i64); + GC_DELETE_FILE_CNT.add(file_ids.len() as i64); Ok(()) } - /// Get the manifest manager for the region. 
- async fn open_mgr_for(&self, region_id: RegionId) -> Result { - let table_dir = self.access_layer.table_dir(); - let path_type = self.access_layer.path_type(); - let mito_config = &self.manifest_open_config; + /// Update region manifest for clear the actually deleted files + async fn update_manifest_removed_files( + &self, + region: &MitoRegionRef, + deleted_files: Vec, + ) -> Result<()> { + debug!( + "Trying to update manifest removed files for region {}", + region.region_id() + ); - let region_manifest_options = RegionManifestOptions { - manifest_dir: new_manifest_dir(®ion_dir_from_table_dir( - table_dir, region_id, path_type, - )), - object_store: self.access_layer.object_store().clone(), - compress_type: manifest_compress_type(mito_config.compress_manifest), - checkpoint_distance: mito_config.manifest_checkpoint_distance, - remove_file_options: RemoveFileOptions { - keep_count: mito_config.experimental_manifest_keep_removed_file_count, - keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl, - }, - }; + let mut manager = region.manifest_ctx.manifest_manager.write().await; + let cnt = deleted_files.len(); + manager.clear_deleted_files(deleted_files); + debug!( + "Updated region_id={} region manifest to clear {cnt} deleted files", + region.region_id(), + ); - RegionManifestManager::open( - region_manifest_options, - Default::default(), - Default::default(), - ) - .await? - .context(EmptyRegionDirSnafu { - region_id, - region_dir: ®ion_dir_from_table_dir(table_dir, region_id, path_type), - }) + Ok(()) } /// Get all the removed files in delta manifest files and their expel times. @@ -443,14 +420,8 @@ impl LocalGcWorker { /// pub async fn get_removed_files_expel_times( &self, - region_id: RegionId, + region_manifest: &Arc, ) -> Result>> { - let region_manifest = self - .manifest_mgrs - .get(®ion_id) - .context(RegionNotFoundSnafu { region_id })? 
- .manifest(); - let mut ret = BTreeMap::new(); for files in ®ion_manifest.removed_files.removed_files { let expel_time = Timestamp::new_millisecond(files.removed_at); @@ -627,12 +598,17 @@ impl LocalGcWorker { ) -> Result> { let start = tokio::time::Instant::now(); let now = chrono::Utc::now(); - let may_linger_until = now - - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| { - DurationOutOfRangeSnafu { - input: self.opt.lingering_time, - } - })?; + let may_linger_until = self + .opt + .lingering_time + .map(|lingering_time| { + chrono::Duration::from_std(lingering_time) + .with_context(|_| DurationOutOfRangeSnafu { + input: lingering_time, + }) + .map(|t| now - t) + }) + .transpose()?; let unknown_file_may_linger_until = now - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context( @@ -642,9 +618,15 @@ impl LocalGcWorker { )?; // files that may linger, which means they are not in use but may still be kept for a while - let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis()); + let threshold = + may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis())); let mut recently_removed_files = recently_removed_files; - let may_linger_files = recently_removed_files.split_off(&threshold); + let may_linger_files = match threshold { + Some(threshold) => recently_removed_files.split_off(&threshold), + None => BTreeMap::new(), + }; + debug!("may_linger_files: {:?}", may_linger_files); + let may_linger_filenames = may_linger_files.values().flatten().collect::>(); let eligible_for_removal = recently_removed_files diff --git a/src/mito2/src/gc/worker_test.rs b/src/mito2/src/gc/worker_test.rs new file mode 100644 index 0000000000..6e3f5288c0 --- /dev/null +++ b/src/mito2/src/gc/worker_test.rs @@ -0,0 +1,401 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::sync::Arc; + +use api::v1::Rows; +use common_telemetry::init_default_ut_logging; +use store_api::region_engine::RegionEngine as _; +use store_api::region_request::{RegionCompactRequest, RegionRequest}; +use store_api::storage::{FileRefsManifest, RegionId}; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::engine::compaction_test::{delete_and_flush, put_and_flush}; +use crate::gc::{GcConfig, LocalGcWorker}; +use crate::region::MitoRegionRef; +use crate::test_util::{ + CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema, +}; + +async fn create_gc_worker( + mito_engine: &MitoEngine, + regions: BTreeMap, + file_ref_manifest: &FileRefsManifest, + full_file_listing: bool, +) -> LocalGcWorker { + let access_layer = regions.first_key_value().unwrap().1.access_layer.clone(); + let cache_manager = mito_engine.cache_manager(); + + LocalGcWorker::try_new( + access_layer, + Some(cache_manager), + regions, + mito_engine.mito_config().gc.clone(), + file_ref_manifest.clone(), + &mito_engine.gc_limiter(), + full_file_listing, + ) + .await + .unwrap() +} + +/// Test insert/flush then truncate can allow gc worker to delete files +#[tokio::test] +async fn test_gc_worker_basic_truncate() { + init_default_ut_logging(); + + let mut env = TestEnv::new().await; + env.log_store = Some(env.create_log_store().await); + // use in memory object store for gc test, so it will use `ObjectStoreFilePurger` + env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager())); + + let engine = env + .new_mito_engine(MitoConfig { + gc: GcConfig { + enable: true, + // for faster delete file + lingering_time: None, + ..Default::default() + }, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id, None).await; + + let region = engine.get_region(region_id).unwrap(); + let manifest = region.manifest_ctx.manifest().await; + + let to_be_deleted_file_id = *manifest.files.iter().next().unwrap().0; + + assert_eq!(manifest.files.len(), 1); + + engine + .handle_request( + region.region_id, + RegionRequest::Truncate(store_api::region_request::RegionTruncateRequest::All), + ) + .await + .unwrap(); + + let manifest = region.manifest_ctx.manifest().await; + assert!( + manifest.removed_files.removed_files[0] + .file_ids + .contains(&to_be_deleted_file_id) + && manifest.removed_files.removed_files[0].file_ids.len() == 1 + && manifest.files.is_empty(), + "Manifest after truncate: {:?}", + manifest + ); + let version = manifest.manifest_version; + + let regions = BTreeMap::from([(region_id, region.clone())]); + let file_ref_manifest = FileRefsManifest { + file_refs: Default::default(), + manifest_version: [(region_id, version)].into(), + }; + let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await; + let report = gc_worker.run().await.unwrap(); + assert_eq!( + report.deleted_files.get(®ion_id).unwrap(), + 
&vec![to_be_deleted_file_id], + ); + assert!(report.need_retry_regions.is_empty()); + + let manifest = region.manifest_ctx.manifest().await; + assert!(manifest.removed_files.removed_files.is_empty() && manifest.files.is_empty()); +} + +/// Truncate with file refs should not delete files +#[tokio::test] +async fn test_gc_worker_truncate_with_ref() { + init_default_ut_logging(); + + let mut env = TestEnv::new().await; + env.log_store = Some(env.create_log_store().await); + // use in memory object store for gc test, so it will use `ObjectStoreFilePurger` + env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager())); + + let engine = env + .new_mito_engine(MitoConfig { + gc: GcConfig { + enable: true, + // for faster delete file + lingering_time: None, + ..Default::default() + }, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + flush_region(&engine, region_id, None).await; + + let region = engine.get_region(region_id).unwrap(); + let manifest = region.manifest_ctx.manifest().await; + + assert_eq!(manifest.files.len(), 1); + + let to_be_deleted_file_id = *manifest.files.iter().next().unwrap().0; + + engine + .handle_request( + region.region_id, + RegionRequest::Truncate(store_api::region_request::RegionTruncateRequest::All), + ) + .await + .unwrap(); + + let manifest = region.manifest_ctx.manifest().await; + assert!( + manifest.removed_files.removed_files[0] + .file_ids + .contains(&to_be_deleted_file_id) + && manifest.removed_files.removed_files[0].file_ids.len() == 1 + && manifest.files.is_empty(), + "Manifest after truncate: {:?}", + manifest + ); + let version = manifest.manifest_version; + + let regions = BTreeMap::from([(region_id, region.clone())]); + let file_ref_manifest = FileRefsManifest { + file_refs: [(region_id, HashSet::from([to_be_deleted_file_id]))].into(), + manifest_version: [(region_id, version)].into(), + }; + let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await; + let report = gc_worker.run().await.unwrap(); + assert!(report.deleted_files.get(®ion_id).unwrap().is_empty()); + assert!(report.need_retry_regions.is_empty()); + + let manifest = region.manifest_ctx.manifest().await; + assert!( + manifest.removed_files.removed_files[0].file_ids.len() == 1 && manifest.files.is_empty(), + "Manifest: {:?}", + manifest + ); +} + +/// Test insert/flush then compact can allow gc worker to delete files +#[tokio::test] +async fn test_gc_worker_basic_compact() { + init_default_ut_logging(); + + let mut env = TestEnv::new().await; + env.log_store = Some(env.create_log_store().await); + // use in memory object store for gc test, so it will use `ObjectStoreFilePurger` + env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager())); + + let engine = env + .new_mito_engine(MitoConfig { + gc: GcConfig { + enable: true, + // for faster delete file + lingering_time: None, + ..Default::default() + }, + ..Default::default() + }) + .await; + + let region_id = 
RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 20..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 15..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let region = engine.get_region(region_id).unwrap(); + let manifest = region.manifest_ctx.manifest().await; + assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3); + + let version = manifest.manifest_version; + + let regions = BTreeMap::from([(region_id, region.clone())]); + let file_ref_manifest = FileRefsManifest { + file_refs: Default::default(), + manifest_version: [(region_id, version)].into(), + }; + + let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await; + let report = gc_worker.run().await.unwrap(); + + assert_eq!(report.deleted_files.get(®ion_id).unwrap().len(), 3,); + assert!(report.need_retry_regions.is_empty()); +} + +/// Compact with file refs should not delete files +#[tokio::test] +async fn test_gc_worker_compact_with_ref() { + init_default_ut_logging(); + + let mut env = TestEnv::new().await; + env.log_store = Some(env.create_log_store().await); + // use in memory object store for gc test, so it will use `ObjectStoreFilePurger` + env.object_store_manager = Some(Arc::new(env.create_in_memory_object_store_manager())); + + let engine = env + .new_mito_engine(MitoConfig { + gc: GcConfig { + enable: true, + // for faster delete file + lingering_time: None, + ..Default::default() + }, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request.clone())) + .await + .unwrap(); + + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 10..20).await; + put_and_flush(&engine, region_id, &column_schemas, 20..30).await; + delete_and_flush(&engine, region_id, &column_schemas, 15..30).await; + put_and_flush(&engine, region_id, &column_schemas, 15..25).await; + + let result = engine + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + .unwrap(); + assert_eq!(result.affected_rows, 0); + + let region = engine.get_region(region_id).unwrap(); + let manifest = region.manifest_ctx.manifest().await; + assert_eq!(manifest.removed_files.removed_files[0].file_ids.len(), 3); + + let version = manifest.manifest_version; + + let regions = BTreeMap::from([(region_id, region.clone())]); + let file_ref_manifest = FileRefsManifest { + file_refs: 
HashMap::from([( + region_id, + manifest.removed_files.removed_files[0] + .file_ids + .iter() + .cloned() + .collect(), + )]), + manifest_version: [(region_id, version)].into(), + }; + + let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await; + let report = gc_worker.run().await.unwrap(); + + assert_eq!(report.deleted_files.get(®ion_id).unwrap().len(), 0); + assert!(report.need_retry_regions.is_empty()); +} diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index af09e6c861..dedb228e25 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -25,10 +25,9 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{FileId, RegionId, SequenceNumber}; use strum::Display; -use crate::error::{ - DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu, -}; +use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; use crate::manifest::manager::RemoveFileOptions; +use crate::region::ManifestStats; use crate::sst::FormatType; use crate::sst::file::FileMeta; use crate::wal::EntryId; @@ -236,13 +235,13 @@ impl RegionManifestBuilder { self.flushed_entry_id = truncated_entry_id; self.flushed_sequence = truncated_sequence; self.truncated_entry_id = Some(truncated_entry_id); - self.files.clear(); self.removed_files.add_removed_files( self.files.values().map(|meta| meta.file_id).collect(), truncate .timestamp_ms .unwrap_or_else(|| Utc::now().timestamp_millis()), ); + self.files.clear(); } TruncateKind::Partial { files_to_remove } => { self.removed_files.add_removed_files( @@ -294,6 +293,29 @@ pub struct RemovedFilesRecord { pub removed_files: Vec, } +impl RemovedFilesRecord { + /// Clear the actually deleted files from the list of removed files + pub fn clear_deleted_files(&mut self, deleted_files: Vec) { + let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files); + for files in self.removed_files.iter_mut() { + files.file_ids.retain(|fid| !deleted_file_set.contains(fid)); + } + + self.removed_files.retain(|fs| !fs.file_ids.is_empty()); + } + + pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) { + let cnt = self + .removed_files + .iter() + .map(|r| r.file_ids.len() as u64) + .sum(); + stats + .file_removed_cnt + .store(cnt, std::sync::atomic::Ordering::Relaxed); + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)] pub struct RemovedFiles { /// The timestamp is the time when @@ -306,6 +328,9 @@ pub struct RemovedFiles { impl RemovedFilesRecord { /// Add a record of removed files with the current timestamp. pub fn add_removed_files(&mut self, file_ids: HashSet, at: i64) { + if file_ids.is_empty() { + return; + } self.removed_files.push(RemovedFiles { removed_at: at, file_ids, @@ -313,35 +338,13 @@ impl RemovedFilesRecord { } pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> { - let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum(); - if opt.keep_count > 0 && total_removed_files <= opt.keep_count { + if !opt.enable_gc { + // If GC is not enabled, always keep removed files empty. 
+ self.removed_files.clear(); return Ok(()); } - let mut cur_file_cnt = total_removed_files; - - let can_evict_until = chrono::Utc::now() - - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu { - input: opt.keep_ttl, - })?; - - self.removed_files.sort_unstable_by_key(|f| f.removed_at); - let updated = std::mem::take(&mut self.removed_files) - .into_iter() - .filter_map(|f| { - if f.removed_at < can_evict_until.timestamp_millis() - && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count) - { - // can evict all files - // TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent. - cur_file_cnt -= f.file_ids.len(); - None - } else { - Some(f) - } - }) - .collect(); - self.removed_files = updated; + // if GC is enabled, rely on gc worker to delete files, and evict removed files based on options. Ok(()) } diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs index 3f3164ad93..3cb0694e71 100644 --- a/src/mito2/src/manifest/checkpointer.rs +++ b/src/mito2/src/manifest/checkpointer.rs @@ -129,6 +129,8 @@ impl Checkpointer { manifest.removed_files.evict_old_removed_files(opt)?; + // TODO(discord9): consider also check object store to clear removed files that are already deleted? How costly it is? + Ok(manifest) } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index b65d9c840d..ab6c9dd26d 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -21,6 +21,7 @@ use futures::TryStreamExt; use object_store::ObjectStore; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadataRef; +use store_api::storage::FileId; use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion}; use crate::error::{ @@ -35,7 +36,7 @@ use crate::manifest::storage::{ ManifestObjectStore, file_version, is_checkpoint_file, is_delta_file, }; use crate::metrics::MANIFEST_OP_ELAPSED; -use crate::region::{RegionLeaderState, RegionRoleState}; +use crate::region::{ManifestStats, RegionLeaderState, RegionRoleState}; use crate::sst::FormatType; /// Options for [RegionManifestManager]. @@ -53,23 +54,10 @@ pub struct RegionManifestOptions { /// Options for updating `removed_files` field in [RegionManifest]. #[derive(Debug, Clone)] +#[cfg_attr(any(test, feature = "test"), derive(Default))] pub struct RemoveFileOptions { - /// Number of removed files to keep in manifest's `removed_files` field before also - /// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached. - pub keep_count: usize, - /// Duration to keep removed files in manifest's `removed_files` field before also - /// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached. - pub keep_ttl: std::time::Duration, -} - -#[cfg(any(test, feature = "test"))] -impl Default for RemoveFileOptions { - fn default() -> Self { - Self { - keep_count: 256, - keep_ttl: std::time::Duration::from_secs(3600), - } - } + /// Whether GC is enabled. If not, the removed files should always be empty when persisting manifest. 
+ pub enable_gc: bool, } // rewrite note: @@ -144,6 +132,7 @@ pub struct RegionManifestManager { last_version: Arc, checkpointer: Checkpointer, manifest: Arc, + stats: ManifestStats, stopped: bool, } @@ -153,17 +142,17 @@ impl RegionManifestManager { metadata: RegionMetadataRef, flushed_entry_id: u64, options: RegionManifestOptions, - total_manifest_size: Arc, - manifest_version: Arc, sst_format: FormatType, + stats: &ManifestStats, ) -> Result { // construct storage let mut store = ManifestObjectStore::new( &options.manifest_dir, options.object_store.clone(), options.compress_type, - total_manifest_size, + stats.total_manifest_size.clone(), ); + let manifest_version = stats.manifest_version.clone(); info!( "Creating region manifest in {} with metadata {:?}, flushed_entry_id: {}", @@ -213,11 +202,15 @@ impl RegionManifestManager { let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION); manifest_version.store(version, Ordering::Relaxed); + manifest + .removed_files + .update_file_removed_cnt_to_stats(stats); Ok(Self { store, last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), + stats: stats.clone(), stopped: false, }) } @@ -227,8 +220,7 @@ impl RegionManifestManager { /// Returns `Ok(None)` if no such manifest. pub async fn open( options: RegionManifestOptions, - total_manifest_size: Arc, - manifest_version: Arc, + stats: &ManifestStats, ) -> Result> { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["open"]) @@ -239,8 +231,9 @@ impl RegionManifestManager { &options.manifest_dir, options.object_store.clone(), options.compress_type, - total_manifest_size, + stats.total_manifest_size.clone(), ); + let manifest_version = stats.manifest_version.clone(); // recover from storage // construct manifest builder @@ -314,11 +307,15 @@ impl RegionManifestManager { last_checkpoint_version, ); manifest_version.store(version, Ordering::Relaxed); + manifest + .removed_files + .update_file_removed_cnt_to_stats(stats); Ok(Some(Self { store, last_version: manifest_version, checkpointer, manifest: Arc::new(manifest), + stats: stats.clone(), stopped: false, })) } @@ -442,6 +439,9 @@ impl RegionManifestManager { ); let version = self.last_version(); + new_manifest + .removed_files + .update_file_removed_cnt_to_stats(&self.stats); self.manifest = Arc::new(new_manifest); let last_version = self.set_version(self.manifest.manifest_version); info!( @@ -469,6 +469,9 @@ impl RegionManifestManager { let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint); let manifest = builder.try_build()?; let last_version = self.set_version(manifest.manifest_version); + manifest + .removed_files + .update_file_removed_cnt_to_stats(&self.stats); self.manifest = Arc::new(manifest); info!( "Installed region manifest from checkpoint: {}, region: {}", @@ -523,6 +526,9 @@ impl RegionManifestManager { } } let new_manifest = manifest_builder.try_build()?; + new_manifest + .removed_files + .update_file_removed_cnt_to_stats(&self.stats); let updated_manifest = self .checkpointer .update_manifest_removed_files(new_manifest)?; @@ -534,6 +540,17 @@ impl RegionManifestManager { Ok(version) } + /// Clear deleted files from manifest's `removed_files` field without update version. Notice if datanode exit before checkpoint then new manifest by open region may still contain these deleted files, which is acceptable for gc process. 
+ pub fn clear_deleted_files(&mut self, deleted_files: Vec) { + let mut manifest = (*self.manifest()).clone(); + manifest.removed_files.clear_deleted_files(deleted_files); + self.set_manifest(Arc::new(manifest)); + } + + pub(crate) fn set_manifest(&mut self, manifest: Arc) { + self.manifest = manifest; + } + /// Retrieves the current [RegionManifest]. pub fn manifest(&self) -> Arc { self.manifest.clone() @@ -923,6 +940,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1764); + assert_eq!(manifest_size, 1378); } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index a4b2c570e7..1e35ae1c06 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -474,7 +474,7 @@ lazy_static! { .unwrap(); /// Counter for the number of files deleted by the GC worker. - pub static ref GC_DEL_FILE_CNT: IntGauge = + pub static ref GC_DELETE_FILE_CNT: IntGauge = register_int_gauge!( "greptime_mito_gc_delete_file_count", "mito gc deleted file count", diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 76ff739351..3306754843 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -507,6 +507,7 @@ impl MitoRegion { let num_rows = version.ssts.num_rows() + version.memtables.num_rows(); let num_files = version.ssts.num_files(); let manifest_version = self.stats.manifest_version(); + let file_removed_cnt = self.stats.file_removed_cnt(); let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed); let written_bytes = self.written_bytes.load(Ordering::Relaxed); @@ -522,6 +523,7 @@ impl MitoRegion { manifest: RegionManifestInfo::Mito { manifest_version, flushed_entry_id, + file_removed_cnt, }, data_topic_latest_entry_id: topic_latest_entry_id, metadata_topic_latest_entry_id: topic_latest_entry_id, @@ -1171,9 +1173,10 @@ pub(crate) type CatchupRegionsRef = Arc; /// Manifest stats. 
#[derive(Default, Debug, Clone)] -pub(crate) struct ManifestStats { - total_manifest_size: Arc, - manifest_version: Arc, +pub struct ManifestStats { + pub(crate) total_manifest_size: Arc, + pub(crate) manifest_version: Arc, + pub(crate) file_removed_cnt: Arc, } impl ManifestStats { @@ -1184,6 +1187,10 @@ impl ManifestStats { fn manifest_version(&self) -> u64 { self.manifest_version.load(Ordering::Relaxed) } + + fn file_removed_cnt(&self) -> u64 { + self.file_removed_cnt.load(Ordering::Relaxed) + } } #[cfg(test)] @@ -1289,9 +1296,8 @@ mod tests { checkpoint_distance: 10, remove_file_options: Default::default(), }, - Default::default(), - Default::default(), FormatType::PrimaryKey, + &Default::default(), ) .await .unwrap(); @@ -1356,9 +1362,8 @@ mod tests { checkpoint_distance: 10, remove_file_options: Default::default(), }, - Default::default(), - Default::default(), FormatType::PrimaryKey, + &Default::default(), ) .await .unwrap(); diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 35b797363f..6616ba90b8 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -65,7 +65,7 @@ use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; use crate::sst::file::RegionFileId; -use crate::sst::file_purger::{FilePurgerRef, create_local_file_purger}; +use crate::sst::file_purger::{FilePurgerRef, create_file_purger}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -279,9 +279,8 @@ impl RegionOpener { metadata.clone(), flushed_entry_id, region_manifest_options, - self.stats.total_manifest_size.clone(), - self.stats.manifest_version.clone(), sst_format, + &self.stats, ) .await?; @@ -322,7 +321,8 @@ impl RegionOpener { manifest_manager, RegionRoleState::Leader(RegionLeaderState::Writable), )), - file_purger: create_local_file_purger( + file_purger: create_file_purger( + config.gc.enable, self.purge_scheduler, access_layer, self.cache_manager, @@ -351,7 +351,7 @@ impl RegionOpener { let region = self .maybe_open(config, wal) .await? - .context(EmptyRegionDirSnafu { + .with_context(|| EmptyRegionDirSnafu { region_id, region_dir: ®ion_dir, })?; @@ -413,12 +413,8 @@ impl RegionOpener { &self.region_dir(), &self.object_store_manager, )?; - let Some(manifest_manager) = RegionManifestManager::open( - region_manifest_options, - self.stats.total_manifest_size.clone(), - self.stats.manifest_version.clone(), - ) - .await? + let Some(manifest_manager) = + RegionManifestManager::open(region_manifest_options, &self.stats).await? 
else { return Ok(None); }; @@ -459,7 +455,8 @@ impl RegionOpener { self.puffin_manager_factory.clone(), self.intermediate_manager.clone(), )); - let file_purger = create_local_file_purger( + let file_purger = create_file_purger( + config.gc.enable, self.purge_scheduler.clone(), access_layer.clone(), self.cache_manager.clone(), @@ -596,8 +593,7 @@ impl RegionOpener { compress_type: manifest_compress_type(config.compress_manifest), checkpoint_distance: config.manifest_checkpoint_distance, remove_file_options: RemoveFileOptions { - keep_count: config.experimental_manifest_keep_removed_file_count, - keep_ttl: config.experimental_manifest_keep_removed_file_ttl, + enable_gc: config.gc.enable, }, }) } @@ -688,12 +684,8 @@ impl RegionMetadataLoader { region_dir, &self.object_store_manager, )?; - let Some(manifest_manager) = RegionManifestManager::open( - region_manifest_options, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - ) - .await? + let Some(manifest_manager) = + RegionManifestManager::open(region_manifest_options, &Default::default()).await? else { return Ok(None); }; diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 64e83c1a54..81a004ff15 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -80,15 +80,16 @@ pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool { /// only manages the file references without deleting the actual files. /// pub fn create_file_purger( + gc_enabled: bool, scheduler: SchedulerRef, sst_layer: AccessLayerRef, cache_manager: Option, file_ref_manager: FileReferenceManagerRef, ) -> FilePurgerRef { - if is_local_fs(&sst_layer) { - Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager)) - } else { + if gc_enabled && !is_local_fs(&sst_layer) { Arc::new(ObjectStoreFilePurger { file_ref_manager }) + } else { + Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager)) } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index baaa7fe343..8a78acd7bd 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -221,9 +221,9 @@ pub struct TestEnv { data_home: TempDir, intermediate_manager: IntermediateManager, puffin_manager: PuffinManagerFactory, - log_store: Option, + pub(crate) log_store: Option, log_store_factory: LogStoreFactory, - object_store_manager: Option, + pub(crate) object_store_manager: Option, schema_metadata_manager: SchemaMetadataManagerRef, file_ref_manager: FileReferenceManagerRef, kv_backend: KvBackendRef, @@ -287,7 +287,7 @@ impl TestEnv { self.object_store_manager.clone() } - async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine { + pub(crate) async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine { async fn create( zelf: &TestEnv, config: MitoConfig, @@ -541,30 +541,31 @@ impl TestEnv { /// Returns the log store and object store manager. 
async fn create_log_and_object_store_manager(&self) -> (LogStoreImpl, ObjectStoreManager) { + let log_store = self.create_log_store().await; + let object_store_manager = self.create_object_store_manager(); + + (log_store, object_store_manager) + } + + pub(crate) async fn create_log_store(&self) -> LogStoreImpl { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); - let object_store_manager = self.create_object_store_manager(); match &self.log_store_factory { LogStoreFactory::RaftEngine(factory) => { let log_store = factory.create_log_store(wal_path).await; - ( - LogStoreImpl::RaftEngine(Arc::new(log_store)), - object_store_manager, - ) + + LogStoreImpl::RaftEngine(Arc::new(log_store)) } LogStoreFactory::Kafka(factory) => { let log_store = factory.create_log_store().await; - ( - LogStoreImpl::Kafka(Arc::new(log_store)), - object_store_manager, - ) + LogStoreImpl::Kafka(Arc::new(log_store)) } } } - fn create_object_store_manager(&self) -> ObjectStoreManager { + pub(crate) fn create_object_store_manager(&self) -> ObjectStoreManager { let data_home = self.data_home.path(); let data_path = data_home.join("data").as_path().display().to_string(); let builder = Fs::default().root(&data_path); @@ -572,6 +573,12 @@ impl TestEnv { ObjectStoreManager::new("default", object_store) } + pub(crate) fn create_in_memory_object_store_manager(&self) -> ObjectStoreManager { + let builder = object_store::services::Memory::default(); + let object_store = ObjectStore::new(builder).unwrap().finish(); + ObjectStoreManager::new("memory", object_store) + } + /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata` /// is `None`, opens an existing manifest and returns `None` if no such manifest. pub async fn create_manifest_manager( @@ -608,14 +615,13 @@ impl TestEnv { metadata, 0, manifest_opts, - Default::default(), - Default::default(), FormatType::PrimaryKey, + &Default::default(), ) .await .map(Some) } else { - RegionManifestManager::open(manifest_opts, Default::default(), Default::default()).await + RegionManifestManager::open(manifest_opts, &Default::default()).await } } diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 8e5b8b9434..712649b4d6 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -133,9 +133,8 @@ impl SchedulerEnv { checkpoint_distance: 10, remove_file_options: Default::default(), }, - Default::default(), - Default::default(), FormatType::PrimaryKey, + &Default::default(), ) .await .unwrap(), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 1a19f68551..dacd78d258 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -204,7 +204,7 @@ impl From for GrantedRegion { /// The role of the region. /// TODO(weny): rename it to `RegionRoleState` -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum RegionRole { // Readonly region(mito2) Follower, @@ -497,6 +497,8 @@ pub enum RegionManifestInfo { Mito { manifest_version: u64, flushed_entry_id: u64, + /// Number of files removed in the manifest's `removed_files` field. + file_removed_cnt: u64, }, Metric { data_manifest_version: u64, @@ -508,10 +510,11 @@ pub enum RegionManifestInfo { impl RegionManifestInfo { /// Creates a new [RegionManifestInfo] for mito2 engine. 
- pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self { + pub fn mito(manifest_version: u64, flushed_entry_id: u64, file_removal_rate: u64) -> Self { Self::Mito { manifest_version, flushed_entry_id, + file_removed_cnt: file_removal_rate, } } @@ -604,6 +607,7 @@ impl Default for RegionManifestInfo { Self::Mito { manifest_version: 0, flushed_entry_id: 0, + file_removed_cnt: 0, } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2499958f00..629d2b35fe 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1539,8 +1539,8 @@ type = "time_series" [region_engine.mito.gc] enable = false -lingering_time = "5m" -unknown_file_lingering_time = "6h" +lingering_time = "1m" +unknown_file_lingering_time = "1h" max_concurrent_lister_per_gc_job = 32 max_concurrent_gc_job = 4
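
A note on the purger dispatch in `src/mito2/src/sst/file_purger.rs` above: after this change the reference-only `ObjectStoreFilePurger` is chosen only when GC is enabled *and* the SST layer is not a local filesystem; every other combination keeps the existing `LocalFilePurger`, so disabling `gc.enable` restores the old direct-deletion behavior even on object storage. The sketch below restates that decision as a standalone program with simplified placeholder types; `PurgerKind` and `choose_purger` are illustrative names, not part of this patch.

```rust
/// Simplified stand-ins for the two purger implementations; the real types are
/// `LocalFilePurger` and `ObjectStoreFilePurger` in mito2.
#[derive(Debug, PartialEq)]
enum PurgerKind {
    /// Deletes SST files directly when they are no longer referenced.
    Local,
    /// Only drops file references; actual deletion is left to the GC worker.
    ObjectStore,
}

/// Mirrors the dispatch in `create_file_purger`: the reference-only purger is
/// used only when GC is enabled and the backend is not a local filesystem.
fn choose_purger(gc_enabled: bool, is_local_fs: bool) -> PurgerKind {
    if gc_enabled && !is_local_fs {
        PurgerKind::ObjectStore
    } else {
        PurgerKind::Local
    }
}

fn main() {
    assert_eq!(choose_purger(true, false), PurgerKind::ObjectStore);
    // A local filesystem always purges directly, even with GC enabled.
    assert_eq!(choose_purger(true, true), PurgerKind::Local);
    // Disabling GC keeps the pre-existing deletion behavior on object stores.
    assert_eq!(choose_purger(false, false), PurgerKind::Local);
}
```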
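Similarly, the new `file_removed_cnt` statistic added to `ManifestStats` and `RegionManifestInfo::Mito` is read from an atomic counter on the region and forwarded into the region statistics reported by the engine (see the `src/mito2/src/region.rs` and `src/store-api/src/region_engine.rs` hunks above). The following minimal sketch mirrors that flow with simplified stand-ins for the real types; `manifest_info` is an illustrative helper, not an actual method in the codebase.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified stand-in for mito2's `ManifestStats` after this patch.
#[derive(Default)]
struct ManifestStats {
    manifest_version: Arc<AtomicU64>,
    file_removed_cnt: Arc<AtomicU64>,
}

/// Simplified stand-in for `store_api::region_engine::RegionManifestInfo`.
#[derive(Debug, PartialEq)]
enum RegionManifestInfo {
    Mito {
        manifest_version: u64,
        flushed_entry_id: u64,
        /// Number of files recorded in the manifest's `removed_files` field.
        file_removed_cnt: u64,
    },
}

/// Illustrative helper: read the atomics and embed the values into the
/// manifest info reported for the region, as the region.rs hunk above does.
fn manifest_info(stats: &ManifestStats, flushed_entry_id: u64) -> RegionManifestInfo {
    RegionManifestInfo::Mito {
        manifest_version: stats.manifest_version.load(Ordering::Relaxed),
        flushed_entry_id,
        file_removed_cnt: stats.file_removed_cnt.load(Ordering::Relaxed),
    }
}

fn main() {
    let stats = ManifestStats::default();
    stats.manifest_version.store(7, Ordering::Relaxed);
    stats.file_removed_cnt.store(3, Ordering::Relaxed);

    assert_eq!(
        manifest_info(&stats, 42),
        RegionManifestInfo::Mito {
            manifest_version: 7,
            flushed_entry_id: 42,
            file_removed_cnt: 3,
        }
    );
}
```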