refactor: gc job report enum

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-02-05 16:42:18 +08:00
parent 91c6be273c
commit 97e8a0f652
8 changed files with 240 additions and 100 deletions

View File

@@ -114,12 +114,27 @@ impl GcScheduler {
.await;
let duration = start_time.elapsed();
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
report.per_datanode_reports.len(),
report.failed_datanodes.len(),
duration
);
match &report {
GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
info!(
"Finished GC cycle. Processed {} datanodes ({} failed). Duration: {:?}",
per_datanode_reports.len(),
failed_datanodes.len(),
duration
);
}
GcJobReport::Combined { report } => {
info!(
"Finished GC cycle with combined report. Deleted files: {}, deleted indexes: {}. Duration: {:?}",
report.deleted_files.len(),
report.deleted_indexes.len(),
duration
);
}
}
debug!("Detailed GC Job Report: {report:#?}");
Ok(report)
@@ -196,7 +211,8 @@ impl GcScheduler {
force_full_listing_by_peer: HashMap<Peer, HashSet<RegionId>>,
region_routes_override_by_peer: HashMap<Peer, Region2Peers>,
) -> GcJobReport {
let mut report = GcJobReport::default();
let mut per_datanode_reports = HashMap::new();
let mut failed_datanodes: HashMap<_, Vec<_>> = HashMap::new();
// Create a stream of datanode GC tasks with limited concurrency
let results: Vec<_> = futures::stream::iter(
@@ -237,18 +253,21 @@ impl GcScheduler {
for (peer, result) in results {
match result {
Ok(dn_report) => {
report.per_datanode_reports.insert(peer.id, dn_report);
per_datanode_reports.insert(peer.id, dn_report);
}
Err(e) => {
error!(e; "Failed to process datanode GC for peer {}", peer);
// Note: We don't have a direct way to map peer to table_id here,
// so we just log the error. The table_reports will contain individual region failures.
report.failed_datanodes.entry(peer.id).or_default().push(e);
failed_datanodes.entry(peer.id).or_default().push(e);
}
}
}
report
GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
}
}
/// Process GC for a single datanode with all its candidate regions.

View File

@@ -34,8 +34,18 @@ async fn test_parallel_process_datanodes_empty() {
.parallel_process_datanodes(HashMap::new(), HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 0);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 0);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -88,8 +98,18 @@ async fn test_parallel_process_datanodes_with_candidates() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -140,16 +160,18 @@ async fn test_handle_tick() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes"
);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have 0 failed datanodes");
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);

View File

@@ -100,8 +100,18 @@ async fn test_concurrent_table_processing_limits() {
.await;
// Should process all datanodes
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}
#[tokio::test]
@@ -172,11 +182,21 @@ async fn test_datanode_processes_tables_with_partial_gc_failures() {
.await;
// Should have one datanode with mixed results
assert_eq!(report.per_datanode_reports.len(), 1);
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// also check one failed region (region2 has no GC report, so it should be in need_retry_regions)
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(datanode_report.need_retry_regions.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
}
// Region Concurrency Tests
@@ -376,7 +396,15 @@ async fn test_region_gc_concurrency_with_partial_failures() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
let report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
..
} => per_datanode_reports.get(&peer.id).unwrap(),
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Should have 3 successful and 3 failed regions
// Even regions (2, 4, 6) should succeed, odd regions (1, 3, 5) should fail
@@ -506,7 +534,15 @@ async fn test_region_gc_concurrency_with_retryable_errors() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
let report = report.per_datanode_reports.get(&peer.id).unwrap();
let report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
..
} => per_datanode_reports.get(&peer.id).unwrap(),
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// In the new implementation without retry logic, all regions should be processed
// The exact behavior depends on how the mock handles the regions

View File

@@ -93,19 +93,30 @@ async fn test_gc_regions_failure_handling() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the report shows the failure handling
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode despite failure"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled via need_retry_regions)"
);
// Check the report shows the failure handling
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(
per_datanode_reports.len(),
1,
"Should process 1 datanode despite failure"
);
assert_eq!(
failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled via need_retry_regions)"
);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region is in need_retry_regions due to the failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
assert_eq!(
datanode_report.need_retry_regions.len(),
1,
@@ -180,19 +191,25 @@ async fn test_get_file_references_failure() {
// Validate the report shows the expected results
// In the new implementation, even if get_file_references fails, we still create a datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled gracefully)"
);
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(
failed_datanodes.len(),
0,
"Should have 0 failed datanodes (failure handled gracefully)"
);
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// The region should be processed but may have empty results due to file refs failure
let datanode_report = report.per_datanode_reports.values().next().unwrap();
// The current implementation still processes the region even with file refs failure
// and creates an empty entry in deleted_files
assert!(

View File

@@ -86,19 +86,19 @@ async fn test_full_gc_workflow() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes");
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region was processed successfully
assert!(
@@ -216,19 +216,19 @@ async fn test_tracker_cleanup() {
let report = scheduler.handle_tick().await.unwrap();
// Validate the returned GcJobReport - should have 1 datanode report
assert_eq!(
report.per_datanode_reports.len(),
1,
"Should process 1 datanode"
);
assert_eq!(
report.failed_datanodes.len(),
0,
"Should have no failed datanodes"
);
// Get the datanode report
let datanode_report = report.per_datanode_reports.values().next().unwrap();
let datanode_report = match &report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1, "Should process 1 datanode");
assert_eq!(failed_datanodes.len(), 0, "Should have no failed datanodes");
per_datanode_reports.values().next().unwrap()
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
};
// Check that the region was processed successfully
assert!(

View File

@@ -73,8 +73,18 @@ async fn test_empty_file_refs_manifest() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
// Should handle empty file refs gracefully
}
@@ -150,6 +160,16 @@ async fn test_multiple_regions_per_table() {
.parallel_process_datanodes(datanode_to_candidates, HashMap::new(), HashMap::new())
.await;
assert_eq!(report.per_datanode_reports.len(), 1);
assert_eq!(report.failed_datanodes.len(), 0);
match report {
crate::gc::scheduler::GcJobReport::PerDatanode {
per_datanode_reports,
failed_datanodes,
} => {
assert_eq!(per_datanode_reports.len(), 1);
assert_eq!(failed_datanodes.len(), 0);
}
crate::gc::scheduler::GcJobReport::Combined { .. } => {
panic!("expected per-datanode report");
}
}
}

View File

@@ -39,10 +39,46 @@ use crate::metrics::{
use crate::service::mailbox::MailboxRef;
/// Report for a GC job.
#[derive(Debug, Default)]
pub struct GcJobReport {
pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
#[derive(Debug)]
pub enum GcJobReport {
/// Results kept separately for every datanode that took part in the job.
///
/// This is the shape returned by `Default::default()` (with both maps empty).
PerDatanode {
/// Report of each datanode whose GC task returned `Ok`, keyed by datanode id.
per_datanode_reports: HashMap<DatanodeId, GcReport>,
/// Errors of each datanode whose GC task returned `Err`, keyed by datanode id.
///
/// Note that `merge_to_report` ignores this map when flattening the job report.
failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
},
/// A single report that is not broken down by datanode; built via
/// `GcJobReport::combined`.
Combined {
/// The already-merged report.
report: GcReport,
},
}
impl Default for GcJobReport {
fn default() -> Self {
Self::PerDatanode {
per_datanode_reports: HashMap::new(),
failed_datanodes: HashMap::new(),
}
}
}
impl GcJobReport {
    /// Wraps an already-merged [`GcReport`] into the [`GcJobReport::Combined`] variant.
    pub fn combined(report: GcReport) -> Self {
        GcJobReport::Combined { report }
    }

    /// Consumes the job report and flattens it into a single [`GcReport`].
    ///
    /// A `Combined` report is returned as-is. For a `PerDatanode` report every
    /// datanode report is merged into a fresh default [`GcReport`]; the
    /// `failed_datanodes` map is discarded.
    pub fn merge_to_report(self) -> GcReport {
        let datanode_reports = match self {
            Self::Combined { report } => return report,
            Self::PerDatanode {
                per_datanode_reports,
                ..
            } => per_datanode_reports,
        };

        // Only the report values matter here; the datanode ids are dropped.
        datanode_reports
            .into_values()
            .fold(GcReport::default(), |mut merged, datanode_report| {
                merged.merge(datanode_report);
                merged
            })
    }
}
/// [`Event`] represents various types of events that can be processed by the gc ticker.
@@ -207,7 +243,7 @@ impl GcScheduler {
// Empty regions list, return empty report
if regions.is_empty() {
info!("Finished manual gc request");
return Ok(GcJobReport::default());
return Ok(GcJobReport::combined(GcReport::default()));
}
let full_listing = full_file_listing.unwrap_or(false);
@@ -259,12 +295,7 @@ impl GcScheduler {
combined_report.merge(report);
}
let mut per_datanode_reports = HashMap::new();
per_datanode_reports.insert(0, combined_report);
let report = GcJobReport {
per_datanode_reports,
failed_datanodes: HashMap::new(),
};
let report = GcJobReport::combined(combined_report);
info!("Finished manual gc request");
Ok(report)

View File

@@ -385,12 +385,7 @@ impl Metasrv {
}
/// Converts a `GcJobReport` produced by a GC job into a single
/// `store_api::storage::GcReport`.
///
/// Takes `job_report` by value; the job report is consumed by the conversion.
fn gc_job_report_to_gc_report(job_report: crate::gc::GcJobReport) -> store_api::storage::GcReport {
// Merge all datanode reports into a single GcReport
let mut gc_report = store_api::storage::GcReport::default();
for (_datanode_id, report) in job_report.per_datanode_reports {
gc_report.merge(report);
}
gc_report
job_report.merge_to_report()
}
fn gc_report_to_response(report: &store_api::storage::GcReport) -> GcResponse {