feat: adjust some args to gc worker (#7469)

* chore: less stuff sent

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: clarify comment on manifest file removal for GC worker

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-06 15:37:05 +08:00
committed by GitHub
parent 5162c1de4d
commit 6f86a22e6f
3 changed files with 49 additions and 18 deletions

View File

@@ -73,11 +73,11 @@ impl Default for GcSchedulerOptions {
retry_backoff_duration: Duration::from_secs(5),
region_gc_concurrency: 16,
min_region_size_threshold: 100 * 1024 * 1024, // 100MB
sst_count_weight: 1.0,
file_removed_count_weight: 0.5,
sst_count_weight: 0.5, // more sst means could potentially remove more files, moderate priority
file_removed_count_weight: 1.0, // more file to be deleted, higher priority
gc_cooldown_period: Duration::from_secs(60 * 5), // 5 minutes
regions_per_table_threshold: 20, // Select top 20 regions per table
mailbox_timeout: Duration::from_secs(60), // 60 seconds
regions_per_table_threshold: 20, // Select top 20 regions per table
mailbox_timeout: Duration::from_secs(60), // 60 seconds
// Perform full file listing every 24 hours to find orphan files
full_file_listing_interval: Duration::from_secs(60 * 60 * 24),
// Clean up stale tracker entries every 6 hours

View File

@@ -249,6 +249,7 @@ impl LocalGcWorker {
.entry(*region_id)
.or_insert_with(HashSet::new)
.extend(file_refs.clone());
// no need to include manifest files here, as they are already included in region manifest
}
Ok(tmp_ref_files)
@@ -328,15 +329,34 @@ impl LocalGcWorker {
let region_id = region.region_id();
debug!("Doing gc for region {}", region_id);
let manifest = region.manifest_ctx.manifest().await;
// If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
let file_ref_manifest_version = self
.file_ref_manifest
.manifest_version
.get(&region.region_id())
.cloned();
if file_ref_manifest_version != Some(manifest.manifest_version) {
// should be rare enough(few seconds after leader update manifest version), just skip gc for this region
warn!(
"Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
file_ref_manifest_version,
region.region_id(),
manifest.manifest_version
);
return Ok(vec![]);
}
// do the time consuming listing only when full_file_listing is true
// and do it first to make sure we have the latest manifest etc.
let all_entries = if self.full_file_listing {
self.list_from_object_store(&region).await?
self.list_from_object_store(region.region_id(), manifest.clone())
.await?
} else {
vec![]
};
let manifest = region.manifest_ctx.manifest().await;
let region_id = manifest.metadata.region_id;
let current_files = &manifest.files;
@@ -509,10 +529,12 @@ impl LocalGcWorker {
/// List all files in the region directory.
/// Returns a vector of all file entries found.
/// This might take a long time if there are many files in the region directory.
async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
async fn list_from_object_store(
&self,
region_id: RegionId,
manifest: Arc<RegionManifest>,
) -> Result<Vec<Entry>> {
let start = tokio::time::Instant::now();
let region_id = region.region_id();
let manifest = region.manifest_ctx.manifest().await;
let current_files = &manifest.files;
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)

View File

@@ -82,14 +82,16 @@ impl FileReferenceManager {
/// Gets all ref files for the given regions, meaning all open FileHandles for those regions
/// and from related regions' manifests.
/// `query_regions_for_mem` queries for in memory file handles.
/// `related_regions_in_manifest` queries for related regions' manifests to get more file refs of given region ids.
pub(crate) async fn get_snapshot_of_file_refs(
&self,
query_regions: Vec<MitoRegionRef>,
related_regions: Vec<(MitoRegionRef, Vec<RegionId>)>,
query_regions_for_mem: Vec<MitoRegionRef>,
related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
// get from in memory file handles
for region_id in query_regions.iter().map(|r| r.region_id()) {
for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
if let Some(files) = self.ref_file_set(region_id) {
ref_files.insert(region_id, files);
}
@@ -97,13 +99,8 @@ impl FileReferenceManager {
let mut manifest_version = HashMap::new();
for r in &query_regions {
let manifest = r.manifest_ctx.manifest().await;
manifest_version.insert(r.region_id(), manifest.manifest_version);
}
// get file refs from related regions' manifests
for (related_region, queries) in &related_regions {
for (related_region, queries) in &related_regions_in_manifest {
let queries = queries.iter().cloned().collect::<HashSet<_>>();
let manifest = related_region.manifest_ctx.manifest().await;
for meta in manifest.files.values() {
@@ -125,6 +122,18 @@ impl FileReferenceManager {
manifest_version.insert(related_region.region_id(), manifest.manifest_version);
}
for r in &query_regions_for_mem {
let manifest = r.manifest_ctx.manifest().await;
// remove in manifest files for smaller size, since gc worker read from manifest later.
ref_files.entry(r.region_id()).and_modify(|refs| {
*refs = std::mem::take(refs)
.into_iter()
.filter(|f| !manifest.files.contains_key(&f.file_id))
.collect();
});
manifest_version.insert(r.region_id(), manifest.manifest_version);
}
// simply return all ref files, no manifest version filtering for now.
Ok(FileRefsManifest {
file_refs: ref_files,