From 97e8a0f652175b5c49bc8722a6cc1173127c8c6b Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 5 Feb 2026 16:42:18 +0800 Subject: [PATCH] refactor: gc job report enum Signed-off-by: discord9 --- src/meta-srv/src/gc/handler.rs | 39 ++++++++++++---- src/meta-srv/src/gc/mock/basic.rs | 50 ++++++++++++++------ src/meta-srv/src/gc/mock/concurrent.rs | 50 +++++++++++++++++--- src/meta-srv/src/gc/mock/err_handle.rs | 61 ++++++++++++++++--------- src/meta-srv/src/gc/mock/integration.rs | 52 ++++++++++----------- src/meta-srv/src/gc/mock/misc.rs | 28 ++++++++++-- src/meta-srv/src/gc/scheduler.rs | 53 ++++++++++++++++----- src/meta-srv/src/service/procedure.rs | 7 +-- 8 files changed, 240 insertions(+), 100 deletions(-) diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs index ccbfd00f6d..105ddca58c 100644 --- a/src/meta-srv/src/gc/handler.rs +++ b/src/meta-srv/src/gc/handler.rs @@ -114,12 +114,27 @@ impl GcScheduler { .await; let duration = start_time.elapsed(); - info!( - "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}", - report.per_datanode_reports.len(), - report.failed_datanodes.len(), - duration - ); + match &report { + GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + info!( + "Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}", + per_datanode_reports.len(), + failed_datanodes.len(), + duration + ); + } + GcJobReport::Combined { report } => { + info!( + "Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}", + report.deleted_files.len(), + report.deleted_indexes.len(), + duration + ); + } + } debug!("Detailed GC Job Report: {report:#?}"); Ok(report) @@ -196,7 +211,8 @@ impl GcScheduler { force_full_listing_by_peer: HashMap>, region_routes_override_by_peer: HashMap, ) -> GcJobReport { - let mut report = GcJobReport::default(); + let mut per_datanode_reports = HashMap::new(); + let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new(); // Create a stream of datanode GC tasks with limited concurrency let results: Vec<_> = futures::stream::iter( @@ -237,18 +253,21 @@ impl GcScheduler { for (peer, result) in results { match result { Ok(dn_report) => { - report.per_datanode_reports.insert(peer.id, dn_report); + per_datanode_reports.insert(peer.id, dn_report); } Err(e) => { error!(e; "Failed to process datanode GC for peer {}", peer); // Note: We don't have a direct way to map peer to table_id here, // so we just log the error. The table_reports will contain individual region failures. - report.failed_datanodes.entry(peer.id).or_default().push(e); + failed_datanodes.entry(peer.id).or_default().push(e); } } } - report + GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } } /// Process GC for a single datanode with all its candidate regions. diff --git a/src/meta-srv/src/gc/mock/basic.rs b/src/meta-srv/src/gc/mock/basic.rs index f0455568bf..fb395d899e 100644 --- a/src/meta-srv/src/gc/mock/basic.rs +++ b/src/meta-srv/src/gc/mock/basic.rs @@ -34,8 +34,18 @@ async fn test_parallel_process_datanodes_empty() { .parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 0); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 0); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -88,8 +98,18 @@ async fn test_parallel_process_datanodes_with_candidates() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -140,16 +160,18 @@ async fn test_handle_tick() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes" - ); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have 0 failed datanodes"); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1); assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1); diff --git a/src/meta-srv/src/gc/mock/concurrent.rs b/src/meta-srv/src/gc/mock/concurrent.rs index 40cbb15168..2554c7046b 100644 --- a/src/meta-srv/src/gc/mock/concurrent.rs +++ b/src/meta-srv/src/gc/mock/concurrent.rs @@ -100,8 +100,18 @@ async fn test_concurrent_table_processing_limits() { .await; // Should process all datanodes - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } #[tokio::test] @@ -172,11 +182,21 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() { .await; // Should have one datanode with mixed results - assert_eq!(report.per_datanode_reports.len(), 1); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // also check one failed region (region2 has no GC report, so it should be in need_retry_regions) - let datanode_report = report.per_datanode_reports.values().next().unwrap(); assert_eq!(datanode_report.need_retry_regions.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); } // Region Concurrency Tests @@ -376,7 +396,15 @@ async fn test_region_gc_concurrency_with_partial_failures() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - let report = report.per_datanode_reports.get(&peer.id).unwrap(); + let report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => per_datanode_reports.get(&peer.id).unwrap(), + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Should have 3 successful and 3 failed regions // Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail @@ -506,7 +534,15 @@ async fn test_region_gc_concurrency_with_retryable_errors() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - let report = report.per_datanode_reports.get(&peer.id).unwrap(); + let report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => per_datanode_reports.get(&peer.id).unwrap(), + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // In the new implementation without retry logic, all regions should be processed // The exact behavior depends on how the mock handles the regions diff --git a/src/meta-srv/src/gc/mock/err_handle.rs b/src/meta-srv/src/gc/mock/err_handle.rs index 77671fd177..0dd9b3d115 100644 --- a/src/meta-srv/src/gc/mock/err_handle.rs +++ b/src/meta-srv/src/gc/mock/err_handle.rs @@ -93,19 +93,30 @@ async fn test_gc_regions_failure_handling() { let report = scheduler.handle_tick().await.unwrap(); // Validate the report shows the failure handling - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode despite failure" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes (failure handled via need_retry_regions)" - ); + // Check the report shows the failure handling + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!( + per_datanode_reports.len(), + 1, + "Should process 1 datanode despite failure" + ); + assert_eq!( + failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled via need_retry_regions)" + ); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region is in need_retry_regions due to the failure - let datanode_report = report.per_datanode_reports.values().next().unwrap(); assert_eq!( datanode_report.need_retry_regions.len(), 1, @@ -180,19 +191,25 @@ async fn test_get_file_references_failure() { // Validate the report shows the expected results // In the new implementation, even if get_file_references fails, we still create a datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have 0 failed datanodes (failure handled gracefully)" - ); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!( + failed_datanodes.len(), + 0, + "Should have 0 failed datanodes (failure handled gracefully)" + ); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // The region should be processed but may have empty results due to file refs failure - let datanode_report = report.per_datanode_reports.values().next().unwrap(); // The current implementation still processes the region even with file refs failure // and creates an empty entry in deleted_files assert!( diff --git a/src/meta-srv/src/gc/mock/integration.rs b/src/meta-srv/src/gc/mock/integration.rs index 36c21e0770..8aa8f977ad 100644 --- a/src/meta-srv/src/gc/mock/integration.rs +++ b/src/meta-srv/src/gc/mock/integration.rs @@ -86,19 +86,19 @@ async fn test_full_gc_workflow() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - should have 1 datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have no failed datanodes" - ); - - // Get the datanode report - let datanode_report = report.per_datanode_reports.values().next().unwrap(); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes"); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region was processed successfully assert!( @@ -216,19 +216,19 @@ async fn test_tracker_cleanup() { let report = scheduler.handle_tick().await.unwrap(); // Validate the returned GcJobReport - should have 1 datanode report - assert_eq!( - report.per_datanode_reports.len(), - 1, - "Should process 1 datanode" - ); - assert_eq!( - report.failed_datanodes.len(), - 0, - "Should have no failed datanodes" - ); - - // Get the datanode report - let datanode_report = report.per_datanode_reports.values().next().unwrap(); + let datanode_report = match &report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode"); + assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes"); + per_datanode_reports.values().next().unwrap() + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + }; // Check that the region was processed successfully assert!( diff --git a/src/meta-srv/src/gc/mock/misc.rs b/src/meta-srv/src/gc/mock/misc.rs index 5aa7256edd..76d14136e4 100644 --- a/src/meta-srv/src/gc/mock/misc.rs +++ b/src/meta-srv/src/gc/mock/misc.rs @@ -73,8 +73,18 @@ async fn test_empty_file_refs_manifest() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } // Should handle empty file refs gracefully } @@ -150,6 +160,16 @@ async fn test_multiple_regions_per_table() { .parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new()) .await; - assert_eq!(report.per_datanode_reports.len(), 1); - assert_eq!(report.failed_datanodes.len(), 0); + match report { + crate::gc::scheduler::GcJobReport::PerDatanode { + per_datanode_reports, + failed_datanodes, + } => { + assert_eq!(per_datanode_reports.len(), 1); + assert_eq!(failed_datanodes.len(), 0); + } + crate::gc::scheduler::GcJobReport::Combined { .. } => { + panic!("expected per-datanode report"); + } + } } diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index a30ff5578e..dedaab6747 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -39,10 +39,46 @@ use crate::metrics::{ use crate::service::mailbox::MailboxRef; /// Report for a GC job. -#[derive(Debug, Default)] -pub struct GcJobReport { - pub per_datanode_reports: HashMap, - pub failed_datanodes: HashMap>, +#[derive(Debug)] +pub enum GcJobReport { + PerDatanode { + per_datanode_reports: HashMap, + failed_datanodes: HashMap>, + }, + Combined { + report: GcReport, + }, +} + +impl Default for GcJobReport { + fn default() -> Self { + Self::PerDatanode { + per_datanode_reports: HashMap::new(), + failed_datanodes: HashMap::new(), + } + } +} + +impl GcJobReport { + pub fn combined(report: GcReport) -> Self { + Self::Combined { report } + } + + pub fn merge_to_report(self) -> GcReport { + match self { + GcJobReport::Combined { report } => report, + GcJobReport::PerDatanode { + per_datanode_reports, + .. + } => { + let mut combined = GcReport::default(); + for (_datanode_id, report) in per_datanode_reports { + combined.merge(report); + } + combined + } + } + } } /// [`Event`] represents various types of events that can be processed by the gc ticker. @@ -207,7 +243,7 @@ impl GcScheduler { // Empty regions list, return empty report if regions.is_empty() { info!("Finished manual gc request"); - return Ok(GcJobReport::default()); + return Ok(GcJobReport::combined(GcReport::default())); } let full_listing = full_file_listing.unwrap_or(false); @@ -259,12 +295,7 @@ impl GcScheduler { combined_report.merge(report); } - let mut per_datanode_reports = HashMap::new(); - per_datanode_reports.insert(0, combined_report); - let report = GcJobReport { - per_datanode_reports, - failed_datanodes: HashMap::new(), - }; + let report = GcJobReport::combined(combined_report); info!("Finished manual gc request"); Ok(report) diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 269dfd7d1d..e09f0ec1bc 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -385,12 +385,7 @@ impl Metasrv { } fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport { - // Merge all datanode reports into a single GcReport - let mut gc_report = store_api::storage::GcReport::default(); - for (_datanode_id, report) in job_report.per_datanode_reports { - gc_report.merge(report); - } - gc_report + job_report.merge_to_report() } fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {