fix: gc listing op first (#7385)

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-11 11:25:05 +08:00
committed by GitHub
parent 276f6bf026
commit a26dee0ca1

View File

@@ -287,6 +287,14 @@ impl LocalGcWorker {
let region_id = region.region_id(); let region_id = region.region_id();
debug!("Doing gc for region {}", region_id); debug!("Doing gc for region {}", region_id);
// Do the time-consuming listing only when `full_file_listing` is true,
// and do it first, so that the manifest read below is fetched after the listing has finished.
let all_entries = if self.full_file_listing {
self.list_from_object_store(&region).await?
} else {
vec![]
};
let manifest = region.manifest_ctx.manifest().await; let manifest = region.manifest_ctx.manifest().await;
let region_id = manifest.metadata.region_id; let region_id = manifest.metadata.region_id;
let current_files = &manifest.files; let current_files = &manifest.files;
@@ -303,10 +311,6 @@ impl LocalGcWorker {
.map(|s| s.len()) .map(|s| s.len())
.sum::<usize>(); .sum::<usize>();
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)
.min(self.opt.max_concurrent_lister_per_gc_job);
let in_used: HashSet<FileId> = current_files let in_used: HashSet<FileId> = current_files
.keys() .keys()
.cloned() .cloned()
@@ -314,7 +318,7 @@ impl LocalGcWorker {
.collect(); .collect();
let unused_files = self let unused_files = self
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency) .list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
.await?; .await?;
let unused_file_cnt = unused_files.len(); let unused_file_cnt = unused_files.len();
@@ -442,6 +446,32 @@ impl LocalGcWorker {
Ok(listers) Ok(listers)
} }
/// Lists every entry under the region directory in the object store.
///
/// The lister concurrency is the manifest's current file count divided by
/// `CONCURRENCY_LIST_PER_FILES`, raised to at least 1 and then capped by
/// `max_concurrent_lister_per_gc_job`.
///
/// Returns all entries found. Errors from building the listers or from the
/// listing itself are propagated to the caller.
///
/// This can take a long time when the region directory holds many files.
async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
    let started_at = tokio::time::Instant::now();

    let region_id = region.region_id();
    let manifest = region.manifest_ctx.manifest().await;

    // Derive the lister concurrency from the number of files the manifest currently holds.
    let lister_limit = self.opt.max_concurrent_lister_per_gc_job;
    let wanted_listers = (manifest.files.len() / Self::CONCURRENCY_LIST_PER_FILES).max(1);
    let concurrency = wanted_listers.min(lister_limit);

    // Build the listers for this region.
    let listers = self.partition_region_files(region_id, concurrency).await?;
    let lister_cnt = listers.len();

    // Run all listers concurrently and gather every entry they yield.
    let entries = self.list_region_files_concurrent(listers).await?;
    let cnt = entries.len();

    // `lister_cnt` and `cnt` are captured by name inside the format string.
    info!(
        "gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
        started_at.elapsed().as_secs_f64(),
        region_id
    );

    Ok(entries)
}
/// Concurrently list all files in the region directory using the provided listers. /// Concurrently list all files in the region directory using the provided listers.
/// Returns a vector of all file entries found across all partitions. /// Returns a vector of all file entries found across all partitions.
async fn list_region_files_concurrent( async fn list_region_files_concurrent(
@@ -572,9 +602,8 @@ impl LocalGcWorker {
region_id: RegionId, region_id: RegionId,
in_used: &HashSet<FileId>, in_used: &HashSet<FileId>,
recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>, recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
concurrency: usize, all_entries: Vec<Entry>,
) -> Result<Vec<FileId>> { ) -> Result<Vec<FileId>> {
let start = tokio::time::Instant::now();
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let may_linger_until = self let may_linger_until = self
.opt .opt
@@ -629,8 +658,7 @@ impl LocalGcWorker {
.collect(); .collect();
info!( info!(
"gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest", "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
start.elapsed().as_secs_f64(),
region_id, region_id,
files_to_delete.len() files_to_delete.len()
); );
@@ -638,15 +666,7 @@ impl LocalGcWorker {
return Ok(files_to_delete); return Ok(files_to_delete);
} }
// Full file listing mode: perform expensive list operations to find orphan files // Full file listing mode: get the full list of files from object store
// Step 1: Create partitioned listers for concurrent processing
let listers = self.partition_region_files(region_id, concurrency).await?;
let lister_cnt = listers.len();
// Step 2: Concurrently list all files in the region directory
let all_entries = self.list_region_files_concurrent(listers).await?;
let cnt = all_entries.len();
// Step 3: Filter files to determine which ones can be deleted // Step 3: Filter files to determine which ones can be deleted
let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
@@ -658,12 +678,6 @@ impl LocalGcWorker {
unknown_file_may_linger_until, unknown_file_may_linger_until,
); );
info!(
"gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
start.elapsed().as_secs_f64(),
region_id,
all_unused_files_ready_for_delete.len()
);
debug!("All in exist linger files: {:?}", all_in_exist_linger_files); debug!("All in exist linger files: {:?}", all_in_exist_linger_files);
Ok(all_unused_files_ready_for_delete) Ok(all_unused_files_ready_for_delete)