diff --git a/src/meta-srv/src/gc/options.rs b/src/meta-srv/src/gc/options.rs index 02ed25323a..ed47e13304 100644 --- a/src/meta-srv/src/gc/options.rs +++ b/src/meta-srv/src/gc/options.rs @@ -73,11 +73,11 @@ impl Default for GcSchedulerOptions { retry_backoff_duration: Duration::from_secs(5), region_gc_concurrency: 16, min_region_size_threshold: 100 * 1024 * 1024, // 100MB - sst_count_weight: 1.0, - file_removed_count_weight: 0.5, + sst_count_weight: 0.5, // more sst means could potentially remove more files, moderate priority + file_removed_count_weight: 1.0, // more file to be deleted, higher priority gc_cooldown_period: Duration::from_secs(60 * 5), // 5 minutes - regions_per_table_threshold: 20, // Select top 20 regions per table - mailbox_timeout: Duration::from_secs(60), // 60 seconds + regions_per_table_threshold: 20, // Select top 20 regions per table + mailbox_timeout: Duration::from_secs(60), // 60 seconds // Perform full file listing every 24 hours to find orphan files full_file_listing_interval: Duration::from_secs(60 * 60 * 24), // Clean up stale tracker entries every 6 hours diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 3b5e988295..a190484248 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -249,6 +249,7 @@ impl LocalGcWorker { .entry(*region_id) .or_insert_with(HashSet::new) .extend(file_refs.clone()); + // no need to include manifest files here, as they are already included in region manifest } Ok(tmp_ref_files) @@ -328,15 +329,34 @@ impl LocalGcWorker { let region_id = region.region_id(); debug!("Doing gc for region {}", region_id); + + let manifest = region.manifest_ctx.manifest().await; + // If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use. + let file_ref_manifest_version = self + .file_ref_manifest + .manifest_version + .get(®ion.region_id()) + .cloned(); + if file_ref_manifest_version != Some(manifest.manifest_version) { + // should be rare enough(few seconds after leader update manifest version), just skip gc for this region + warn!( + "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region", + file_ref_manifest_version, + region.region_id(), + manifest.manifest_version + ); + return Ok(vec![]); + } + // do the time consuming listing only when full_file_listing is true // and do it first to make sure we have the latest manifest etc. let all_entries = if self.full_file_listing { - self.list_from_object_store(®ion).await? + self.list_from_object_store(region.region_id(), manifest.clone()) + .await? } else { vec![] }; - let manifest = region.manifest_ctx.manifest().await; let region_id = manifest.metadata.region_id; let current_files = &manifest.files; @@ -509,10 +529,12 @@ impl LocalGcWorker { /// List all files in the region directory. /// Returns a vector of all file entries found. /// This might take a long time if there are many files in the region directory. - async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result> { + async fn list_from_object_store( + &self, + region_id: RegionId, + manifest: Arc, + ) -> Result> { let start = tokio::time::Instant::now(); - let region_id = region.region_id(); - let manifest = region.manifest_ctx.manifest().await; let current_files = &manifest.files; let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES) .max(1) diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index cdc3b8370e..b36895f919 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -82,14 +82,16 @@ impl FileReferenceManager { /// Gets all ref files for the given regions, meaning all open FileHandles for those regions /// and from related regions' manifests. + /// `query_regions_for_mem` queries for in memory file handles. + /// `related_regions_in_manifest` queries for related regions' manifests to get more file refs of given region ids. pub(crate) async fn get_snapshot_of_file_refs( &self, - query_regions: Vec, - related_regions: Vec<(MitoRegionRef, Vec)>, + query_regions_for_mem: Vec, + related_regions_in_manifest: Vec<(MitoRegionRef, Vec)>, ) -> Result { let mut ref_files = HashMap::new(); // get from in memory file handles - for region_id in query_regions.iter().map(|r| r.region_id()) { + for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) { if let Some(files) = self.ref_file_set(region_id) { ref_files.insert(region_id, files); } @@ -97,13 +99,8 @@ impl FileReferenceManager { let mut manifest_version = HashMap::new(); - for r in &query_regions { - let manifest = r.manifest_ctx.manifest().await; - manifest_version.insert(r.region_id(), manifest.manifest_version); - } - // get file refs from related regions' manifests - for (related_region, queries) in &related_regions { + for (related_region, queries) in &related_regions_in_manifest { let queries = queries.iter().cloned().collect::>(); let manifest = related_region.manifest_ctx.manifest().await; for meta in manifest.files.values() { @@ -125,6 +122,18 @@ impl FileReferenceManager { manifest_version.insert(related_region.region_id(), manifest.manifest_version); } + for r in &query_regions_for_mem { + let manifest = r.manifest_ctx.manifest().await; + // remove in manifest files for smaller size, since gc worker read from manifest later. + ref_files.entry(r.region_id()).and_modify(|refs| { + *refs = std::mem::take(refs) + .into_iter() + .filter(|f| !manifest.files.contains_key(&f.file_id)) + .collect(); + }); + manifest_version.insert(r.region_id(), manifest.manifest_version); + } + // simply return all ref files, no manifest version filtering for now. Ok(FileRefsManifest { file_refs: ref_files,